Skip to content
This repository has been archived by the owner on Oct 9, 2023. It is now read-only.

Add deck_uri to NodeExecutionEvent #443

Closed
wants to merge 24 commits into from
Closed
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
15 changes: 15 additions & 0 deletions events/node_event_recorder.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,11 @@ import (

//go:generate mockery -all -output=mocks -case=underscore

const (
outputsFile = "outputs.pb"
deckFile = "deck.html"
)

// NodeEventRecorder records Node events
type NodeEventRecorder interface {
// RecordNodeEvent records execution events indicating the node has undergone a phase change and additional metadata.
Expand Down Expand Up @@ -49,6 +54,16 @@ func (r *nodeEventRecorder) handleFailure(ctx context.Context, ev *event.NodeExe
func (r *nodeEventRecorder) RecordNodeEvent(ctx context.Context, ev *event.NodeExecutionEvent, eventConfig *config.EventConfig) error {
var origEvent = ev
var rawOutputPolicy = eventConfig.RawOutputPolicy

if len(ev.GetOutputUri()) > 0 {
// Both outputs.pb and deck.html should be in the same folder
deckURI := strings.Replace(ev.GetOutputUri(), outputsFile, deckFile, 1)
metadata, err := r.store.Head(ctx, storage.DataReference(deckURI))
if err == nil && metadata.Exists() {
ev.DeckUri = deckURI
}
}
pingsutw marked this conversation as resolved.
Show resolved Hide resolved

if rawOutputPolicy == config.RawOutputPolicyInline && len(ev.GetOutputUri()) > 0 {
outputs := &core.LiteralMap{}
err := r.store.ReadProtobuf(ctx, storage.DataReference(ev.GetOutputUri()), outputs)
Expand Down
34 changes: 32 additions & 2 deletions events/node_event_recorder_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ func getReferenceNodeEv() *event.NodeExecutionEvent {
OutputResult: &event.NodeExecutionEvent_OutputUri{
OutputUri: referenceURI,
},
DeckUri: deckURI,
}
}

Expand All @@ -32,6 +33,7 @@ func getRawOutputNodeEv() *event.NodeExecutionEvent {
OutputResult: &event.NodeExecutionEvent_OutputData{
OutputData: outputData,
},
DeckUri: deckURI,
}
}

Expand All @@ -42,8 +44,12 @@ func TestRecordNodeEvent_Success_ReferenceOutputs(t *testing.T) {
assert.True(t, proto.Equal(event, getReferenceNodeEv()))
return true
})).Return(nil)
metadata := existsMetadata{}
pbStore := &storageMocks.ComposedProtobufStore{}
pbStore.OnHeadMatch(mock.MatchedBy(func(ctx context.Context) bool { return true }), storage.DataReference(deckURI)).
Return(&metadata, nil)
mockStore := &storage.DataStore{
ComposedProtobufStore: &storageMocks.ComposedProtobufStore{},
ComposedProtobufStore: pbStore,
ReferenceConstructor: &storageMocks.ReferenceConstructor{},
}

Expand All @@ -69,6 +75,9 @@ func TestRecordNodeEvent_Success_InlineOutputs(t *testing.T) {
arg := args.Get(2).(*core.LiteralMap)
*arg = *outputData
})
metadata := existsMetadata{}
pbStore.OnHeadMatch(mock.MatchedBy(func(ctx context.Context) bool { return true }), storage.DataReference(deckURI)).
Return(&metadata, nil)
mockStore := &storage.DataStore{
ComposedProtobufStore: pbStore,
ReferenceConstructor: &storageMocks.ReferenceConstructor{},
Expand All @@ -78,7 +87,9 @@ func TestRecordNodeEvent_Success_InlineOutputs(t *testing.T) {
eventRecorder: &eventRecorder,
store: mockStore,
}
err := recorder.RecordNodeEvent(ctx, getReferenceNodeEv(), inlineEventConfig)
nodeEvent := getReferenceNodeEv()
err := recorder.RecordNodeEvent(ctx, nodeEvent, inlineEventConfig)
assert.Equal(t, deckURI, nodeEvent.DeckUri)
assert.NoError(t, err)
}

Expand All @@ -93,6 +104,9 @@ func TestRecordNodeEvent_Failure_FetchInlineOutputs(t *testing.T) {
pbStore.OnReadProtobufMatch(mock.Anything, mock.MatchedBy(func(ref storage.DataReference) bool {
return ref.String() == referenceURI
}), mock.Anything).Return(errors.New("foo"))
metadata := existsMetadata{}
pbStore.OnHeadMatch(mock.MatchedBy(func(ctx context.Context) bool { return true }), storage.DataReference(deckURI)).
Return(&metadata, nil)
mockStore := &storage.DataStore{
ComposedProtobufStore: pbStore,
ReferenceConstructor: &storageMocks.ReferenceConstructor{},
Expand Down Expand Up @@ -122,6 +136,9 @@ func TestRecordNodeEvent_Failure_FallbackReference_Retry(t *testing.T) {
arg := args.Get(2).(*core.LiteralMap)
*arg = *outputData
})
metadata := existsMetadata{}
pbStore.OnHeadMatch(mock.MatchedBy(func(ctx context.Context) bool { return true }), storage.DataReference(deckURI)).
Return(&metadata, nil)
mockStore := &storage.DataStore{
ComposedProtobufStore: pbStore,
ReferenceConstructor: &storageMocks.ReferenceConstructor{},
Expand All @@ -146,6 +163,9 @@ func TestRecordNodeEvent_Failure_FallbackReference_Unretriable(t *testing.T) {
arg := args.Get(2).(*core.LiteralMap)
*arg = *outputData
})
metadata := existsMetadata{}
pbStore.OnHeadMatch(mock.MatchedBy(func(ctx context.Context) bool { return true }), storage.DataReference(deckURI)).
Return(&metadata, nil)
mockStore := &storage.DataStore{
ComposedProtobufStore: pbStore,
ReferenceConstructor: &storageMocks.ReferenceConstructor{},
Expand All @@ -158,3 +178,13 @@ func TestRecordNodeEvent_Failure_FallbackReference_Unretriable(t *testing.T) {
err := recorder.RecordNodeEvent(ctx, getReferenceNodeEv(), inlineEventConfigFallback)
assert.EqualError(t, err, "foo")
}

type existsMetadata struct{}

func (e existsMetadata) Exists() bool {
return true
}

func (e existsMetadata) Size() int64 {
return int64(1)
}
1 change: 1 addition & 0 deletions events/test_utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ var referenceEventConfig = &config.EventConfig{
}

var referenceURI = "s3://foo/bar/outputs.pb"
var deckURI = "s3://foo/bar/deck.html"

var outputData = &core.LiteralMap{
Literals: map[string]*core.Literal{
Expand Down
4 changes: 2 additions & 2 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ require (
github.com/DiSiqueira/GoTree v1.0.1-0.20180907134536-53a8e837f295
github.com/benlaurie/objecthash v0.0.0-20180202135721-d1e3d6079fc1
github.com/fatih/color v1.10.0
github.com/flyteorg/flyteidl v1.1.0
github.com/flyteorg/flyteidl v1.1.4
github.com/flyteorg/flyteplugins v1.0.0
github.com/flyteorg/flytestdlib v1.0.0
github.com/ghodss/yaml v1.0.0
Expand All @@ -15,6 +15,7 @@ require (
github.com/golang/protobuf v1.4.3
github.com/google/uuid v1.2.0
github.com/grpc-ecosystem/go-grpc-middleware v1.1.0
github.com/grpc-ecosystem/go-grpc-prometheus v1.2.0
github.com/imdario/mergo v0.3.11
github.com/magiconair/properties v1.8.4
github.com/mitchellh/mapstructure v1.4.1
Expand Down Expand Up @@ -86,7 +87,6 @@ require (
github.com/google/gofuzz v1.2.0 // indirect
github.com/googleapis/gax-go/v2 v2.0.5 // indirect
github.com/googleapis/gnostic v0.5.1 // indirect
github.com/grpc-ecosystem/go-grpc-prometheus v1.2.0 // indirect
github.com/grpc-ecosystem/grpc-gateway v1.16.0 // indirect
github.com/hashicorp/golang-lru v0.5.4 // indirect
github.com/hashicorp/hcl v1.0.0 // indirect
Expand Down
4 changes: 2 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -243,8 +243,8 @@ github.com/fatih/color v1.10.0 h1:s36xzo75JdqLaaWoiEHk767eHiwo0598uUxyfiPkDsg=
github.com/fatih/color v1.10.0/go.mod h1:ELkj/draVOlAH/xkhN6mQ50Qd0MPOk5AAr3maGEBuJM=
github.com/fatih/structtag v1.2.0/go.mod h1:mBJUNpUnHmRKrKlQQlmCrh5PuhftFbNv8Ys4/aAZl94=
github.com/flyteorg/flyteidl v1.0.0/go.mod h1:JW0z1ZaHS9zWvDAwSMIyGhsf+V4zrzBBgh5IuqzMFCM=
github.com/flyteorg/flyteidl v1.1.0 h1:f8tdMXOuorS/d+4Ut2QarfDbdCOriK0S+EnlQzrwz9E=
github.com/flyteorg/flyteidl v1.1.0/go.mod h1:JW0z1ZaHS9zWvDAwSMIyGhsf+V4zrzBBgh5IuqzMFCM=
github.com/flyteorg/flyteidl v1.1.4 h1:P6YgFYcmBxoLcTegv301i5oYKBCvjHGW3ujRT9s4dvI=
github.com/flyteorg/flyteidl v1.1.4/go.mod h1:f1tvw5CDjqmrzNxKpRYr6BdAhHL8f7Wp1Duxl0ZOV4g=
github.com/flyteorg/flyteplugins v1.0.0 h1:77hUJjiIxBmQ9rd3+cXjSGnzOVAFrSzCd59aIaYFB/8=
github.com/flyteorg/flyteplugins v1.0.0/go.mod h1:4Cpn+9RfanIieTTh2XsuL6zPYXtsR5UDe8YaEmXONT4=
github.com/flyteorg/flytestdlib v1.0.0 h1:gb99ignMsVcNTUmWzArtcIDdkRjyzQQVBkWNOQakiFg=
Expand Down