diff --git a/go/tasks/pluginmachinery/core/allocationstatus_enumer.go b/go/tasks/pluginmachinery/core/allocationstatus_enumer.go index acdffa19e..dbbb536ca 100644 --- a/go/tasks/pluginmachinery/core/allocationstatus_enumer.go +++ b/go/tasks/pluginmachinery/core/allocationstatus_enumer.go @@ -1,6 +1,5 @@ // Code generated by "enumer -type=AllocationStatus -trimprefix=AllocationStatus"; DO NOT EDIT. -// package core import ( diff --git a/go/tasks/pluginmachinery/core/phase_enumer.go b/go/tasks/pluginmachinery/core/phase_enumer.go index 8101f2f3d..bd26c4d56 100644 --- a/go/tasks/pluginmachinery/core/phase_enumer.go +++ b/go/tasks/pluginmachinery/core/phase_enumer.go @@ -1,6 +1,5 @@ // Code generated by "enumer -type=Phase"; DO NOT EDIT. -// package core import ( diff --git a/go/tasks/pluginmachinery/core/transitiontype_enumer.go b/go/tasks/pluginmachinery/core/transitiontype_enumer.go index 9b4b615ac..41608fb36 100644 --- a/go/tasks/pluginmachinery/core/transitiontype_enumer.go +++ b/go/tasks/pluginmachinery/core/transitiontype_enumer.go @@ -1,6 +1,5 @@ // Code generated by "enumer --type=TransitionType"; DO NOT EDIT. -// package core import ( diff --git a/go/tasks/pluginmachinery/flytek8s/resourcecustomizationmode_enumer.go b/go/tasks/pluginmachinery/flytek8s/resourcecustomizationmode_enumer.go index c01befae4..bf25f1bf0 100644 --- a/go/tasks/pluginmachinery/flytek8s/resourcecustomizationmode_enumer.go +++ b/go/tasks/pluginmachinery/flytek8s/resourcecustomizationmode_enumer.go @@ -1,6 +1,5 @@ // Code generated by "enumer -type=ResourceCustomizationMode -trimprefix=ResourceCustomizationMode"; DO NOT EDIT. -// package flytek8s import ( diff --git a/go/tasks/pluginmachinery/internal/webapi/phase_enumer.go b/go/tasks/pluginmachinery/internal/webapi/phase_enumer.go index b643fd135..9eff931df 100644 --- a/go/tasks/pluginmachinery/internal/webapi/phase_enumer.go +++ b/go/tasks/pluginmachinery/internal/webapi/phase_enumer.go @@ -1,6 +1,5 @@ // Code generated by "enumer -type=Phase -trimprefix=Phase"; DO NOT EDIT. -// package webapi import ( diff --git a/go/tasks/pluginmachinery/io/iface.go b/go/tasks/pluginmachinery/io/iface.go index 15d5adefc..e2a3f5875 100644 --- a/go/tasks/pluginmachinery/io/iface.go +++ b/go/tasks/pluginmachinery/io/iface.go @@ -43,6 +43,8 @@ type OutputReader interface { Read(ctx context.Context) (*core.LiteralMap, *ExecutionError, error) // DeckExists checks if the deck file has been generated. DeckExists(ctx context.Context) (bool, error) + // GetOutputMetadata get the metadata from the output of tasks, such as deck URI. + GetOutputMetadata(ctx context.Context) map[string]string } // CheckpointPaths provides the paths / keys to input Checkpoints directory and an output checkpoints directory. diff --git a/go/tasks/pluginmachinery/io/mocks/output_reader.go b/go/tasks/pluginmachinery/io/mocks/output_reader.go index ed7e671e2..c1bdfc646 100644 --- a/go/tasks/pluginmachinery/io/mocks/output_reader.go +++ b/go/tasks/pluginmachinery/io/mocks/output_reader.go @@ -94,6 +94,40 @@ func (_m *OutputReader) Exists(ctx context.Context) (bool, error) { return r0, r1 } +type OutputReader_GetOutputMetadata struct { + *mock.Call +} + +func (_m OutputReader_GetOutputMetadata) Return(_a0 map[string]string) *OutputReader_GetOutputMetadata { + return &OutputReader_GetOutputMetadata{Call: _m.Call.Return(_a0)} +} + +func (_m *OutputReader) OnGetOutputMetadata(ctx context.Context) *OutputReader_GetOutputMetadata { + c_call := _m.On("GetOutputMetadata", ctx) + return &OutputReader_GetOutputMetadata{Call: c_call} +} + +func (_m *OutputReader) OnGetOutputMetadataMatch(matchers ...interface{}) *OutputReader_GetOutputMetadata { + c_call := _m.On("GetOutputMetadata", matchers...) + return &OutputReader_GetOutputMetadata{Call: c_call} +} + +// GetOutputMetadata provides a mock function with given fields: ctx +func (_m *OutputReader) GetOutputMetadata(ctx context.Context) map[string]string { + ret := _m.Called(ctx) + + var r0 map[string]string + if rf, ok := ret.Get(0).(func(context.Context) map[string]string); ok { + r0 = rf(ctx) + } else { + if ret.Get(0) != nil { + r0 = ret.Get(0).(map[string]string) + } + } + + return r0 +} + type OutputReader_IsError struct { *mock.Call } diff --git a/go/tasks/pluginmachinery/ioutils/in_memory_output_reader.go b/go/tasks/pluginmachinery/ioutils/in_memory_output_reader.go index 7dd10698d..d22a5b621 100644 --- a/go/tasks/pluginmachinery/ioutils/in_memory_output_reader.go +++ b/go/tasks/pluginmachinery/ioutils/in_memory_output_reader.go @@ -47,6 +47,13 @@ func (r InMemoryOutputReader) DeckExists(_ context.Context) (bool, error) { return r.DeckPath != nil, nil } +func (r InMemoryOutputReader) GetOutputMetadata(_ context.Context) map[string]string { + if r.DeckPath == nil { + return map[string]string{} + } + return map[string]string{deckURIKey: r.DeckPath.String()} +} + func NewInMemoryOutputReader(literals *core.LiteralMap, DeckPath *storage.DataReference, err *io.ExecutionError) InMemoryOutputReader { return InMemoryOutputReader{ literals: literals, diff --git a/go/tasks/pluginmachinery/ioutils/in_memory_output_reader_test.go b/go/tasks/pluginmachinery/ioutils/in_memory_output_reader_test.go index 3cae110fd..a87113477 100644 --- a/go/tasks/pluginmachinery/ioutils/in_memory_output_reader_test.go +++ b/go/tasks/pluginmachinery/ioutils/in_memory_output_reader_test.go @@ -25,6 +25,7 @@ func TestInMemoryOutputReader(t *testing.T) { or := NewInMemoryOutputReader(&flyteIdlCore.LiteralMap{Literals: lt}, &deckPath, nil) assert.Equal(t, &deckPath, or.DeckPath) + assert.Equal(t, deckPath.String(), or.GetOutputMetadata(context.Background())[deckURIKey]) ctx := context.TODO() ok, err := or.IsError(ctx) diff --git a/go/tasks/pluginmachinery/ioutils/paths.go b/go/tasks/pluginmachinery/ioutils/paths.go index e50aa5484..5a62d43a6 100644 --- a/go/tasks/pluginmachinery/ioutils/paths.go +++ b/go/tasks/pluginmachinery/ioutils/paths.go @@ -30,6 +30,8 @@ const ( // CheckpointPrefix specifies the common prefix that can be used as a directory where all the checkpoint information // will be stored. This directory is under the raw output-prefix path CheckpointPrefix = "_flytecheckpoints" + + deckURIKey = "deck-uri" ) // ConstructCheckpointPath returns a checkpoint path under the given `base` / rawOutputPrefix, following the conventions of diff --git a/go/tasks/pluginmachinery/ioutils/remote_file_output_reader.go b/go/tasks/pluginmachinery/ioutils/remote_file_output_reader.go index ac31bd24e..39051a704 100644 --- a/go/tasks/pluginmachinery/ioutils/remote_file_output_reader.go +++ b/go/tasks/pluginmachinery/ioutils/remote_file_output_reader.go @@ -123,6 +123,10 @@ func (r RemoteFileOutputReader) DeckExists(ctx context.Context) (bool, error) { return md.Exists(), nil } +func (r RemoteFileOutputReader) GetOutputMetadata(_ context.Context) map[string]string { + return map[string]string{deckURIKey: r.outPath.GetDeckPath().String()} +} + func NewRemoteFileOutputReader(_ context.Context, store storage.ComposedProtobufStore, outPaths io.OutputFilePaths, maxDatasetSize int64) RemoteFileOutputReader { return RemoteFileOutputReader{ outPath: outPaths, diff --git a/go/tasks/pluginmachinery/ioutils/remote_file_output_reader_test.go b/go/tasks/pluginmachinery/ioutils/remote_file_output_reader_test.go index ee10638b4..db213f36e 100644 --- a/go/tasks/pluginmachinery/ioutils/remote_file_output_reader_test.go +++ b/go/tasks/pluginmachinery/ioutils/remote_file_output_reader_test.go @@ -68,6 +68,7 @@ func TestReadOrigin(t *testing.T) { exists, err := r.DeckExists(ctx) assert.NoError(t, err) assert.True(t, exists) + assert.Equal(t, "deck.html", r.GetOutputMetadata(ctx)[deckURIKey]) }) t.Run("system", func(t *testing.T) { diff --git a/go/tasks/pluginmachinery/workqueue/workstatus_enumer.go b/go/tasks/pluginmachinery/workqueue/workstatus_enumer.go index 9a330d3e8..d0c51c798 100644 --- a/go/tasks/pluginmachinery/workqueue/workstatus_enumer.go +++ b/go/tasks/pluginmachinery/workqueue/workstatus_enumer.go @@ -1,6 +1,5 @@ // Code generated by "enumer --type=WorkStatus"; DO NOT EDIT. -// package workqueue import ( diff --git a/go/tasks/plugins/array/core/phase_enumer.go b/go/tasks/plugins/array/core/phase_enumer.go index b582c6fff..c659c7bcf 100644 --- a/go/tasks/plugins/array/core/phase_enumer.go +++ b/go/tasks/plugins/array/core/phase_enumer.go @@ -1,6 +1,5 @@ // Code generated by "enumer -type=Phase"; DO NOT EDIT. -// package core import ( diff --git a/go/tasks/plugins/presto/client/prestostatus_enumer.go b/go/tasks/plugins/presto/client/prestostatus_enumer.go index c5d206c76..33041cac3 100644 --- a/go/tasks/plugins/presto/client/prestostatus_enumer.go +++ b/go/tasks/plugins/presto/client/prestostatus_enumer.go @@ -1,6 +1,5 @@ // Code generated by "enumer --type=PrestoStatus"; DO NOT EDIT. -// package client import (