Skip to content
This repository has been archived by the owner on Oct 9, 2023. It is now read-only.

Add GetSpanPath and SpanExists #361

Closed
Show file tree
Hide file tree
Changes from 6 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions go/tasks/pluginmachinery/core/template/template_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,10 @@ func (d dummyOutputPaths) GetDeckPath() storage.DataReference {
panic("should not be called")
}

func (d dummyOutputPaths) GetSpanPath() storage.DataReference {
panic("should not be called")
}

func (d dummyOutputPaths) GetPreviousCheckpointsPrefix() storage.DataReference {
return d.prevCheckpointPath
}
Expand Down
4 changes: 4 additions & 0 deletions go/tasks/pluginmachinery/io/iface.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,8 @@ type OutputReader interface {
Read(ctx context.Context) (*core.LiteralMap, *ExecutionError, error)
// DeckExists checks if the deck file has been generated.
DeckExists(ctx context.Context) (bool, error)
// SpanExists checks if the span file has been generated.
SpanExists(ctx context.Context) (bool, error)
}

// CheckpointPaths provides the paths / keys to input Checkpoints directory and an output checkpoints directory.
Expand Down Expand Up @@ -81,6 +83,8 @@ type OutputFilePaths interface {
GetOutputPath() storage.DataReference
// GetDeckPath returns a fully qualified path (URN) to where the framework expects the deck.html to exist in the configured storage backend
GetDeckPath() storage.DataReference
// GetSpanPath returns a fully qualified path (URN) to where the framework expects the span.html to exist in the configured storage backend
GetSpanPath() storage.DataReference
// GetErrorPath returns a fully qualified path (URN) where the error information should be placed as a protobuf core.ErrorDocument. It is not directly
// used by the framework, but could be used in the future
GetErrorPath() storage.DataReference
Expand Down
32 changes: 32 additions & 0 deletions go/tasks/pluginmachinery/io/mocks/output_file_paths.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

39 changes: 39 additions & 0 deletions go/tasks/pluginmachinery/io/mocks/output_reader.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

32 changes: 32 additions & 0 deletions go/tasks/pluginmachinery/io/mocks/output_writer.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

8 changes: 7 additions & 1 deletion go/tasks/pluginmachinery/ioutils/in_memory_output_reader.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ import (
type InMemoryOutputReader struct {
literals *core.LiteralMap
DeckPath *storage.DataReference
SpanPath *storage.DataReference
err *io.ExecutionError
}

Expand Down Expand Up @@ -47,10 +48,15 @@ func (r InMemoryOutputReader) DeckExists(_ context.Context) (bool, error) {
return r.DeckPath != nil, nil
}

func NewInMemoryOutputReader(literals *core.LiteralMap, DeckPath *storage.DataReference, err *io.ExecutionError) InMemoryOutputReader {
func (r InMemoryOutputReader) SpanExists(_ context.Context) (bool, error) {
return r.SpanPath != nil, nil
}

func NewInMemoryOutputReader(literals *core.LiteralMap, DeckPath *storage.DataReference, SpanPath *storage.DataReference, err *io.ExecutionError) InMemoryOutputReader {
Copy link
Member

@pingsutw pingsutw Jun 16, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Could we add a new function called NewInMemoryOutputReaderWithSpan? Spotify is using this function internally, so we can't change it. #268 (comment)

return InMemoryOutputReader{
literals: literals,
DeckPath: DeckPath,
SpanPath: SpanPath,
err: err,
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,8 @@ import (
)

func TestInMemoryOutputReader(t *testing.T) {
deckPath := storage.DataReference("s3://bucket/key")
deckPath := storage.DataReference("s3://bucket/key/deck.html")
spanPath := storage.DataReference("s3://bucket/key/span.pb")
lt := map[string]*flyteIdlCore.Literal{
"results": {
Value: &flyteIdlCore.Literal_Scalar{
Expand All @@ -22,9 +23,10 @@ func TestInMemoryOutputReader(t *testing.T) {
},
},
}
or := NewInMemoryOutputReader(&flyteIdlCore.LiteralMap{Literals: lt}, &deckPath, nil)
or := NewInMemoryOutputReader(&flyteIdlCore.LiteralMap{Literals: lt}, &deckPath, &spanPath, nil)

assert.Equal(t, &deckPath, or.DeckPath)
assert.Equal(t, &spanPath, or.SpanPath)
ctx := context.TODO()

ok, err := or.IsError(ctx)
Expand Down
2 changes: 2 additions & 0 deletions go/tasks/pluginmachinery/ioutils/paths.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,8 @@ const (
// deckSuffix specifies that deck file are assumed to be written to this "file"/"suffix" under the given prefix
// The deck file has a format of HTML
deckSuffix = "deck.html"
// spanSuffix specifies that span file are assumed to be written to this "file"/"suffix" under the given prefix
spanSuffix = "span.pb"
// ErrorsSuffix specifies that the errors are written to this prefix/file under the given prefix. The Error File
// has a format of core.ErrorDocument
ErrorsSuffix = "error.pb"
Expand Down
8 changes: 8 additions & 0 deletions go/tasks/pluginmachinery/ioutils/remote_file_output_reader.go
Original file line number Diff line number Diff line change
Expand Up @@ -123,6 +123,14 @@ func (r RemoteFileOutputReader) DeckExists(ctx context.Context) (bool, error) {
return md.Exists(), nil
}

func (r RemoteFileOutputReader) SpanExists(ctx context.Context) (bool, error) {
md, err := r.store.Head(ctx, r.outPath.GetSpanPath())
if err != nil {
return false, err
}
return md.Exists(), nil
}

func NewRemoteFileOutputReader(_ context.Context, store storage.ComposedProtobufStore, outPaths io.OutputFilePaths, maxDatasetSize int64) RemoteFileOutputReader {
return RemoteFileOutputReader{
outPath: outPaths,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,9 @@ func TestReadOrigin(t *testing.T) {
opath := &pluginsIOMock.OutputFilePaths{}
opath.OnGetErrorPath().Return("")
deckPath := "deck.html"
spanPath := "span.pb"
opath.OnGetDeckPath().Return(storage.DataReference(deckPath))
opath.OnGetSpanPath().Return(storage.DataReference(spanPath))

t.Run("user", func(t *testing.T) {
errorDoc := &core.ErrorDocument{
Expand All @@ -51,7 +53,10 @@ func TestReadOrigin(t *testing.T) {
casted.Error = errorDoc.Error
}).Return(nil)

store.OnHead(ctx, storage.DataReference("deck.html")).Return(MemoryMetadata{
store.OnHead(ctx, storage.DataReference(deckPath)).Return(MemoryMetadata{
exists: true,
}, nil)
store.OnHead(ctx, storage.DataReference(spanPath)).Return(MemoryMetadata{
exists: true,
}, nil)

Expand All @@ -68,6 +73,9 @@ func TestReadOrigin(t *testing.T) {
exists, err := r.DeckExists(ctx)
assert.NoError(t, err)
assert.True(t, exists)
exists, err = r.SpanExists(ctx)
assert.NoError(t, err)
assert.True(t, exists)
})

t.Run("system", func(t *testing.T) {
Expand Down
4 changes: 4 additions & 0 deletions go/tasks/pluginmachinery/ioutils/remote_file_output_writer.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,10 @@ func (w RemoteFileOutputPaths) GetDeckPath() storage.DataReference {
return constructPath(w.store, w.outputPrefix, deckSuffix)
}

func (w RemoteFileOutputPaths) GetSpanPath() storage.DataReference {
return constructPath(w.store, w.outputPrefix, spanSuffix)
}

func (w RemoteFileOutputPaths) GetErrorPath() storage.DataReference {
return constructPath(w.store, w.outputPrefix, ErrorsSuffix)
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ func TestRemoteFileOutputWriter(t *testing.T) {
assert.Equal(t, constructPath(memStore, rawOutputPrefix, CheckpointPrefix), checkpointPath.GetCheckpointPrefix())
assert.Equal(t, constructPath(memStore, outputPrefix, OutputsSuffix), checkpointPath.GetOutputPath())
assert.Equal(t, constructPath(memStore, outputPrefix, deckSuffix), checkpointPath.GetDeckPath())
assert.Equal(t, constructPath(memStore, outputPrefix, spanSuffix), checkpointPath.GetSpanPath())
assert.Equal(t, constructPath(memStore, outputPrefix, ErrorsSuffix), checkpointPath.GetErrorPath())
assert.Equal(t, constructPath(memStore, outputPrefix, FuturesSuffix), checkpointPath.GetFuturesPath())
})
Expand All @@ -43,6 +44,7 @@ func TestRemoteFileOutputWriter(t *testing.T) {
assert.Equal(t, constructPath(memStore, rawOutputPrefix, CheckpointPrefix), p.GetCheckpointPrefix())
assert.Equal(t, constructPath(memStore, outputPrefix, OutputsSuffix), p.GetOutputPath())
assert.Equal(t, constructPath(memStore, outputPrefix, deckSuffix), p.GetDeckPath())
assert.Equal(t, constructPath(memStore, outputPrefix, spanSuffix), p.GetSpanPath())
assert.Equal(t, constructPath(memStore, outputPrefix, ErrorsSuffix), p.GetErrorPath())
})
}
2 changes: 1 addition & 1 deletion go/tasks/plugins/array/awsbatch/monitor.go
Original file line number Diff line number Diff line change
Expand Up @@ -107,7 +107,7 @@ func CheckSubTasksState(ctx context.Context, tCtx core.TaskExecutionContext, job
return nil, err
}

if err = ow.Put(ctx, ioutils.NewInMemoryOutputReader(nil, nil, &io.ExecutionError{
if err = ow.Put(ctx, ioutils.NewInMemoryOutputReader(nil, nil, nil, &io.ExecutionError{
ExecutionError: &core2.ExecutionError{
Code: "",
Message: subJob.Status.Message,
Expand Down
2 changes: 1 addition & 1 deletion go/tasks/plugins/array/k8s/management.go
Original file line number Diff line number Diff line change
Expand Up @@ -224,7 +224,7 @@ func LaunchAndCheckSubTasksState(ctx context.Context, tCtx core.TaskExecutionCon
return currentState, externalResources, err
}

if err = ow.Put(ctx, ioutils.NewInMemoryOutputReader(nil, nil, &io.ExecutionError{
if err = ow.Put(ctx, ioutils.NewInMemoryOutputReader(nil, nil, nil, &io.ExecutionError{
ExecutionError: phaseInfo.Err(),
IsRecoverable: phaseInfo.Phase() != core.PhasePermanentFailure,
})); err != nil {
Expand Down
4 changes: 2 additions & 2 deletions go/tasks/plugins/array/outputs.go
Original file line number Diff line number Diff line change
Expand Up @@ -107,7 +107,7 @@ func (w assembleOutputsWorker) Process(ctx context.Context, workItem workqueue.W
}

ow := ioutils.NewRemoteFileOutputWriter(ctx, i.dataStore, i.outputPaths)
if err = ow.Put(ctx, ioutils.NewInMemoryOutputReader(finalOutputs, nil, nil)); err != nil {
if err = ow.Put(ctx, ioutils.NewInMemoryOutputReader(finalOutputs, nil, nil, nil)); err != nil {
return workqueue.WorkStatusNotDone, err
}

Expand Down Expand Up @@ -313,7 +313,7 @@ func (a assembleErrorsWorker) Process(ctx context.Context, workItem workqueue.Wo
}

ow := ioutils.NewRemoteFileOutputWriter(ctx, w.dataStore, w.outputPaths)
if err = ow.Put(ctx, ioutils.NewInMemoryOutputReader(nil, nil, &io.ExecutionError{
if err = ow.Put(ctx, ioutils.NewInMemoryOutputReader(nil, nil, nil, &io.ExecutionError{
ExecutionError: &core.ExecutionError{
Code: "",
Message: msg,
Expand Down
2 changes: 1 addition & 1 deletion go/tasks/plugins/hive/execution_state.go
Original file line number Diff line number Diff line change
Expand Up @@ -530,7 +530,7 @@ func WriteOutputs(ctx context.Context, tCtx core.TaskExecutionContext, currentSt
},
},
},
}, nil, nil))
}, nil, nil, nil))
if err != nil {
logger.Errorf(ctx, "Error writing outputs file: [%s]", err)
return currentState, err
Expand Down
2 changes: 1 addition & 1 deletion go/tasks/plugins/k8s/sagemaker/builtin_training.go
Original file line number Diff line number Diff line change
Expand Up @@ -230,7 +230,7 @@ func (m awsSagemakerPlugin) getTaskPhaseForTrainingJob(
return pluginsCore.PhaseInfoUndefined, pluginErrors.Wrapf(pluginErrors.BadTaskSpecification, err, "failed to create outputs for the task")
}
// Instantiate a output reader with the literal map, and write the output to the remote location referred to by the OutputWriter
if err := pluginContext.OutputWriter().Put(ctx, ioutils.NewInMemoryOutputReader(outputLiteralMap, nil, nil)); err != nil {
if err := pluginContext.OutputWriter().Put(ctx, ioutils.NewInMemoryOutputReader(outputLiteralMap, nil, nil, nil)); err != nil {
return pluginsCore.PhaseInfoUndefined, pluginErrors.Wrapf(pluginErrors.BadTaskSpecification, err, "Unable to write output to the remote location")
}
logger.Debugf(ctx, "Successfully produced and returned outputs")
Expand Down
2 changes: 1 addition & 1 deletion go/tasks/plugins/k8s/sagemaker/hyperparameter_tuning.go
Original file line number Diff line number Diff line change
Expand Up @@ -257,7 +257,7 @@ func (m awsSagemakerPlugin) getTaskPhaseForHyperparameterTuningJob(
logger.Errorf(ctx, "Failed to create outputs, err: %s", err)
return pluginsCore.PhaseInfoUndefined, pluginErrors.Wrapf(pluginErrors.BadTaskSpecification, err, "failed to create outputs for the task")
}
if err := pluginContext.OutputWriter().Put(ctx, ioutils.NewInMemoryOutputReader(out, nil, nil)); err != nil {
if err := pluginContext.OutputWriter().Put(ctx, ioutils.NewInMemoryOutputReader(out, nil, nil, nil)); err != nil {
return pluginsCore.PhaseInfoUndefined, err
}
logger.Debugf(ctx, "Successfully produced and returned outputs")
Expand Down
2 changes: 1 addition & 1 deletion go/tasks/plugins/presto/execution_state.go
Original file line number Diff line number Diff line change
Expand Up @@ -462,7 +462,7 @@ func writeOutput(ctx context.Context, tCtx core.TaskExecutionContext, externalLo
},
},
},
}, nil, nil))
}, nil, nil, nil))
}

// The 'PhaseInfoRunning' occurs 15 times (3 for each of the 5 Presto queries that get run for every Presto task) which
Expand Down
2 changes: 1 addition & 1 deletion go/tasks/plugins/webapi/agent/plugin.go
Original file line number Diff line number Diff line change
Expand Up @@ -134,7 +134,7 @@ func (p Plugin) Status(ctx context.Context, taskCtx webapi.StatusContext) (phase
return core.PhaseInfoRetryableFailure(pluginErrors.TaskFailedWithError, "failed to run the job", taskInfo), nil
case admin.State_SUCCEEDED:
if resource.Outputs != nil {
err := taskCtx.OutputWriter().Put(ctx, ioutils.NewInMemoryOutputReader(resource.Outputs, nil, nil))
err := taskCtx.OutputWriter().Put(ctx, ioutils.NewInMemoryOutputReader(resource.Outputs, nil, nil, nil))
if err != nil {
return core.PhaseInfoUndefined, err
}
Expand Down
2 changes: 1 addition & 1 deletion go/tasks/plugins/webapi/athena/utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ func writeOutput(ctx context.Context, tCtx webapi.StatusContext, externalLocatio
},
},
},
}, nil, nil))
}, nil, nil, nil))
}

type QueryInfo struct {
Expand Down
2 changes: 1 addition & 1 deletion go/tasks/plugins/webapi/athena/utils_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -111,7 +111,7 @@ func Test_writeOutput(t *testing.T) {
},
},
},
}, nil, nil)).Return(nil)
}, nil, nil, nil)).Return(nil)
statusContext.OnOutputWriter().Return(ow)

err = writeOutput(context.Background(), statusContext, externalLocation)
Expand Down
5 changes: 4 additions & 1 deletion go/tasks/plugins/webapi/bigquery/config_flags.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Loading