From b5e29b384e8f296abf3dec64482c354fb8f3a889 Mon Sep 17 00:00:00 2001 From: ddalvi Date: Mon, 11 Nov 2024 23:45:11 -0500 Subject: [PATCH] feat(backend): Add Semaphore and Mutex fields to Workflow CR - Added `Semaphore` and `Mutex` fields to the Workflow Spec to support concurrency control mechanisms directly within workflows. - Introduced a new environment variable, `SEMAPHORE_CONFIGMAP_NAME`, to the API Server deployment for managing semaphore configurations. - Added an empty ConfigMap manifest for semaphores to facilitate initial setup and testing. Signed-off-by: ddalvi --- backend/src/apiserver/template/v2_template.go | 51 +++++++++++++------ backend/src/v2/compiler/argocompiler/argo.go | 42 +++++++++++---- backend/src/v2/compiler/visitor.go | 2 +- .../base/pipeline/kustomization.yaml | 1 + .../ml-pipeline-apiserver-deployment.yaml | 2 + .../ml-pipeline-semaphore-configmap.yaml | 29 +++++++++++ 6 files changed, 101 insertions(+), 26 deletions(-) create mode 100644 manifests/kustomize/base/pipeline/ml-pipeline-semaphore-configmap.yaml diff --git a/backend/src/apiserver/template/v2_template.go b/backend/src/apiserver/template/v2_template.go index d14ddffdaeb..32536912182 100644 --- a/backend/src/apiserver/template/v2_template.go +++ b/backend/src/apiserver/template/v2_template.go @@ -45,6 +45,34 @@ var ( Launcher = "" ) +func getKubernetesSpec(platformSpec map[string]*pipelinespec.SinglePlatformSpec) *pipelinespec.SinglePlatformSpec { + var kubernetesSpec *pipelinespec.SinglePlatformSpec + + // Check for "kubernetes" key in the platformSpec map + if platformSpec != nil { + if platform, ok := platformSpec["kubernetes"]; ok && platform != nil { + kubernetesSpec = platform + } + } + return kubernetesSpec +} + +func getPipelineOptions(platform *pipelinespec.SinglePlatformSpec) *argocompiler.Options { + var pipelineOptions *argocompiler.Options + + if platform != nil && platform.PipelineConfig != nil { + pipelineOptions = &argocompiler.Options{} + if platform.PipelineConfig.SemaphoreKey != "" { + pipelineOptions.SemaphoreKey = platform.PipelineConfig.SemaphoreKey + } + if platform.PipelineConfig.MutexName != "" { + pipelineOptions.MutexName = platform.PipelineConfig.MutexName + } + } + return pipelineOptions +} + + // Converts modelJob to ScheduledWorkflow. func (t *V2Spec) ScheduledWorkflow(modelJob *model.Job) (*scheduledworkflow.ScheduledWorkflow, error) { job := &pipelinespec.PipelineJob{} @@ -69,17 +97,12 @@ func (t *V2Spec) ScheduledWorkflow(modelJob *model.Job) (*scheduledworkflow.Sche return nil, util.Wrap(err, "invalid pipeline job inputs") } - // Pick out Kubernetes platform configs - var kubernetesSpec *pipelinespec.SinglePlatformSpec - if t.platformSpec != nil { - if _, ok := t.platformSpec.Platforms["kubernetes"]; ok { - kubernetesSpec = t.platformSpec.Platforms["kubernetes"] - } - } + kubernetesSpec := getKubernetesSpec(t.platformSpec.Platforms) + pipelineOptions := getPipelineOptions(kubernetesSpec) var obj interface{} if util.CurrentExecutionType() == util.ArgoWorkflow { - obj, err = argocompiler.Compile(job, kubernetesSpec, nil) + obj, err = argocompiler.Compile(job, kubernetesSpec, pipelineOptions) } else if util.CurrentExecutionType() == util.TektonPipelineRun { obj, err = tektoncompiler.Compile(job, kubernetesSpec, &tektoncompiler.Options{LauncherImage: Launcher}) } @@ -292,17 +315,13 @@ func (t *V2Spec) RunWorkflow(modelRun *model.Run, options RunWorkflowOptions) (u if err = t.validatePipelineJobInputs(job); err != nil { return nil, util.Wrap(err, "invalid pipeline job inputs") } - // Pick out Kubernetes platform configs - var kubernetesSpec *pipelinespec.SinglePlatformSpec - if t.platformSpec != nil { - if _, ok := t.platformSpec.Platforms["kubernetes"]; ok { - kubernetesSpec = t.platformSpec.Platforms["kubernetes"] - } - } + + kubernetesSpec := getKubernetesSpec(t.platformSpec.Platforms) + pipelineOptions := getPipelineOptions(kubernetesSpec) var obj interface{} if util.CurrentExecutionType() == util.ArgoWorkflow { - obj, err = argocompiler.Compile(job, kubernetesSpec, nil) + obj, err = argocompiler.Compile(job, kubernetesSpec, pipelineOptions) } else if util.CurrentExecutionType() == util.TektonPipelineRun { obj, err = tektoncompiler.Compile(job, kubernetesSpec, nil) } diff --git a/backend/src/v2/compiler/argocompiler/argo.go b/backend/src/v2/compiler/argocompiler/argo.go index 1f1c19ed3ec..73181a6856e 100644 --- a/backend/src/v2/compiler/argocompiler/argo.go +++ b/backend/src/v2/compiler/argocompiler/argo.go @@ -20,6 +20,7 @@ import ( "encoding/json" "fmt" "strings" + "os" wfapi "github.com/argoproj/argo-workflows/v3/pkg/apis/workflow/v1alpha1" "github.com/kubeflow/pipelines/api/v2alpha1/go/pipelinespec" @@ -40,6 +41,16 @@ type Options struct { // optional PipelineRoot string // TODO(Bobgy): add an option -- dev mode, ImagePullPolicy should only be Always in dev mode. + SemaphoreKey string + MutexName string +} + +func getSemaphoreConfigMapName() string { + const defaultConfigMapName = "semaphore-config" + if name := os.Getenv("SEMAPHORE_CONFIGMAP_NAME"); name != "" { + return name + } + return defaultConfigMapName } func Compile(jobArg *pipelinespec.PipelineJob, kubernetesSpecArg *pipelinespec.SinglePlatformSpec, opts *Options) (*wfapi.Workflow, error) { @@ -86,7 +97,6 @@ func Compile(jobArg *pipelinespec.PipelineJob, kubernetesSpecArg *pipelinespec.S } } - // initialization wf := &wfapi.Workflow{ TypeMeta: k8smeta.TypeMeta{ APIVersion: "argoproj.io/v1alpha1", @@ -94,14 +104,6 @@ func Compile(jobArg *pipelinespec.PipelineJob, kubernetesSpecArg *pipelinespec.S }, ObjectMeta: k8smeta.ObjectMeta{ GenerateName: retrieveLastValidString(spec.GetPipelineInfo().GetName()) + "-", - // Note, uncomment the following during development to view argo inputs/outputs in KFP UI. - // TODO(Bobgy): figure out what annotations we should use for v2 engine. - // For now, comment this annotation, so that in KFP UI, it shows argo input/output params/artifacts - // suitable for debugging. - // - // Annotations: map[string]string{ - // "pipelines.kubeflow.org/v2_pipeline": "true", - // }, }, Spec: wfapi.WorkflowSpec{ PodMetadata: &wfapi.Metadata{ @@ -119,6 +121,28 @@ func Compile(jobArg *pipelinespec.PipelineJob, kubernetesSpecArg *pipelinespec.S Entrypoint: tmplEntrypoint, }, } + + if opts != nil && opts.SemaphoreKey != "" { + wf.Spec.Synchronization = &wfapi.Synchronization{ + Semaphore: &wfapi.SemaphoreRef{ + ConfigMapKeyRef: &k8score.ConfigMapKeySelector{ + LocalObjectReference: k8score.LocalObjectReference{ + Name: getSemaphoreConfigMapName(), + }, + Key: opts.SemaphoreKey, + }, + }, + } + } + + if opts != nil && opts.MutexName != "" { + wf.Spec.Synchronization = &wfapi.Synchronization{ + Mutex: &wfapi.Mutex{ + Name: opts.MutexName, + }, + } + } + c := &workflowCompiler{ wf: wf, templates: make(map[string]*wfapi.Template), diff --git a/backend/src/v2/compiler/visitor.go b/backend/src/v2/compiler/visitor.go index f6b7204a45c..277fadcbec8 100644 --- a/backend/src/v2/compiler/visitor.go +++ b/backend/src/v2/compiler/visitor.go @@ -109,7 +109,7 @@ func (state *pipelineDFS) dfs(name string, component *pipelinespec.ComponentSpec } // Add kubernetes spec to annotation - if state.kubernetesSpec != nil { + if state.kubernetesSpec != nil && state.kubernetesSpec.DeploymentSpec != nil { kubernetesExecSpec, ok := state.kubernetesSpec.DeploymentSpec.Executors[executorLabel] if ok { state.visitor.AddKubernetesSpec(name, kubernetesExecSpec) diff --git a/manifests/kustomize/base/pipeline/kustomization.yaml b/manifests/kustomize/base/pipeline/kustomization.yaml index ce93725b61d..eb3ea3c394c 100644 --- a/manifests/kustomize/base/pipeline/kustomization.yaml +++ b/manifests/kustomize/base/pipeline/kustomization.yaml @@ -15,6 +15,7 @@ resources: - ml-pipeline-scheduledworkflow-role.yaml - ml-pipeline-scheduledworkflow-rolebinding.yaml - ml-pipeline-scheduledworkflow-sa.yaml + - ml-pipeline-semaphore-configmap.yaml - ml-pipeline-ui-deployment.yaml - ml-pipeline-ui-configmap.yaml - ml-pipeline-ui-role.yaml diff --git a/manifests/kustomize/base/pipeline/ml-pipeline-apiserver-deployment.yaml b/manifests/kustomize/base/pipeline/ml-pipeline-apiserver-deployment.yaml index cd80133596f..87e0026a798 100644 --- a/manifests/kustomize/base/pipeline/ml-pipeline-apiserver-deployment.yaml +++ b/manifests/kustomize/base/pipeline/ml-pipeline-apiserver-deployment.yaml @@ -109,6 +109,8 @@ spec: secretKeyRef: name: mlpipeline-minio-artifact key: secretkey + - name: SEMAPHORE_CONFIGMAP_NAME + value: "semaphore-config" image: gcr.io/ml-pipeline/api-server:dummy imagePullPolicy: IfNotPresent name: ml-pipeline-api-server diff --git a/manifests/kustomize/base/pipeline/ml-pipeline-semaphore-configmap.yaml b/manifests/kustomize/base/pipeline/ml-pipeline-semaphore-configmap.yaml new file mode 100644 index 00000000000..c7c0e01ee4f --- /dev/null +++ b/manifests/kustomize/base/pipeline/ml-pipeline-semaphore-configmap.yaml @@ -0,0 +1,29 @@ +kind: ConfigMap +apiVersion: v1 +metadata: + name: semaphore-config +data: {} +--- +apiVersion: batch/v1 +kind: Job +metadata: + name: semaphore-configmap-init + namespace: kubeflow +spec: + template: + spec: + containers: + - name: create-configmap + image: bitnami/kubectl:latest + command: + - /bin/sh + - -c + - | + if ! kubectl get configmap semaphore-config -n kubeflow > /dev/null 2>&1; then + echo "Creating semaphore-config ConfigMap..." + kubectl create configmap semaphore-config -n kubeflow --from-literal=init="" + else + echo "ConfigMap semaphore-config already exists. Skipping creation." + fi + restartPolicy: OnFailure + backoffLimit: 3