diff --git a/backend/src/apiserver/template/v2_template.go b/backend/src/apiserver/template/v2_template.go index d14ddffdaeb..9a39e6f8f35 100644 --- a/backend/src/apiserver/template/v2_template.go +++ b/backend/src/apiserver/template/v2_template.go @@ -77,9 +77,22 @@ func (t *V2Spec) ScheduledWorkflow(modelJob *model.Job) (*scheduledworkflow.Sche } } + var PipelineOptions argocompiler.Options + for key, platform := range t.platformSpec.Platforms { + if key == "kubernetes" && platform != nil && platform.PipelineConfig != nil { + if platform.PipelineConfig.SemaphoreKey != "" { + PipelineOptions.SemaphoreKey = platform.PipelineConfig.SemaphoreKey + } + if platform.PipelineConfig.MutexName != "" { + PipelineOptions.MutexName = platform.PipelineConfig.MutexName + } + break + } + } + var obj interface{} if util.CurrentExecutionType() == util.ArgoWorkflow { - obj, err = argocompiler.Compile(job, kubernetesSpec, nil) + obj, err = argocompiler.Compile(job, kubernetesSpec, &PipelineOptions) } else if util.CurrentExecutionType() == util.TektonPipelineRun { obj, err = tektoncompiler.Compile(job, kubernetesSpec, &tektoncompiler.Options{LauncherImage: Launcher}) } @@ -300,9 +313,22 @@ func (t *V2Spec) RunWorkflow(modelRun *model.Run, options RunWorkflowOptions) (u } } + var PipelineOptions *argocompiler.Options + for key, platform := range t.platformSpec.Platforms { + if key == "kubernetes" && platform != nil && platform.PipelineConfig != nil { + if platform.PipelineConfig.SemaphoreKey != "" { + PipelineOptions.SemaphoreKey = platform.PipelineConfig.SemaphoreKey + } + if platform.PipelineConfig.MutexName != "" { + PipelineOptions.MutexName = platform.PipelineConfig.MutexName + } + break + } + } + var obj interface{} if util.CurrentExecutionType() == util.ArgoWorkflow { - obj, err = argocompiler.Compile(job, kubernetesSpec, nil) + obj, err = argocompiler.Compile(job, kubernetesSpec, PipelineOptions) } else if util.CurrentExecutionType() == util.TektonPipelineRun { obj, err = tektoncompiler.Compile(job, kubernetesSpec, nil) } diff --git a/backend/src/v2/compiler/argocompiler/argo.go b/backend/src/v2/compiler/argocompiler/argo.go index 1f1c19ed3ec..e2df35f45d5 100644 --- a/backend/src/v2/compiler/argocompiler/argo.go +++ b/backend/src/v2/compiler/argocompiler/argo.go @@ -28,6 +28,7 @@ import ( "google.golang.org/protobuf/proto" "google.golang.org/protobuf/types/known/structpb" k8score "k8s.io/api/core/v1" + v1 "k8s.io/api/core/v1" k8sres "k8s.io/apimachinery/pkg/api/resource" k8smeta "k8s.io/apimachinery/pkg/apis/meta/v1" ) @@ -40,6 +41,8 @@ type Options struct { // optional PipelineRoot string // TODO(Bobgy): add an option -- dev mode, ImagePullPolicy should only be Always in dev mode. + SemaphoreKey string + MutexName string } func Compile(jobArg *pipelinespec.PipelineJob, kubernetesSpecArg *pipelinespec.SinglePlatformSpec, opts *Options) (*wfapi.Workflow, error) { @@ -76,6 +79,14 @@ func Compile(jobArg *pipelinespec.PipelineJob, kubernetesSpecArg *pipelinespec.S } } + var semaphoreKey, mutexName string + if opts != nil && opts.SemaphoreKey != "" { + semaphoreKey = opts.SemaphoreKey + } + if opts != nil && opts.MutexName != "" { + mutexName = opts.MutexName + } + var kubernetesSpec *pipelinespec.SinglePlatformSpec if kubernetesSpecArg != nil { // clone kubernetesSpecArg, because we don't want to change it @@ -94,13 +105,9 @@ func Compile(jobArg *pipelinespec.PipelineJob, kubernetesSpecArg *pipelinespec.S }, ObjectMeta: k8smeta.ObjectMeta{ GenerateName: retrieveLastValidString(spec.GetPipelineInfo().GetName()) + "-", - // Note, uncomment the following during development to view argo inputs/outputs in KFP UI. - // TODO(Bobgy): figure out what annotations we should use for v2 engine. - // For now, comment this annotation, so that in KFP UI, it shows argo input/output params/artifacts - // suitable for debugging. - // + // Uncomment during development for better debugging in KFP UI // Annotations: map[string]string{ - // "pipelines.kubeflow.org/v2_pipeline": "true", + // "pipelines.kubeflow.org/v2_pipeline": "true", // }, }, Spec: wfapi.WorkflowSpec{ @@ -117,8 +124,22 @@ func Compile(jobArg *pipelinespec.PipelineJob, kubernetesSpecArg *pipelinespec.S }, ServiceAccountName: "pipeline-runner", Entrypoint: tmplEntrypoint, + Synchronization: &wfapi.Synchronization{ + Semaphore: &wfapi.SemaphoreRef{ + ConfigMapKeyRef: &v1.ConfigMapKeySelector{ + LocalObjectReference: v1.LocalObjectReference{ + Name: "semaphore-config", + }, + Key: semaphoreKey, + }, + }, + Mutex: &wfapi.Mutex{ + Name: mutexName, + }, + }, }, } + c := &workflowCompiler{ wf: wf, templates: make(map[string]*wfapi.Template), diff --git a/backend/src/v2/compiler/visitor.go b/backend/src/v2/compiler/visitor.go index f6b7204a45c..277fadcbec8 100644 --- a/backend/src/v2/compiler/visitor.go +++ b/backend/src/v2/compiler/visitor.go @@ -109,7 +109,7 @@ func (state *pipelineDFS) dfs(name string, component *pipelinespec.ComponentSpec } // Add kubernetes spec to annotation - if state.kubernetesSpec != nil { + if state.kubernetesSpec != nil && state.kubernetesSpec.DeploymentSpec != nil { kubernetesExecSpec, ok := state.kubernetesSpec.DeploymentSpec.Executors[executorLabel] if ok { state.visitor.AddKubernetesSpec(name, kubernetesExecSpec)