Skip to content

Commit

Permalink
feat(tekton-kfptask): Update kfptask to publish completed dag status (#…
Browse files Browse the repository at this point in the history
…1426)

* update kfptask to update completed dag

* update comment

* update error message
  • Loading branch information
Tomcli authored Dec 7, 2023
1 parent 191ad39 commit eaa2de6
Show file tree
Hide file tree
Showing 3 changed files with 27 additions and 5 deletions.
4 changes: 2 additions & 2 deletions tekton-catalog/tekton-kfptask/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,10 @@ require (
github.com/kubeflow/pipelines v0.0.0-20231027040853-58ce09e07d03
github.com/kubeflow/pipelines/api v0.0.0-20231027040853-58ce09e07d03
github.com/kubeflow/pipelines/kubernetes_platform v0.0.0-20231027040853-58ce09e07d03
github.com/kubeflow/pipelines/third_party/ml-metadata v0.0.0-20231027040853-58ce09e07d03
github.com/tektoncd/pipeline v0.53.2
go.uber.org/zap v1.26.0
google.golang.org/protobuf v1.31.0
k8s.io/api v0.27.1
k8s.io/apimachinery v0.27.3
k8s.io/client-go v0.27.2
Expand Down Expand Up @@ -64,7 +66,6 @@ require (
github.com/josharian/intern v1.0.0 // indirect
github.com/json-iterator/go v1.1.12 // indirect
github.com/kelseyhightower/envconfig v1.4.0 // indirect
github.com/kubeflow/pipelines/third_party/ml-metadata v0.0.0-20230810215105-e1f0c010f800 // indirect
github.com/mailru/easyjson v0.7.7 // indirect
github.com/matttproud/golang_protobuf_extensions v1.0.4 // indirect
github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd // indirect
Expand Down Expand Up @@ -101,7 +102,6 @@ require (
google.golang.org/genproto/googleapis/api v0.0.0-20231002182017-d307bd883b97 // indirect
google.golang.org/genproto/googleapis/rpc v0.0.0-20231009173412-8bfb1ae86b6c // indirect
google.golang.org/grpc v1.58.3 // indirect
google.golang.org/protobuf v1.31.0 // indirect
gopkg.in/inf.v0 v0.9.1 // indirect
gopkg.in/yaml.v2 v2.4.0 // indirect
gopkg.in/yaml.v3 v3.0.1 // indirect
Expand Down
3 changes: 2 additions & 1 deletion tekton-catalog/tekton-kfptask/go.sum

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

25 changes: 23 additions & 2 deletions tekton-catalog/tekton-kfptask/pkg/reconciler/kfptask/reconciler.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ import (
"github.com/kubeflow/pipelines/backend/src/v2/driver"
"github.com/kubeflow/pipelines/backend/src/v2/metadata"
"github.com/kubeflow/pipelines/kubernetes_platform/go/kubernetesplatform"
pb "github.com/kubeflow/pipelines/third_party/ml-metadata/go/ml_metadata"
"github.com/tektoncd/pipeline/pkg/apis/pipeline"
"github.com/tektoncd/pipeline/pkg/apis/pipeline/pod"
tektonv1 "github.com/tektoncd/pipeline/pkg/apis/pipeline/v1"
Expand All @@ -43,6 +44,7 @@ import (
listeners "github.com/tektoncd/pipeline/pkg/client/listers/pipeline/v1"
listenersv1beta1 "github.com/tektoncd/pipeline/pkg/client/listers/pipeline/v1beta1"
"go.uber.org/zap"
"google.golang.org/protobuf/types/known/structpb"
k8score "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime/schema"
Expand Down Expand Up @@ -462,6 +464,24 @@ func v1ParamsConversion(ctx context.Context, v1beta1Params tektonv1beta1.Params)
return v1Params
}

func DAGPublisher(ctx context.Context, opts driver.Options, mlmd *metadata.Client) (err error) {
defer func() {
if err != nil {
err = fmt.Errorf("failed to publish driver DAG execution %s: %w", fmt.Sprint(opts.DAGExecutionID), err)
}
}()
var outputParameters map[string]*structpb.Value
status := pb.Execution_COMPLETE
execution, err := mlmd.GetExecution(ctx, opts.DAGExecutionID)
if err != nil {
return fmt.Errorf("failed to get execution: %w", err)
}
if err = mlmd.PublishExecution(ctx, execution, outputParameters, nil, status); err != nil {
return fmt.Errorf("failed to publish: %w", err)
}
return nil
}

func execDriver(ctx context.Context, options *driverOptions) (*[]tektonv1beta1.CustomRunResult, bool, string, string, string, error) {
var execution *driver.Execution
var err error
Expand All @@ -479,8 +499,9 @@ func execDriver(ctx context.Context, options *driverOptions) (*[]tektonv1beta1.C
case "DAG":
execution, err = driver.DAG(ctx, options.options, options.mlmdClient)
case "DAG_PUB":
// no-op for now
return &[]tektonv1beta1.CustomRunResult{}, taskRunDecision, executionID, executorInput, podSpecPatch, nil
// current DAG_PUB only scheduled when the dag execution is completed
err = DAGPublisher(ctx, options.options, options.mlmdClient)
return &[]tektonv1beta1.CustomRunResult{}, taskRunDecision, executionID, executorInput, podSpecPatch, err
default:
err = fmt.Errorf("unknown driverType %s", options.driverType)
}
Expand Down

0 comments on commit eaa2de6

Please sign in to comment.