Skip to content

Commit

Permalink
feature(backend): Allow KFP apiserver to create recursion pipelineLoo…
Browse files Browse the repository at this point in the history
…p CRs for the users (#512)

* add auto apply custom task CRs before pipelineRun creation

* fix lint

* store loop resource in ordered dict

* Update kfp-admin-guide.md

* sorted pipeline params to avoid randomness

* sorted pipeline params to avoid randomness

* regenerate tests

* convert template string from yaml to json

* regenerate tests

* clean up unnecessary helper function

* resolve merge conflicts

* resolve merge conflicts

* remove unnecessary go mod package

* only opt-in to embed the recursion loop annotations

* only opt-in to embed the recursion loop annotations

* enable opt-in only via package variable

* add example for how to inline recursive loops

* resolve conflicts

* regenerate tests

* make auto generate and apply recursive pipelineloop in annotations as default

* move parameter sorting to the end

* add parameter sorting for recursion loops

* sort recursive task parameters

* add more details on how to opt-out auto apply custom resource
  • Loading branch information
Tomcli authored Jul 21, 2021
1 parent f1f1a1c commit d9b8539
Show file tree
Hide file tree
Showing 67 changed files with 2,627 additions and 100 deletions.
5 changes: 5 additions & 0 deletions backend/src/apiserver/common/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ const (
ArtifactImage string = "ARTIFACT_IMAGE"
ArtifactCopyStepTemplate string = "ARTIFACT_COPY_STEP_TEMPLATE"
InjectDefaultScript string = "INJECT_DEFAULT_SCRIPT"
ApplyTektonCustomResource string = "APPLY_TEKTON_CUSTOM_RESOURCE"
UpdatePipelineVersionByDefault string = "AUTO_UPDATE_PIPELINE_DEFAULT_VERSION"
TokenReviewAudience string = "TOKEN_REVIEW_AUDIENCE"
TerminateStatus string = "TERMINATE_STATUS"
Expand Down Expand Up @@ -128,6 +129,10 @@ func IsInjectDefaultScript() bool {
return GetBoolConfigWithDefault(InjectDefaultScript, true)
}

func IsApplyTektonCustomResource() string {
return GetStringConfigWithDefault(ApplyTektonCustomResource, "true")
}

func GetPodNamespace() string {
return GetStringConfig(PodNamespace)
}
Expand Down
162 changes: 134 additions & 28 deletions backend/src/apiserver/resource/resource_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,8 @@ import (
apierrors "k8s.io/apimachinery/pkg/api/errors"
v1 "k8s.io/apimachinery/pkg/apis/meta/v1"
utilerrors "k8s.io/apimachinery/pkg/util/errors"
"k8s.io/client-go/kubernetes"
"k8s.io/client-go/rest"

"k8s.io/apimachinery/pkg/types"
)
Expand Down Expand Up @@ -420,6 +422,31 @@ func (r *ResourceManager) CreateRun(apiRun *api.Run) (*model.RunDetail, error) {

workflow.Name = workflow.Name + "-" + runId[0:5]

// Add a reference to the default experiment if run does not already have a containing experiment
ref, err := r.getDefaultExperimentIfNoExperiment(apiRun.GetResourceReferences())
if err != nil {
return nil, err
}
if ref != nil {
apiRun.ResourceReferences = append(apiRun.GetResourceReferences(), ref)
}

namespace, err := r.getNamespaceFromExperiment(apiRun.GetResourceReferences())
if err != nil {
return nil, err
}

// Predefine custom resource if resource_templates are provided and feature flag
// is enabled.
if strings.ToLower(common.IsApplyTektonCustomResource()) == "true" {
if tektonTemplates, ok := workflow.Annotations["tekton.dev/resource_templates"]; ok {
err = r.applyCustomResources(workflow, tektonTemplates, namespace)
if err != nil {
return nil, util.NewInternalServerError(err, "Apply Tekton Custom resource Failed")
}
}
}

err = r.tektonPreprocessing(workflow)
if err != nil {
return nil, util.NewInternalServerError(err, "Tekton Preprocessing Failed")
Expand All @@ -438,20 +465,6 @@ func (r *ResourceManager) CreateRun(apiRun *api.Run) (*model.RunDetail, error) {
// }
// }

// Add a reference to the default experiment if run does not already have a containing experiment
ref, err := r.getDefaultExperimentIfNoExperiment(apiRun.GetResourceReferences())
if err != nil {
return nil, err
}
if ref != nil {
apiRun.ResourceReferences = append(apiRun.GetResourceReferences(), ref)
}

namespace, err := r.getNamespaceFromExperiment(apiRun.GetResourceReferences())
if err != nil {
return nil, err
}

// Create Tekton pipelineRun CRD resource
newWorkflow, err := r.getWorkflowClient(namespace).Create(context.Background(), workflow.Get(), v1.CreateOptions{})
wfs, _ := json.Marshal(newWorkflow)
Expand Down Expand Up @@ -754,6 +767,31 @@ func (r *ResourceManager) CreateJob(apiJob *api.Job) (*model.Job, error) {
},
}

// Add a reference to the default experiment if run does not already have a containing experiment
ref, err := r.getDefaultExperimentIfNoExperiment(apiJob.GetResourceReferences())
if err != nil {
return nil, err
}
if ref != nil {
apiJob.ResourceReferences = append(apiJob.GetResourceReferences(), ref)
}

namespace, err := r.getNamespaceFromExperiment(apiJob.GetResourceReferences())
if err != nil {
return nil, err
}

// Predefine custom resource if resource_templates are provided and feature flag
// is enabled.
if strings.ToLower(common.IsApplyTektonCustomResource()) == "true" {
if tektonTemplates, ok := workflow.Annotations["tekton.dev/resource_templates"]; ok {
err = r.applyCustomResources(workflow, tektonTemplates, namespace)
if err != nil {
return nil, util.NewInternalServerError(err, "Apply Tekton Custom resource Failed")
}
}
}

err = r.tektonPreprocessing(workflow)
if err != nil {
return nil, util.NewInternalServerError(err, "Tekton Preprocessing Failed")
Expand All @@ -772,20 +810,6 @@ func (r *ResourceManager) CreateJob(apiJob *api.Job) (*model.Job, error) {
// }
// }

// Add a reference to the default experiment if run does not already have a containing experiment
ref, err := r.getDefaultExperimentIfNoExperiment(apiJob.GetResourceReferences())
if err != nil {
return nil, err
}
if ref != nil {
apiJob.ResourceReferences = append(apiJob.GetResourceReferences(), ref)
}

namespace, err := r.getNamespaceFromExperiment(apiJob.GetResourceReferences())
if err != nil {
return nil, err
}

newScheduledWorkflow, err := r.getScheduledWorkflowClient(namespace).Create(context.Background(), scheduledWorkflow, v1.CreateOptions{})
if err != nil {
return nil, util.NewInternalServerError(err, "Failed to create a scheduled workflow for (%s)", scheduledWorkflow.Name)
Expand Down Expand Up @@ -1562,3 +1586,85 @@ func (r *ResourceManager) getHostPathVolumeSource(name string, path string) core
},
}
}

func (r *ResourceManager) applyCustomResources(workflow util.Workflow, tektonTemplates string, namespace string) error {
// Create kubeClient to deploy Tekton custom task crd
var config *rest.Config
config, err := rest.InClusterConfig()
if err != nil {
glog.Errorf("error creating client configuration: %v", err)
return err
}
kubeClient, err := kubernetes.NewForConfig(config)
if err != nil {
glog.Errorf("Failed to create client: %v", err)
return err
}
var templates []interface{}
// Decode metadata into JSON payload.
err = json.Unmarshal([]byte(tektonTemplates), &templates)
if err != nil {
glog.Errorf("Failed to Unmarshal custom task CRD: %v", err)
return err
}
for i := range templates {
template := templates[i]
apiVersion, ok := template.(map[string]interface{})["apiVersion"].(string)
if !ok {
glog.Errorf("Failed to get Tekton custom task apiVersion")
return errors.New("Failed to get Tekton custom task apiVersion")
}
singlarKind, ok := template.(map[string]interface{})["kind"].(string)
if !ok {
glog.Errorf("Failed to get Tekton custom task kind")
return errors.New("Failed to get Tekton custom task kind")
}
api := strings.Split(apiVersion, "/")[0]
version := strings.Split(apiVersion, "/")[1]
resource := strings.ToLower(singlarKind) + "s"
name, ok := template.(map[string]interface{})["metadata"].(map[string]interface{})["name"].(string)
if !ok {
glog.Errorf("Failed to get Tekton custom task name")
return errors.New("Failed to get Tekton custom task name")
}
body, err := json.Marshal(template)
if err != nil {
glog.Errorf("Failed to convert to JSON: %v", err)
return err
}
// Check whether the resource is exist, if yes do a patch
// if not do a post(create)
_, err = kubeClient.RESTClient().
Get().
AbsPath("/apis/" + api + "/" + version).
Namespace(namespace).
Resource(resource).
Name(name).
DoRaw(context.Background())
if err != nil {
_, err = kubeClient.RESTClient().Post().
AbsPath(fmt.Sprintf("/apis/%s/%s", api, version)).
Namespace(namespace).
Resource(resource).
Body(body).
DoRaw(context.Background())
if err != nil {
glog.Errorf("Failed to create resource for pipeline: %s, %v", workflow.Name, err)
return err
}
} else {
_, err = kubeClient.RESTClient().Patch(types.MergePatchType).
AbsPath(fmt.Sprintf("/apis/%s/%s", api, version)).
Namespace(namespace).
Resource(resource).
Name(name).
Body(body).
DoRaw(context.Background())
if err != nil {
glog.Errorf("Failed to patch resource for pipeline: %s, %v", workflow.Name, err)
return err
}
}
}
return nil
}
20 changes: 20 additions & 0 deletions guides/kfp-admin-guide.md
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ This page introduces different ways to configure the kfp-tekton admin settings s
- [Enable Auto Strip for End of File newlines](#enable-auto-strip-for-end-of-file-newlines)
- [Customize Artifact Image to do your own post processing](#customize-artifact-image-to-do-your-own-post-processing)
- [Customize S3 Endpoint for KFP Tekton artifacts](#customize-s3-endpoint-for-kfp-tekton-artifacts)
- [Disable Auto Apply for Tekton Custom Resources](#disable-auto-apply-for-tekton-custom-resources)
- [Disable Caching](#disable-caching)
- [Change the Tekton Terminate API Method](#change-the-tekton-terminate-api-method)

Expand Down Expand Up @@ -85,6 +86,25 @@ kubectl rollout restart deploy/ml-pipeline -n kubeflow
kubectl rollout restart deploy/minio -n kubeflow
```

## Disable Auto Apply for Tekton Custom Resources

Due to the lack of inline task template support in the current Tekton API, we allow the SDK to embed any custom resource to the pipelineRun annotations and apply it for every pipeline run. However, all the custom resources generated by the SDK are only unique for each pipeline version but not for each pipeline run. This is due to all Tekton resources are namespace-scope, and all pipeline runs share the same namespace.

Although this feature allows users to deploy all the necessary custom resources with one KFP API call, KFP doesn't clean up these custom resources because the same custom resource could have multiple owners. This feature mainly is for recursion loops where the sub-pipeline needs to be self-referencible.

If the admin doesn't want to let kfp-tekton handles the creation of Tekton custom resources, this feature can be disabled with:
```shell
kubectl patch cm kfp-tekton-config -n kubeflow -p '{"data":{"apply_tekton_custom_resource":"false"}}'
kubectl rollout restart deploy/ml-pipeline -n kubeflow
```

To disable it from the SDK as a user, add the following Tekton config during compile time.
```python
pipeline_conf = TektonPipelineConf()
pipeline_conf.set_resource_in_separate_yaml(True)
self._test_pipeline_workflow(test_pipeline, 'test.yaml', tekton_pipeline_conf=pipeline_conf)
```

## Disable Caching

KFP Caching will cache all the workloads that use the same task with the same inputs. It's enabled by default. To disable caching on the server side, run the commands below.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,19 @@ rules:
- pipelineruns
- taskruns
- conditions
- runs
verbs:
- create
- get
- list
- watch
- update
- patch
- delete
- apiGroups:
- custom.tekton.dev
resources:
- pipelineloops
verbs:
- create
- get
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,19 @@ rules:
- pipelineruns
- taskruns
- conditions
- runs
verbs:
- create
- get
- list
- watch
- update
- patch
- delete
- apiGroups:
- custom.tekton.dev
resources:
- pipelineloops
verbs:
- create
- get
Expand Down
5 changes: 5 additions & 0 deletions manifests/kustomize/base/pipeline/apiserver-deployment.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -101,6 +101,11 @@ spec:
configMapKeyRef:
name: kfp-tekton-config
key: inject_default_script
- name: APPLY_TEKTON_CUSTOM_RESOURCE
valueFrom:
configMapKeyRef:
name: kfp-tekton-config
key: apply_tekton_custom_resource
- name: TERMINATE_STATUS
valueFrom:
configMapKeyRef:
Expand Down
1 change: 1 addition & 0 deletions manifests/kustomize/base/pipeline/kfp-pipeline-config.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ data:
track_artifacts: "true"
strip_eof: "false"
inject_default_script: "true"
apply_tekton_custom_resource: "true"
terminate_status: "Cancelled"
artifact_script: |-
#!/usr/bin/env sh
Expand Down
13 changes: 13 additions & 0 deletions manifests/kustomize/base/pipeline/ml-pipeline-apiserver-role.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ rules:
- pipelineruns
- taskruns
- conditions
- runs
verbs:
- create
- get
Expand Down Expand Up @@ -63,3 +64,15 @@ rules:
- tokenreviews
verbs:
- create
- apiGroups:
- custom.tekton.dev
resources:
- pipelineloops
verbs:
- create
- get
- list
- watch
- update
- patch
- delete
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,19 @@ rules:
- pipelineruns
- taskruns
- conditions
- runs
verbs:
- create
- get
- list
- watch
- update
- patch
- delete
- apiGroups:
- custom.tekton.dev
resources:
- pipelineloops
verbs:
- create
- get
Expand Down
8 changes: 7 additions & 1 deletion sdk/FEATURES.md
Original file line number Diff line number Diff line change
Expand Up @@ -131,6 +131,12 @@ The `finally` syntax is supported since Tekton version `0.14.0`.

PipelineLoops is a feature for running a component or a set of component tasks multiple times in a loop. Right now, Tekton supports loop pipeline/tasks via an implementation of [Tekton Custom Tasks Controller](https://github.com/tektoncd/community/blob/master/teps/0002-custom-tasks.md) named as "PipelineLoop". Please refer to the examples [here](/tekton-catalog/pipeline-loops/examples) to understand more details about the usage of loops.

By default, the SDK will not compile all the recursion loop resources in the pipelineRun annotations. If you want to apply the recursion loop resources together with pipelinerun as an admin, add the following code snippet before compiling the pipeline.
```python
import kfp_tekton
kfp_tekton.compiler.LOOP_RESOURCES_IN_SEPARATE_YAML=False
```

To use this feature, please ensure Tekton version >= v0.19, and "data.enable-custom-tasks" is "true" in feature-flags configmap:
`kubectl edit cm feature-flags -n tekton-pipelines`

Expand All @@ -142,7 +148,7 @@ To see how the Python SDK provides this feature, refer to the examples below:

### Any Sequencer

When any one of the task dependencies completes successfully and the conditions meet, the dependent task will be started. Order of execution of the dependencies doesn’t matter, and the pipeline doesn't wait for all the task dependencies to complete before moving to the next step. Condition can be applied to enforce the task dependencies completes as expected. The condition expression should be the same format as is in Kubeflow ConditionOperator, and the result of containerOps can be used in expression. Notice the expression should only contain results from only one task because the purpose here is to check the simple condition for the task's output when a task complete. And also the operand in the expression should be int or string, other python types will be transferred to string automatically.
When any one of the task dependencies completes successfully and the conditions meet, the dependent task will be started. Order of execution of the dependencies doesn’t matter, and the pipeline doesn't wait for all the task dependencies to complete before moving to the next step. Condition can be applied to enforce the task dependencies completes as expected. The condition expression should be the same format as is in Kubeflow ConditionOperator, and the result of containerOps can be used in expression. Notice the expression should only contain results from only one task because the purpose here is to check the simple condition for the task's output when a task complete. And also the operand in the expression should be int or string, other python types will be transferred to string automatically.

The Any Sequencer exits if all dependencies failed or skipped, or all conditions unmatched.

Expand Down
Loading

0 comments on commit d9b8539

Please sign in to comment.