Skip to content

Commit

Permalink
feat(backend)Add Semaphore and Mutex fields to Workflow Spec
Browse files Browse the repository at this point in the history
Signed-off-by: ddalvi <ddalvi@redhat.com>
  • Loading branch information
DharmitD committed Nov 18, 2024
1 parent 60a8865 commit 497bc6b
Show file tree
Hide file tree
Showing 3 changed files with 56 additions and 9 deletions.
30 changes: 28 additions & 2 deletions backend/src/apiserver/template/v2_template.go
Original file line number Diff line number Diff line change
Expand Up @@ -77,9 +77,22 @@ func (t *V2Spec) ScheduledWorkflow(modelJob *model.Job) (*scheduledworkflow.Sche
}
}

var PipelineOptions argocompiler.Options
for key, platform := range t.platformSpec.Platforms {
if key == "kubernetes" && platform != nil && platform.PipelineConfig != nil {

Check failure on line 82 in backend/src/apiserver/template/v2_template.go

View workflow job for this annotation

GitHub Actions / backend-tests

platform.PipelineConfig undefined (type *pipelinespec.SinglePlatformSpec has no field or method PipelineConfig)

Check failure on line 82 in backend/src/apiserver/template/v2_template.go

View workflow job for this annotation

GitHub Actions / run-go-unittests

platform.PipelineConfig undefined (type *pipelinespec.SinglePlatformSpec has no field or method PipelineConfig)
if platform.PipelineConfig.SemaphoreKey != "" {

Check failure on line 83 in backend/src/apiserver/template/v2_template.go

View workflow job for this annotation

GitHub Actions / backend-tests

platform.PipelineConfig undefined (type *pipelinespec.SinglePlatformSpec has no field or method PipelineConfig)

Check failure on line 83 in backend/src/apiserver/template/v2_template.go

View workflow job for this annotation

GitHub Actions / run-go-unittests

platform.PipelineConfig undefined (type *pipelinespec.SinglePlatformSpec has no field or method PipelineConfig)
PipelineOptions.SemaphoreKey = platform.PipelineConfig.SemaphoreKey

Check failure on line 84 in backend/src/apiserver/template/v2_template.go

View workflow job for this annotation

GitHub Actions / backend-tests

platform.PipelineConfig undefined (type *pipelinespec.SinglePlatformSpec has no field or method PipelineConfig)

Check failure on line 84 in backend/src/apiserver/template/v2_template.go

View workflow job for this annotation

GitHub Actions / run-go-unittests

platform.PipelineConfig undefined (type *pipelinespec.SinglePlatformSpec has no field or method PipelineConfig)
}
if platform.PipelineConfig.MutexName != "" {

Check failure on line 86 in backend/src/apiserver/template/v2_template.go

View workflow job for this annotation

GitHub Actions / backend-tests

platform.PipelineConfig undefined (type *pipelinespec.SinglePlatformSpec has no field or method PipelineConfig)

Check failure on line 86 in backend/src/apiserver/template/v2_template.go

View workflow job for this annotation

GitHub Actions / run-go-unittests

platform.PipelineConfig undefined (type *pipelinespec.SinglePlatformSpec has no field or method PipelineConfig)
PipelineOptions.MutexName = platform.PipelineConfig.MutexName

Check failure on line 87 in backend/src/apiserver/template/v2_template.go

View workflow job for this annotation

GitHub Actions / backend-tests

platform.PipelineConfig undefined (type *pipelinespec.SinglePlatformSpec has no field or method PipelineConfig)

Check failure on line 87 in backend/src/apiserver/template/v2_template.go

View workflow job for this annotation

GitHub Actions / run-go-unittests

platform.PipelineConfig undefined (type *pipelinespec.SinglePlatformSpec has no field or method PipelineConfig)
}
break
}
}

var obj interface{}
if util.CurrentExecutionType() == util.ArgoWorkflow {
obj, err = argocompiler.Compile(job, kubernetesSpec, nil)
obj, err = argocompiler.Compile(job, kubernetesSpec, &PipelineOptions)
} else if util.CurrentExecutionType() == util.TektonPipelineRun {
obj, err = tektoncompiler.Compile(job, kubernetesSpec, &tektoncompiler.Options{LauncherImage: Launcher})
}
Expand Down Expand Up @@ -300,9 +313,22 @@ func (t *V2Spec) RunWorkflow(modelRun *model.Run, options RunWorkflowOptions) (u
}
}

var PipelineOptions *argocompiler.Options
for key, platform := range t.platformSpec.Platforms {
if key == "kubernetes" && platform != nil && platform.PipelineConfig != nil {

Check failure on line 318 in backend/src/apiserver/template/v2_template.go

View workflow job for this annotation

GitHub Actions / backend-tests

platform.PipelineConfig undefined (type *pipelinespec.SinglePlatformSpec has no field or method PipelineConfig)

Check failure on line 318 in backend/src/apiserver/template/v2_template.go

View workflow job for this annotation

GitHub Actions / run-go-unittests

platform.PipelineConfig undefined (type *pipelinespec.SinglePlatformSpec has no field or method PipelineConfig)
if platform.PipelineConfig.SemaphoreKey != "" {

Check failure on line 319 in backend/src/apiserver/template/v2_template.go

View workflow job for this annotation

GitHub Actions / backend-tests

platform.PipelineConfig undefined (type *pipelinespec.SinglePlatformSpec has no field or method PipelineConfig)

Check failure on line 319 in backend/src/apiserver/template/v2_template.go

View workflow job for this annotation

GitHub Actions / run-go-unittests

platform.PipelineConfig undefined (type *pipelinespec.SinglePlatformSpec has no field or method PipelineConfig)
PipelineOptions.SemaphoreKey = platform.PipelineConfig.SemaphoreKey

Check failure on line 320 in backend/src/apiserver/template/v2_template.go

View workflow job for this annotation

GitHub Actions / backend-tests

platform.PipelineConfig undefined (type *pipelinespec.SinglePlatformSpec has no field or method PipelineConfig)

Check failure on line 320 in backend/src/apiserver/template/v2_template.go

View workflow job for this annotation

GitHub Actions / run-go-unittests

platform.PipelineConfig undefined (type *pipelinespec.SinglePlatformSpec has no field or method PipelineConfig)
}
if platform.PipelineConfig.MutexName != "" {

Check failure on line 322 in backend/src/apiserver/template/v2_template.go

View workflow job for this annotation

GitHub Actions / backend-tests

platform.PipelineConfig undefined (type *pipelinespec.SinglePlatformSpec has no field or method PipelineConfig)

Check failure on line 322 in backend/src/apiserver/template/v2_template.go

View workflow job for this annotation

GitHub Actions / run-go-unittests

platform.PipelineConfig undefined (type *pipelinespec.SinglePlatformSpec has no field or method PipelineConfig)
PipelineOptions.MutexName = platform.PipelineConfig.MutexName

Check failure on line 323 in backend/src/apiserver/template/v2_template.go

View workflow job for this annotation

GitHub Actions / backend-tests

platform.PipelineConfig undefined (type *pipelinespec.SinglePlatformSpec has no field or method PipelineConfig)

Check failure on line 323 in backend/src/apiserver/template/v2_template.go

View workflow job for this annotation

GitHub Actions / run-go-unittests

platform.PipelineConfig undefined (type *pipelinespec.SinglePlatformSpec has no field or method PipelineConfig)
}
break
}
}

var obj interface{}
if util.CurrentExecutionType() == util.ArgoWorkflow {
obj, err = argocompiler.Compile(job, kubernetesSpec, nil)
obj, err = argocompiler.Compile(job, kubernetesSpec, PipelineOptions)
} else if util.CurrentExecutionType() == util.TektonPipelineRun {
obj, err = tektoncompiler.Compile(job, kubernetesSpec, nil)
}
Expand Down
33 changes: 27 additions & 6 deletions backend/src/v2/compiler/argocompiler/argo.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ import (
"google.golang.org/protobuf/proto"
"google.golang.org/protobuf/types/known/structpb"
k8score "k8s.io/api/core/v1"
v1 "k8s.io/api/core/v1"
k8sres "k8s.io/apimachinery/pkg/api/resource"
k8smeta "k8s.io/apimachinery/pkg/apis/meta/v1"
)
Expand All @@ -40,6 +41,8 @@ type Options struct {
// optional
PipelineRoot string
// TODO(Bobgy): add an option -- dev mode, ImagePullPolicy should only be Always in dev mode.
SemaphoreKey string
MutexName string
}

func Compile(jobArg *pipelinespec.PipelineJob, kubernetesSpecArg *pipelinespec.SinglePlatformSpec, opts *Options) (*wfapi.Workflow, error) {
Expand Down Expand Up @@ -76,6 +79,14 @@ func Compile(jobArg *pipelinespec.PipelineJob, kubernetesSpecArg *pipelinespec.S
}
}

var semaphoreKey, mutexName string
if opts != nil && opts.SemaphoreKey != "" {
semaphoreKey = opts.SemaphoreKey
}
if opts != nil && opts.MutexName != "" {
mutexName = opts.MutexName
}

var kubernetesSpec *pipelinespec.SinglePlatformSpec
if kubernetesSpecArg != nil {
// clone kubernetesSpecArg, because we don't want to change it
Expand All @@ -94,13 +105,9 @@ func Compile(jobArg *pipelinespec.PipelineJob, kubernetesSpecArg *pipelinespec.S
},
ObjectMeta: k8smeta.ObjectMeta{
GenerateName: retrieveLastValidString(spec.GetPipelineInfo().GetName()) + "-",
// Note, uncomment the following during development to view argo inputs/outputs in KFP UI.
// TODO(Bobgy): figure out what annotations we should use for v2 engine.
// For now, comment this annotation, so that in KFP UI, it shows argo input/output params/artifacts
// suitable for debugging.
//
// Uncomment during development for better debugging in KFP UI
// Annotations: map[string]string{
// "pipelines.kubeflow.org/v2_pipeline": "true",
// "pipelines.kubeflow.org/v2_pipeline": "true",
// },
},
Spec: wfapi.WorkflowSpec{
Expand All @@ -117,8 +124,22 @@ func Compile(jobArg *pipelinespec.PipelineJob, kubernetesSpecArg *pipelinespec.S
},
ServiceAccountName: "pipeline-runner",
Entrypoint: tmplEntrypoint,
Synchronization: &wfapi.Synchronization{
Semaphore: &wfapi.SemaphoreRef{
ConfigMapKeyRef: &v1.ConfigMapKeySelector{
LocalObjectReference: v1.LocalObjectReference{
Name: "semaphore-config",
},
Key: semaphoreKey,
},
},
Mutex: &wfapi.Mutex{
Name: mutexName,
},
},
},
}

c := &workflowCompiler{
wf: wf,
templates: make(map[string]*wfapi.Template),
Expand Down
2 changes: 1 addition & 1 deletion backend/src/v2/compiler/visitor.go
Original file line number Diff line number Diff line change
Expand Up @@ -109,7 +109,7 @@ func (state *pipelineDFS) dfs(name string, component *pipelinespec.ComponentSpec
}

// Add kubernetes spec to annotation
if state.kubernetesSpec != nil {
if state.kubernetesSpec != nil && state.kubernetesSpec.DeploymentSpec != nil {
kubernetesExecSpec, ok := state.kubernetesSpec.DeploymentSpec.Executors[executorLabel]
if ok {
state.visitor.AddKubernetesSpec(name, kubernetesExecSpec)
Expand Down

0 comments on commit 497bc6b

Please sign in to comment.