Skip to content
This repository has been archived by the owner on Oct 9, 2023. It is now read-only.

Add GetDeckPath to OutputReader #268

Merged
merged 8 commits into from
Jun 7, 2022
Merged
Show file tree
Hide file tree
Changes from 6 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions go/tasks/pluginmachinery/core/template/template_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,10 @@ type dummyOutputPaths struct {
checkpointPath storage.DataReference
}

// GetDeckPath is not expected to be invoked on this test stub; it panics so
// that any unexpected call is surfaced immediately.
func (dummyOutputPaths) GetDeckPath() storage.DataReference {
	panic("should not be called")
}

// GetPreviousCheckpointsPrefix returns the prevCheckpointPath value stored on
// the stub.
func (d dummyOutputPaths) GetPreviousCheckpointsPrefix() storage.DataReference {
	prefix := d.prevCheckpointPath
	return prefix
}
Expand Down
4 changes: 4 additions & 0 deletions go/tasks/pluginmachinery/io/iface.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,8 @@ type OutputReader interface {
Exists(ctx context.Context) (bool, error)
// Read returns the output -> *core.LiteralMap (nil if void), *ExecutionError if user error when reading the output and error to indicate system problems
Read(ctx context.Context) (*core.LiteralMap, *ExecutionError, error)
// GetDeckPath returns a fully qualified path (URN) of the deck file.
GetDeckPath() *storage.DataReference
}

// CheckpointPaths provides the paths / keys to input Checkpoints directory and an output checkpoints directory.
Expand Down Expand Up @@ -77,6 +79,8 @@ type OutputFilePaths interface {
GetOutputPrefixPath() storage.DataReference
// GetOutputPath returns a fully qualified path (URN) to where the framework expects the output to exist in the configured storage backend
GetOutputPath() storage.DataReference
// GetDeckPath returns a fully qualified path (URN) to where the framework expects the deck.html to exist in the configured storage backend
GetDeckPath() storage.DataReference
// GetErrorPath returns a fully qualified path (URN) where the error information should be placed as a protobuf core.ErrorDocument. It is not directly
// used by the framework, but could be used in the future
GetErrorPath() storage.DataReference
Expand Down
32 changes: 32 additions & 0 deletions go/tasks/pluginmachinery/io/mocks/output_file_paths.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

36 changes: 36 additions & 0 deletions go/tasks/pluginmachinery/io/mocks/output_reader.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

32 changes: 32 additions & 0 deletions go/tasks/pluginmachinery/io/mocks/output_writer.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

10 changes: 9 additions & 1 deletion go/tasks/pluginmachinery/ioutils/in_memory_output_reader.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,12 +4,15 @@ import (
"context"
"fmt"

"github.com/flyteorg/flytestdlib/storage"

"github.com/flyteorg/flyteidl/gen/pb-go/flyteidl/core"
"github.com/flyteorg/flyteplugins/go/tasks/pluginmachinery/io"
)

// InMemoryOutputReader holds task outputs, an optional deck path and an
// optional execution error directly in memory.
type InMemoryOutputReader struct {
// literals is the output map handed back by Read.
literals *core.LiteralMap
// DeckPath is the reference returned by GetDeckPath; callers may leave it nil.
DeckPath *storage.DataReference
// err is the execution error handed back by Read alongside literals.
err *io.ExecutionError
}

Expand Down Expand Up @@ -40,9 +43,14 @@ func (r InMemoryOutputReader) Read(ctx context.Context) (*core.LiteralMap, *io.E
return r.literals, r.err, nil
}

func NewInMemoryOutputReader(literals *core.LiteralMap, err *io.ExecutionError) InMemoryOutputReader {
// GetDeckPath returns the reader's DeckPath field, which is nil when no deck
// path was supplied.
func (r InMemoryOutputReader) GetDeckPath() *storage.DataReference {
	deck := r.DeckPath
	return deck
}

// NewInMemoryOutputReader builds an InMemoryOutputReader from values that are
// already available in memory.
//
// literals is the output map served by Read, deckPath is the reference served
// by GetDeckPath, and err is the execution error served by Read. Any of them
// may be nil; the arguments are stored as-is without copying.
func NewInMemoryOutputReader(literals *core.LiteralMap, deckPath *storage.DataReference, err *io.ExecutionError) InMemoryOutputReader {
	return InMemoryOutputReader{
		literals: literals,
		DeckPath: deckPath,
		err:      err,
	}
}
44 changes: 44 additions & 0 deletions go/tasks/pluginmachinery/ioutils/in_memory_output_reader_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
package ioutils

import (
"context"
"testing"

flyteIdlCore "github.com/flyteorg/flyteidl/gen/pb-go/flyteidl/core"
"github.com/flyteorg/flytestdlib/storage"
"github.com/stretchr/testify/assert"
)

// TestInMemoryOutputReader checks that a reader built from an in-memory
// literal map and deck path reports those values back and identifies itself
// as an existing, non-file, non-error output.
func TestInMemoryOutputReader(t *testing.T) {
	ctx := context.TODO()
	deck := storage.DataReference("s3://bucket/key")

	// A single integer output named "results".
	three := &flyteIdlCore.Primitive{Value: &flyteIdlCore.Primitive_Integer{Integer: 3}}
	scalar := &flyteIdlCore.Scalar{Value: &flyteIdlCore.Scalar_Primitive{Primitive: three}}
	expected := map[string]*flyteIdlCore.Literal{
		"results": {Value: &flyteIdlCore.Literal_Scalar{Scalar: scalar}},
	}

	reader := NewInMemoryOutputReader(&flyteIdlCore.LiteralMap{Literals: expected}, &deck, nil)

	assert.Equal(t, &deck, reader.GetDeckPath())

	isErr, err := reader.IsError(ctx)
	assert.False(t, isErr)
	assert.NoError(t, err)

	assert.False(t, reader.IsFile(ctx))

	exists, err := reader.Exists(ctx)
	assert.True(t, exists)
	assert.NoError(t, err)

	got, execErr, err := reader.Read(ctx)
	assert.Equal(t, expected, got.Literals)
	assert.Nil(t, execErr)
	assert.NoError(t, err)
}
3 changes: 3 additions & 0 deletions go/tasks/pluginmachinery/ioutils/paths.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,9 @@ const (
// OutputsSuffix specifies that outputs are assumed to be written to this "file"/"suffix" under the given prefix
// The outputs file has a format of core.LiteralMap
OutputsSuffix = "outputs.pb"
// deckSuffix specifies that the deck file is assumed to be written to this "file"/"suffix" under the given prefix.
// The deck file is in HTML format.
deckSuffix = "deck.html"
// ErrorsSuffix specifies that the errors are written to this prefix/file under the given prefix. The Error File
// has a format of core.ErrorDocument
ErrorsSuffix = "error.pb"
Expand Down
5 changes: 5 additions & 0 deletions go/tasks/pluginmachinery/ioutils/remote_file_output_reader.go
Original file line number Diff line number Diff line change
Expand Up @@ -114,6 +114,11 @@ func (r RemoteFileOutputReader) IsFile(ctx context.Context) bool {
return true
}

// GetDeckPath asks the underlying output paths for the deck path and returns
// a pointer to a local copy of it, so the result is never nil.
func (r RemoteFileOutputReader) GetDeckPath() *storage.DataReference {
	deckRef := r.outPath.GetDeckPath()
	return &deckRef
}

func NewRemoteFileOutputReader(_ context.Context, store storage.ComposedProtobufStore, outPaths io.OutputFilePaths, maxDatasetSize int64) RemoteFileOutputReader {
return RemoteFileOutputReader{
outPath: outPaths,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@ import (
"context"
"testing"

"github.com/flyteorg/flytestdlib/storage"

"github.com/flyteorg/flyteidl/gen/pb-go/flyteidl/core"
pluginsIOMock "github.com/flyteorg/flyteplugins/go/tasks/pluginmachinery/io/mocks"
storageMocks "github.com/flyteorg/flytestdlib/storage/mocks"
Expand All @@ -16,6 +18,8 @@ func TestReadOrigin(t *testing.T) {

opath := &pluginsIOMock.OutputFilePaths{}
opath.OnGetErrorPath().Return("")
deckPath := "deck.html"
opath.OnGetDeckPath().Return(storage.DataReference(deckPath))

t.Run("user", func(t *testing.T) {
errorDoc := &core.ErrorDocument{
Expand Down Expand Up @@ -44,6 +48,7 @@ func TestReadOrigin(t *testing.T) {
assert.NoError(t, err)
assert.Equal(t, core.ExecutionError_USER, ee.Kind)
assert.False(t, ee.IsRecoverable)
assert.Equal(t, deckPath, r.GetDeckPath().String())
})

t.Run("system", func(t *testing.T) {
Expand Down
4 changes: 4 additions & 0 deletions go/tasks/pluginmachinery/ioutils/remote_file_output_writer.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,10 @@ func (w RemoteFileOutputPaths) GetOutputPath() storage.DataReference {
return constructPath(w.store, w.outputPrefix, OutputsSuffix)
}

// GetDeckPath returns the reference produced by constructPath for the store,
// the output prefix and deckSuffix.
func (w RemoteFileOutputPaths) GetDeckPath() storage.DataReference {
	deckRef := constructPath(w.store, w.outputPrefix, deckSuffix)
	return deckRef
}

// GetErrorPath returns the reference produced by constructPath for the store,
// the output prefix and ErrorsSuffix.
func (w RemoteFileOutputPaths) GetErrorPath() storage.DataReference {
	errRef := constructPath(w.store, w.outputPrefix, ErrorsSuffix)
	return errRef
}
Expand Down
2 changes: 1 addition & 1 deletion go/tasks/plugins/array/awsbatch/monitor.go
Original file line number Diff line number Diff line change
Expand Up @@ -94,7 +94,7 @@ func CheckSubTasksState(ctx context.Context, taskMeta core.TaskExecutionMetadata
return nil, err
}

if err = ow.Put(ctx, ioutils.NewInMemoryOutputReader(nil, &io.ExecutionError{
if err = ow.Put(ctx, ioutils.NewInMemoryOutputReader(nil, nil, &io.ExecutionError{
ExecutionError: &core2.ExecutionError{
Code: "",
Message: subJob.Status.Message,
Expand Down
2 changes: 1 addition & 1 deletion go/tasks/plugins/array/k8s/management.go
Original file line number Diff line number Diff line change
Expand Up @@ -230,7 +230,7 @@ func LaunchAndCheckSubTasksState(ctx context.Context, tCtx core.TaskExecutionCon
return currentState, externalResources, err
}

if err = ow.Put(ctx, ioutils.NewInMemoryOutputReader(nil, &io.ExecutionError{
if err = ow.Put(ctx, ioutils.NewInMemoryOutputReader(nil, nil, &io.ExecutionError{
ExecutionError: phaseInfo.Err(),
IsRecoverable: phaseInfo.Phase() != core.PhasePermanentFailure,
})); err != nil {
Expand Down
4 changes: 2 additions & 2 deletions go/tasks/plugins/array/outputs.go
Original file line number Diff line number Diff line change
Expand Up @@ -107,7 +107,7 @@ func (w assembleOutputsWorker) Process(ctx context.Context, workItem workqueue.W
}

ow := ioutils.NewRemoteFileOutputWriter(ctx, i.dataStore, i.outputPaths)
if err = ow.Put(ctx, ioutils.NewInMemoryOutputReader(finalOutputs, nil)); err != nil {
if err = ow.Put(ctx, ioutils.NewInMemoryOutputReader(finalOutputs, nil, nil)); err != nil {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We might want to think about (maybe in a separate issue) how to aggregate these HTML files, if at all, in the array-task case.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

okay, will create a new issue for it.

return workqueue.WorkStatusNotDone, err
}

Expand Down Expand Up @@ -313,7 +313,7 @@ func (a assembleErrorsWorker) Process(ctx context.Context, workItem workqueue.Wo
}

ow := ioutils.NewRemoteFileOutputWriter(ctx, w.dataStore, w.outputPaths)
if err = ow.Put(ctx, ioutils.NewInMemoryOutputReader(nil, &io.ExecutionError{
if err = ow.Put(ctx, ioutils.NewInMemoryOutputReader(nil, nil, &io.ExecutionError{
ExecutionError: &core.ExecutionError{
Code: "",
Message: msg,
Expand Down
2 changes: 1 addition & 1 deletion go/tasks/plugins/hive/execution_state.go
Original file line number Diff line number Diff line change
Expand Up @@ -530,7 +530,7 @@ func WriteOutputs(ctx context.Context, tCtx core.TaskExecutionContext, currentSt
},
},
},
}, nil))
}, nil, nil))
if err != nil {
logger.Errorf(ctx, "Error writing outputs file: [%s]", err)
return currentState, err
Expand Down
2 changes: 1 addition & 1 deletion go/tasks/plugins/k8s/sagemaker/builtin_training.go
Original file line number Diff line number Diff line change
Expand Up @@ -230,7 +230,7 @@ func (m awsSagemakerPlugin) getTaskPhaseForTrainingJob(
return pluginsCore.PhaseInfoUndefined, pluginErrors.Wrapf(pluginErrors.BadTaskSpecification, err, "failed to create outputs for the task")
}
// Instantiate an output reader with the literal map, and write the output to the remote location referred to by the OutputWriter
if err := pluginContext.OutputWriter().Put(ctx, ioutils.NewInMemoryOutputReader(outputLiteralMap, nil)); err != nil {
if err := pluginContext.OutputWriter().Put(ctx, ioutils.NewInMemoryOutputReader(outputLiteralMap, nil, nil)); err != nil {
return pluginsCore.PhaseInfoUndefined, pluginErrors.Wrapf(pluginErrors.BadTaskSpecification, err, "Unable to write output to the remote location")
}
logger.Debugf(ctx, "Successfully produced and returned outputs")
Expand Down
2 changes: 1 addition & 1 deletion go/tasks/plugins/k8s/sagemaker/hyperparameter_tuning.go
Original file line number Diff line number Diff line change
Expand Up @@ -257,7 +257,7 @@ func (m awsSagemakerPlugin) getTaskPhaseForHyperparameterTuningJob(
logger.Errorf(ctx, "Failed to create outputs, err: %s", err)
return pluginsCore.PhaseInfoUndefined, pluginErrors.Wrapf(pluginErrors.BadTaskSpecification, err, "failed to create outputs for the task")
}
if err := pluginContext.OutputWriter().Put(ctx, ioutils.NewInMemoryOutputReader(out, nil)); err != nil {
if err := pluginContext.OutputWriter().Put(ctx, ioutils.NewInMemoryOutputReader(out, nil, nil)); err != nil {
return pluginsCore.PhaseInfoUndefined, err
}
logger.Debugf(ctx, "Successfully produced and returned outputs")
Expand Down
2 changes: 1 addition & 1 deletion go/tasks/plugins/presto/execution_state.go
Original file line number Diff line number Diff line change
Expand Up @@ -462,7 +462,7 @@ func writeOutput(ctx context.Context, tCtx core.TaskExecutionContext, externalLo
},
},
},
}, nil))
}, nil, nil))
}

// The 'PhaseInfoRunning' occurs 15 times (3 for each of the 5 Presto queries that get run for every Presto task) which
Expand Down
2 changes: 1 addition & 1 deletion go/tasks/plugins/webapi/athena/utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ func writeOutput(ctx context.Context, tCtx webapi.StatusContext, externalLocatio
},
},
},
}, nil))
}, nil, nil))
}

type QueryInfo struct {
Expand Down
2 changes: 1 addition & 1 deletion go/tasks/plugins/webapi/athena/utils_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -111,7 +111,7 @@ func Test_writeOutput(t *testing.T) {
},
},
},
}, nil)).Return(nil)
}, nil, nil)).Return(nil)
statusContext.OnOutputWriter().Return(ow)

err = writeOutput(context.Background(), statusContext, externalLocation)
Expand Down
2 changes: 1 addition & 1 deletion go/tasks/plugins/webapi/bigquery/plugin.go
Original file line number Diff line number Diff line change
Expand Up @@ -325,7 +325,7 @@ func writeOutput(ctx context.Context, tCtx webapi.StatusContext, OutputLocation
},
},
},
}, nil))
}, nil, nil))
}

func handleCreateError(createError *googleapi.Error, taskInfo *core.TaskInfo) core.PhaseInfo {
Expand Down