feat: Catalog Information including Caching published to FlyteAdmin (f…
Ketan Umare authored Jul 29, 2020
1 parent 1390758 commit 226d2ff
Showing 15 changed files with 651 additions and 194 deletions.
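
At a high level, this commit threads the catalog cache outcome (hit, miss, populated, disabled) and the catalog artifact metadata from the datacatalog client up through the node handlers so it can be published to FlyteAdmin as part of node events. The standalone sketch below illustrates that data flow with simplified stand-in types; CacheStatus, Status, and TaskNodeMetadata here are illustrative only and are not the real flyteidl/flyteplugins definitions.

package main

import "fmt"

// CacheStatus is a stand-in for core.CatalogCacheStatus.
type CacheStatus string

const (
	CacheDisabled  CacheStatus = "CACHE_DISABLED"
	CacheMiss      CacheStatus = "CACHE_MISS"
	CacheHit       CacheStatus = "CACHE_HIT"
	CachePopulated CacheStatus = "CACHE_POPULATED"
)

// Status is a stand-in for catalog.Status: the cache outcome plus the
// catalog metadata (dataset / artifact tag) that produced it.
type Status struct {
	Cache       CacheStatus
	ArtifactTag string
}

// TaskNodeMetadata is a stand-in for event.TaskNodeMetadata, the piece that
// now rides on node events to FlyteAdmin.
type TaskNodeMetadata struct {
	CacheStatus CacheStatus
	CatalogKey  string
}

func main() {
	// A catalog lookup reports its outcome alongside any outputs it found...
	status := Status{Cache: CacheHit, ArtifactTag: "flyte_cached-abc123"}

	// ...and the node handler copies that outcome into the event metadata.
	md := TaskNodeMetadata{CacheStatus: status.Cache, CatalogKey: status.ArtifactTag}
	fmt.Printf("publishing task node metadata: %+v\n", md)
}
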
go.mod (4 changes: 2 additions & 2 deletions)
@@ -23,8 +23,8 @@ require (
github.com/imdario/mergo v0.3.8 // indirect
github.com/jmespath/go-jmespath v0.3.0 // indirect
github.com/lyft/datacatalog v0.2.1
github.com/lyft/flyteidl v0.17.34
github.com/lyft/flyteplugins v0.3.35
github.com/lyft/flyteidl v0.17.38
github.com/lyft/flyteplugins v0.3.38
github.com/lyft/flytestdlib v0.3.9
github.com/magiconair/properties v1.8.1
github.com/mattn/go-colorable v0.1.6 // indirect
go.sum (14 changes: 14 additions & 0 deletions)
@@ -394,9 +394,23 @@ github.com/lyft/flyteidl v0.17.0/go.mod h1:/zQXxuHO11u/saxTTZc8oYExIGEShXB+xCB1/
github.com/lyft/flyteidl v0.17.32/go.mod h1:/zQXxuHO11u/saxTTZc8oYExIGEShXB+xCB1/F1Cu20=
github.com/lyft/flyteidl v0.17.34 h1:8ERT/8vY40dOPPJrdD8ossBb30WkvzUx/IAFMR/7+9U=
github.com/lyft/flyteidl v0.17.34/go.mod h1:/zQXxuHO11u/saxTTZc8oYExIGEShXB+xCB1/F1Cu20=
github.com/lyft/flyteidl v0.17.35-0.20200707003420-954dedb491fa/go.mod h1:/zQXxuHO11u/saxTTZc8oYExIGEShXB+xCB1/F1Cu20=
github.com/lyft/flyteidl v0.17.37-0.20200721225522-b4751137e5ce h1:0yFcmwunllOOdjW8d7+BA6fwQzNYbzrefbbh3dfTHcg=
github.com/lyft/flyteidl v0.17.37-0.20200721225522-b4751137e5ce/go.mod h1:/zQXxuHO11u/saxTTZc8oYExIGEShXB+xCB1/F1Cu20=
github.com/lyft/flyteidl v0.17.38 h1:fAbIzyRvBvMMe5wC7qEjD2ehPlPhQCFu5G4eskPezcg=
github.com/lyft/flyteidl v0.17.38/go.mod h1:/zQXxuHO11u/saxTTZc8oYExIGEShXB+xCB1/F1Cu20=
github.com/lyft/flyteplugins v0.3.34/go.mod h1:HHO6KC/2z77n9o9KM697YvSP85IWDe6jl6tAIrMLqWU=
github.com/lyft/flyteplugins v0.3.35 h1:9s2BrJ82RoTJa1Cy02vqQy+ajxS+d4MQAkuUFoaiCuQ=
github.com/lyft/flyteplugins v0.3.35/go.mod h1:Dk9rnPCbgR7rC9dNM49260TQ51TvRsljDBJ6uBjZ9ys=
github.com/lyft/flyteplugins v0.3.38-0.20200724213302-8d841a7bfa4b h1:Pr2E7TQjFeZHQyE5WhEioCH5NLg80H290pNg+1lLmq4=
github.com/lyft/flyteplugins v0.3.38-0.20200724213302-8d841a7bfa4b/go.mod h1:RPAS1gST3UAp+X5i2sMavkQBAMSXBoGbh2THiJoJCQc=
github.com/lyft/flyteplugins v0.3.38-0.20200724215013-9d75d41ef924 h1:wVBgHarQ6aUIyPLyuQWN+kRIz7SW07SXCCV06fbIJb8=
github.com/lyft/flyteplugins v0.3.38-0.20200724215013-9d75d41ef924/go.mod h1:RPAS1gST3UAp+X5i2sMavkQBAMSXBoGbh2THiJoJCQc=
github.com/lyft/flyteplugins v0.3.38 h1:3yDKZ9raVV6oHBY+enRhw0CbdL1gD3KEbqEaDUlFbkU=
github.com/lyft/flyteplugins v0.3.38/go.mod h1:Vb5ZJ9uq1u3tQbAGGHOFsrj08pvS+ukpMtLAl99HAEI=
github.com/lyft/flytestdlib v0.3.0/go.mod h1:LJPPJlkFj+wwVWMrQT3K5JZgNhZi2mULsCG4ZYhinhU=
github.com/lyft/flytestdlib v0.3.3/go.mod h1:LJPPJlkFj+wwVWMrQT3K5JZgNhZi2mULsCG4ZYhinhU=
github.com/lyft/flytestdlib v0.3.7/go.mod h1:LJPPJlkFj+wwVWMrQT3K5JZgNhZi2mULsCG4ZYhinhU=
github.com/lyft/flytestdlib v0.3.9 h1:NaKp9xkeWWwhVvqTOcR/FqlASy1N2gu/kN7PVe4S7YI=
github.com/lyft/flytestdlib v0.3.9/go.mod h1:LJPPJlkFj+wwVWMrQT3K5JZgNhZi2mULsCG4ZYhinhU=
github.com/lyft/spark-on-k8s-operator v0.1.3 h1:rmke8lR2Oy8mvKXRhloKuEu7fgGuXepDxiBNiorVUFI=
pkg/controller/nodes/dynamic/handler.go (6 changes: 4 additions & 2 deletions)
@@ -4,6 +4,7 @@ import (
"context"
"time"

"github.com/lyft/flyteidl/gen/pb-go/flyteidl/event"
"github.com/lyft/flyteplugins/go/tasks/pluginmachinery/catalog"
pluginCore "github.com/lyft/flyteplugins/go/tasks/pluginmachinery/core"
"github.com/lyft/flyteplugins/go/tasks/pluginmachinery/io"
@@ -33,7 +34,7 @@ const dynamicNodeID = "dynamic-node"
type TaskNodeHandler interface {
handler.Node
ValidateOutputAndCacheAdd(ctx context.Context, nodeID v1alpha1.NodeID, i io.InputReader, r io.OutputReader, outputCommitter io.OutputWriter,
tr pluginCore.TaskReader, m catalog.Metadata) (*io.ExecutionError, error)
tr pluginCore.TaskReader, m catalog.Metadata) (catalog.Status, *io.ExecutionError, error)
}

type metrics struct {
@@ -115,7 +116,7 @@ func (d dynamicNodeTaskNodeHandler) handleDynamicSubNodes(ctx context.Context, n
outputPaths := ioutils.NewRemoteFileOutputPaths(ctx, nCtx.DataStore(), nCtx.NodeStatus().GetOutputDir(), nil)
execID := task.GetTaskExecutionIdentifier(nCtx)
outputReader := ioutils.NewRemoteFileOutputReader(ctx, nCtx.DataStore(), outputPaths, nCtx.MaxDatasetSizeBytes())
ee, err := d.TaskNodeHandler.ValidateOutputAndCacheAdd(ctx, nCtx.NodeID(), nCtx.InputReader(), outputReader, nil, nCtx.TaskReader(), catalog.Metadata{
status, ee, err := d.TaskNodeHandler.ValidateOutputAndCacheAdd(ctx, nCtx.NodeID(), nCtx.InputReader(), outputReader, nil, nCtx.TaskReader(), catalog.Metadata{
TaskExecutionIdentifier: execID,
})

@@ -130,6 +131,7 @@ func (d dynamicNodeTaskNodeHandler) handleDynamicSubNodes(ctx context.Context, n

return trns.WithInfo(handler.PhaseInfoFailureErr(ee.ExecutionError, trns.Info().GetInfo())), handler.DynamicNodeState{Phase: v1alpha1.DynamicNodePhaseFailing, Reason: ee.ExecutionError.String()}, nil
}
trns.WithInfo(trns.Info().WithInfo(&handler.ExecutionInfo{TaskNodeInfo: &handler.TaskNodeInfo{TaskNodeMetadata: &event.TaskNodeMetadata{CacheStatus: status.GetCacheStatus(), CatalogKey: status.GetMetadata()}}}))
}

return trns, newState, nil
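
As a rough usage sketch, a caller of the widened ValidateOutputAndCacheAdd signature now distinguishes a system error, a user-facing execution error, and a successful validation whose cache status must be propagated into the event metadata. The types below are simplified stand-ins, not the actual flytepropeller handler interfaces.

package main

import (
	"context"
	"errors"
	"fmt"
)

// Stand-ins for catalog.Status and io.ExecutionError; not the real types.
type Status struct{ CacheStatus string }
type ExecutionError struct{ Message string }

// validateOutputAndCacheAdd mimics the shape of the new three-value return.
func validateOutputAndCacheAdd(ctx context.Context) (Status, *ExecutionError, error) {
	return Status{CacheStatus: "CACHE_POPULATED"}, nil, nil
}

func handle(ctx context.Context) error {
	status, execErr, err := validateOutputAndCacheAdd(ctx)
	if err != nil {
		// System error: surface upstream so the node can be retried.
		return err
	}
	if execErr != nil {
		// Execution error: fail the node; there is no cache metadata to publish.
		return errors.New(execErr.Message)
	}
	// Success: attach the cache status to the node event metadata.
	fmt.Println("cache status for event:", status.CacheStatus)
	return nil
}

func main() { _ = handle(context.Background()) }
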
pkg/controller/nodes/dynamic/handler_test.go (5 changes: 3 additions & 2 deletions)
@@ -5,6 +5,7 @@ import (
"fmt"
"testing"

"github.com/lyft/flyteplugins/go/tasks/pluginmachinery/catalog"
"github.com/lyft/flytestdlib/contextutils"
"github.com/lyft/flytestdlib/promutils/labeled"

@@ -542,9 +543,9 @@ func Test_dynamicNodeHandler_Handle_SubTask(t *testing.T) {
mockLPLauncher := &lpMocks.Reader{}
h := &mocks.TaskNodeHandler{}
if tt.args.validErr != nil {
h.OnValidateOutputAndCacheAddMatch(mock.Anything, mock.Anything, mock.Anything, mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return(tt.args.validErr, nil)
h.OnValidateOutputAndCacheAddMatch(mock.Anything, mock.Anything, mock.Anything, mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return(catalog.NewStatus(core.CatalogCacheStatus_CACHE_DISABLED, nil), tt.args.validErr, nil)
} else {
h.OnValidateOutputAndCacheAddMatch(mock.Anything, mock.Anything, mock.Anything, mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return(nil, nil)
h.OnValidateOutputAndCacheAddMatch(mock.Anything, mock.Anything, mock.Anything, mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return(catalog.NewStatus(core.CatalogCacheStatus_CACHE_HIT, &core.CatalogMetadata{ArtifactTag: &core.CatalogArtifactTag{Name: "name", ArtifactId: "id"}}), nil, nil)
}
n := &executorMocks.Node{}
if tt.args.isErr {
pkg/controller/nodes/dynamic/mocks/task_node_handler.go (31 changes: 19 additions & 12 deletions)

(Generated mock file; diff not rendered by default.)

pkg/controller/nodes/handler/transition_info.go (15 changes: 13 additions & 2 deletions)
@@ -4,6 +4,7 @@ import (
"time"

"github.com/lyft/flyteidl/gen/pb-go/flyteidl/core"
"github.com/lyft/flyteidl/gen/pb-go/flyteidl/event"
"github.com/lyft/flytestdlib/storage"
)

@@ -41,9 +42,9 @@ type WorkflowNodeInfo struct {
type BranchNodeInfo struct {
}

// Carries any information that should be sent as part of NodeEvents
type TaskNodeInfo struct {
CacheHit bool
// TaskPhase etc
TaskNodeMetadata *event.TaskNodeMetadata
}

type OutputInfo struct {
@@ -86,6 +87,16 @@ func (p PhaseInfo) GetReason() string {
return p.reason
}

func (p PhaseInfo) WithInfo(i *ExecutionInfo) PhaseInfo {
return PhaseInfo{
p: p.p,
occurredAt: p.occurredAt,
err: p.err,
info: i,
reason: p.reason,
}
}

var PhaseInfoUndefined = PhaseInfo{p: EPhaseUndefined}

func phaseInfo(p EPhase, err *core.ExecutionError, info *ExecutionInfo, reason string) PhaseInfo {
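
The new WithInfo is a copy-on-write helper: PhaseInfo is treated as an immutable value, so attaching ExecutionInfo means building a new PhaseInfo rather than mutating the receiver, and the caller must use the returned value. A minimal sketch of that pattern, using a simplified stand-in type rather than the real handler.PhaseInfo:

package main

import "fmt"

// phaseInfo is a simplified stand-in for handler.PhaseInfo.
type phaseInfo struct {
	phase  string
	reason string
	info   *string
}

// WithInfo returns a copy with the info field swapped; the receiver is unchanged.
func (p phaseInfo) WithInfo(i *string) phaseInfo {
	return phaseInfo{phase: p.phase, reason: p.reason, info: i}
}

func main() {
	original := phaseInfo{phase: "SUCCEEDED", reason: "done"}
	detail := "cache status: CACHE_HIT"

	// The return value must be captured; calling WithInfo without using the
	// result leaves the original untouched.
	updated := original.WithInfo(&detail)

	fmt.Println(original.info == nil) // true
	fmt.Println(*updated.info)        // cache status: CACHE_HIT
}
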
pkg/controller/nodes/task/catalog/datacatalog/datacatalog.go (59 changes: 26 additions & 33 deletions)
@@ -22,11 +22,6 @@ import (
"k8s.io/apimachinery/pkg/util/uuid"
)

const (
taskVersionKey = "task-version"
wfExecNameKey = "execution-name"
)

var (
_ catalog.Client = &CatalogClient{}
)
@@ -95,43 +90,50 @@ func (m *CatalogClient) GetArtifactByTag(ctx context.Context, tagName string, da
// - Verify there is a Dataset created for the Task
// - Lookup the Artifact that is tagged with the hash of the input values
// - The artifactData contains the literal values that serve as the task outputs
func (m *CatalogClient) Get(ctx context.Context, key catalog.Key) (io.OutputReader, error) {
func (m *CatalogClient) Get(ctx context.Context, key catalog.Key) (catalog.Entry, error) {
dataset, err := m.GetDataset(ctx, key)
if err != nil {
logger.Debugf(ctx, "DataCatalog failed to get dataset for ID %s, err: %+v", key.Identifier.String(), err)
return nil, errors.Wrapf(err, "DataCatalog failed to get dataset for ID %s", key.Identifier.String())
return catalog.Entry{}, errors.Wrapf(err, "DataCatalog failed to get dataset for ID %s", key.Identifier.String())
}

inputs := &core.LiteralMap{}
if key.TypedInterface.Inputs != nil {
retInputs, err := key.InputReader.Get(ctx)
if err != nil {
return nil, errors.Wrap(err, "failed to read inputs when trying to query catalog")
return catalog.Entry{}, errors.Wrap(err, "failed to read inputs when trying to query catalog")
}
inputs = retInputs
}

tag, err := GenerateArtifactTagName(ctx, inputs)
if err != nil {
logger.Errorf(ctx, "DataCatalog failed to generate tag for inputs %+v, err: %+v", inputs, err)
return nil, err
return catalog.Entry{}, err
}

artifact, err := m.GetArtifactByTag(ctx, tag, dataset)
if err != nil {
logger.Debugf(ctx, "DataCatalog failed to get artifact by tag %+v, err: %+v", tag, err)
return nil, err
return catalog.Entry{}, err
}
logger.Debugf(ctx, "Artifact found %v from tag %v", artifact, tag)

var relevantTag *datacatalog.Tag
if len(artifact.GetTags()) > 0 {
// TODO should we look through all the tags to find the relevant one?
relevantTag = artifact.GetTags()[0]
}
md := EventCatalogMetadata(dataset.GetId(), relevantTag, GetSourceFromMetadata(dataset.GetMetadata(), artifact.GetMetadata(), key.Identifier))

outputs, err := GenerateTaskOutputsFromArtifact(key.Identifier, key.TypedInterface, artifact)
if err != nil {
logger.Errorf(ctx, "DataCatalog failed to get outputs from artifact %+v, err: %+v", artifact.Id, err)
return nil, err
return catalog.NewCatalogEntry(ioutils.NewInMemoryOutputReader(outputs, nil), catalog.NewStatus(core.CatalogCacheStatus_CACHE_MISS, md)), err
}

logger.Infof(ctx, "Retrieved %v outputs from artifact %v, tag: %v", len(outputs.Literals), artifact.Id, tag)
return ioutils.NewInMemoryOutputReader(outputs, nil), nil
return catalog.NewCatalogEntry(ioutils.NewInMemoryOutputReader(outputs, nil), catalog.NewStatus(core.CatalogCacheStatus_CACHE_HIT, md)), nil
}

func (m *CatalogClient) CreateDataset(ctx context.Context, key catalog.Key, metadata *datacatalog.Metadata) (*datacatalog.DatasetID, error) {
@@ -195,21 +197,12 @@ func (m *CatalogClient) CreateArtifact(ctx context.Context, datasetID *datacatal
// - Ensure a Dataset exists for the Artifact. The Dataset represents the proj/domain/name/version of the task
// - Create an Artifact with the execution data that belongs to the dataset
// - Tag the Artifact with a hash generated by the input values
func (m *CatalogClient) Put(ctx context.Context, key catalog.Key, reader io.OutputReader, metadata catalog.Metadata) error {

// Try creating the dataset in case it doesn't exist
md := &datacatalog.Metadata{
KeyMap: map[string]string{
taskVersionKey: key.Identifier.Version,
},
}
if metadata.WorkflowExecutionIdentifier != nil {
md.KeyMap[wfExecNameKey] = metadata.WorkflowExecutionIdentifier.Name
}
func (m *CatalogClient) Put(ctx context.Context, key catalog.Key, reader io.OutputReader, metadata catalog.Metadata) (catalog.Status, error) {

datasetID, err := m.CreateDataset(ctx, key, md)
// Populate Metadata for later recovery
datasetID, err := m.CreateDataset(ctx, key, GetDatasetMetadataForSource(metadata.TaskExecutionIdentifier))
if err != nil {
return err
return catalog.Status{}, err
}

inputs := &core.LiteralMap{}
@@ -218,7 +211,7 @@ func (m *CatalogClient) Put(ctx context.Context, key catalog.Key, reader io.Outp
retInputs, err := key.InputReader.Get(ctx)
if err != nil {
logger.Errorf(ctx, "DataCatalog failed to read inputs err: %s", err)
return err
return catalog.Status{}, err
}
logger.Debugf(ctx, "DataCatalog read inputs")
inputs = retInputs
@@ -228,27 +221,27 @@ func (m *CatalogClient) Put(ctx context.Context, key catalog.Key, reader io.Outp
retOutputs, retErr, err := reader.Read(ctx)
if err != nil {
logger.Errorf(ctx, "DataCatalog failed to read outputs err: %s", err)
return err
return catalog.Status{}, err
}
if retErr != nil {
logger.Errorf(ctx, "DataCatalog failed to read outputs, err :%s", retErr.Message)
return errors.Errorf("Failed to read outputs. EC: %s, Msg: %s", retErr.Code, retErr.Message)
return catalog.Status{}, errors.Errorf("Failed to read outputs. EC: %s, Msg: %s", retErr.Code, retErr.Message)
}
logger.Debugf(ctx, "DataCatalog read outputs")
outputs = retOutputs
}

// Create the artifact for the execution that belongs in the task
cachedArtifact, err := m.CreateArtifact(ctx, datasetID, outputs, md)
cachedArtifact, err := m.CreateArtifact(ctx, datasetID, outputs, GetArtifactMetadataForSource(metadata.TaskExecutionIdentifier))
if err != nil {
return errors.Wrapf(err, "failed to create dataset for ID %s", key.Identifier.String())
return catalog.Status{}, errors.Wrapf(err, "failed to create dataset for ID %s", key.Identifier.String())
}

// Tag the artifact since it is the cached artifact
tagName, err := GenerateArtifactTagName(ctx, inputs)
if err != nil {
logger.Errorf(ctx, "Failed to generate tag for artifact %+v, err: %+v", cachedArtifact.Id, err)
return err
return catalog.Status{}, err
}
logger.Infof(ctx, "Cached exec tag: %v, task: %v", tagName, key.Identifier)

@@ -264,11 +257,11 @@ func (m *CatalogClient) Put(ctx context.Context, key catalog.Key, reader io.Outp
logger.Warnf(ctx, "Tag %v already exists for Artifact %v (idempotent)", tagName, cachedArtifact.Id)
} else {
logger.Errorf(ctx, "Failed to add tag %+v for artifact %+v, err: %+v", tagName, cachedArtifact.Id, err)
return err
return catalog.Status{}, err
}
}

return nil
return catalog.NewStatus(core.CatalogCacheStatus_CACHE_POPULATED, EventCatalogMetadata(datasetID, tag, nil)), nil
}

// Create a new Datacatalog client for task execution caching
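
Putting the Get and Put changes together: Get still resolves the task's dataset, hashes the inputs into an artifact tag, and looks the artifact up by that tag, but it now wraps the result in a catalog.Entry whose Status reports CACHE_HIT along with the dataset and tag metadata; Put creates the dataset and artifact as before and now reports CACHE_POPULATED. A toy in-memory version of that contract is sketched below; the hashing and the Status/Entry types are deliberately simplified and are not the real datacatalog client.

package main

import (
	"crypto/sha256"
	"encoding/hex"
	"fmt"
	"sort"
)

// Stand-ins for the catalog status/entry shapes; not the real flyteplugins types.
type Status struct {
	CacheStatus string
	ArtifactTag string
}

type Entry struct {
	Outputs map[string]string
	Status  Status
}

// toyCatalog keys cached outputs by a hash of the task inputs, the same idea
// as GenerateArtifactTagName over the input literal map.
type toyCatalog struct {
	store map[string]map[string]string
}

func tagFor(inputs map[string]string) string {
	keys := make([]string, 0, len(inputs))
	for k := range inputs {
		keys = append(keys, k)
	}
	sort.Strings(keys) // hash a canonical, ordered form so the tag is deterministic
	h := sha256.New()
	for _, k := range keys {
		h.Write([]byte(k + "=" + inputs[k] + ";"))
	}
	return "flyte_cached-" + hex.EncodeToString(h.Sum(nil))[:12]
}

func (c *toyCatalog) Get(inputs map[string]string) (Entry, bool) {
	tag := tagFor(inputs)
	if outputs, ok := c.store[tag]; ok {
		return Entry{Outputs: outputs, Status: Status{CacheStatus: "CACHE_HIT", ArtifactTag: tag}}, true
	}
	return Entry{Status: Status{CacheStatus: "CACHE_MISS", ArtifactTag: tag}}, false
}

func (c *toyCatalog) Put(inputs, outputs map[string]string) Status {
	tag := tagFor(inputs)
	c.store[tag] = outputs
	return Status{CacheStatus: "CACHE_POPULATED", ArtifactTag: tag}
}

func main() {
	c := &toyCatalog{store: map[string]map[string]string{}}
	in := map[string]string{"x": "1"}

	if _, ok := c.Get(in); !ok {
		fmt.Println("miss, running task and populating:", c.Put(in, map[string]string{"out": "42"}).CacheStatus)
	}
	entry, _ := c.Get(in)
	fmt.Println(entry.Status.CacheStatus, entry.Outputs["out"]) // CACHE_HIT 42
}
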