Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: Allow for setting default configurations for workflows, Fixes #1923, #2044 #2331

Merged
merged 26 commits into from
Mar 6, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions workflow/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,8 @@ type WorkflowControllerConfig struct {

// Config customized Docker Sock path
DockerSockPath string `json:"dockerSockPath,omitempty"`

DefautWorkflowSpec *wfv1.WorkflowSpec `json:"workflowDefaults,omitempty"`
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please add a docstring above this, like the rest of the configs

}

// KubeConfig is used for wait & init sidecar containers to communicate with a k8s apiserver by a outofcluster method,
Expand Down
41 changes: 39 additions & 2 deletions workflow/controller/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package controller

import (
"context"
"encoding/json"
"fmt"
"os"
"strings"
Expand All @@ -17,6 +18,7 @@ import (
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/selection"
"k8s.io/apimachinery/pkg/types"
"k8s.io/apimachinery/pkg/util/strategicpatch"
"k8s.io/apimachinery/pkg/util/wait"
"k8s.io/apimachinery/pkg/watch"
"k8s.io/client-go/kubernetes"
Expand Down Expand Up @@ -45,8 +47,10 @@ type WorkflowController struct {
// namespace of the workflow controller
namespace string
managedNamespace string

// configMap is the name of the config map in which to derive configuration of the controller from
configMap string

// Config is the workflow controller's configuration
Config config.WorkflowControllerConfig

Expand Down Expand Up @@ -355,16 +359,22 @@ func (wfc *WorkflowController) processNextItem() bool {
wfc.throttler.Remove(key)
return true
}
err = wfc.addingWorkflowDefaultValueIfValueNotExist(wf)
if err != nil {
log.Warnf("Failed to unmarshal key '%s' to workflow object: %v", key, err)
woc := newWorkflowOperationCtx(wf, wfc)
woc.markWorkflowFailed(fmt.Sprintf("invalid spec: %s", err.Error()))
woc.persistUpdates()
wfc.throttler.Remove(key)
}

if wf.ObjectMeta.Labels[common.LabelKeyCompleted] == "true" {
wfc.throttler.Remove(key)
// can get here if we already added the completed=true label,
// but we are still draining the controller's workflow workqueue
return true
}

woc := newWorkflowOperationCtx(wf, wfc)

// Loading running workflow from persistence storage if nodeStatusOffload enabled
if wf.Status.IsOffloadNodeStatus() {
nodes, err := wfc.offloadNodeStatusRepo.Get(string(wf.UID), wf.GetOffloadNodeStatusVersion())
Expand Down Expand Up @@ -417,6 +427,33 @@ func (wfc *WorkflowController) processNextItem() bool {
return true
}

// addingWorkflowDefaultValueIfValueNotExist sets values in the workflow.Spec with defaults from the
// workflowController. Values in the workflow will be given the upper hand over the defaults.
// The defaults for the workflow controller is set in the WorkflowController.Config.DefautWorkflowSpec
func (wfc *WorkflowController) addingWorkflowDefaultValueIfValueNotExist(wf *wfv1.Workflow) error {
//var workflowSpec *wfv1.WorkflowSpec = &wf.Spec
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Remove this unused line?

if wfc.Config.DefautWorkflowSpec != nil {
defaultsSpec, err := json.Marshal(*wfc.Config.DefautWorkflowSpec)
if err != nil {
return err
}
workflowSpec, err := json.Marshal(wf.Spec)
if err != nil {
return err
}
// https://github.com/kubernetes/apimachinery/blob/2373d029717c4d169463414a6127cd1d0d12680e/pkg/util/strategicpatch/patch.go#L94
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

new, err := strategicpatch.StrategicMergePatch(defaultsSpec, workflowSpec, wfv1.WorkflowSpec{})
if err != nil {
return err
}
err = json.Unmarshal(new, &wf.Spec)
if err != nil {
return err
}
}
return nil
}

func (wfc *WorkflowController) podWorker() {
for wfc.processNextPodItem() {
}
Expand Down
160 changes: 160 additions & 0 deletions workflow/controller/controller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,55 @@ spec:
args: ["hello world"]
`

var testDefaultWf = `
apiVersion: argoproj.io/v1alpha1
kind: Workflow
metadata:
name: hello-world
spec:
entrypoint: whalesay
serviceAccountName: whalesay
templates:
- name: whalesay
metadata:
annotations:
annotationKey1: "annotationValue1"
annotationKey2: "annotationValue2"
labels:
labelKey1: "labelValue1"
labelKey2: "labelValue2"
container:
image: docker/whalesay:latest
command: [cowsay]
args: ["hello world"]
`

var testDefaultWfTTL = `
apiVersion: argoproj.io/v1alpha1
kind: Workflow
metadata:
name: hello-world
spec:
entrypoint: whalesay
serviceAccountName: whalesay
ttlSecondsAfterFinished: 7
ttlStrategy:
secondsAfterCompletion: 5
templates:
- name: whalesay
metadata:
annotations:
annotationKey1: "annotationValue1"
annotationKey2: "annotationValue2"
labels:
labelKey1: "labelValue1"
labelKey2: "labelValue2"
container:
image: docker/whalesay:latest
command: [cowsay]
args: ["hello world"]
`

func newController() *WorkflowController {
wfclientset := fakewfclientset.NewSimpleClientset()
informerFactory := wfextv.NewSharedInformerFactory(wfclientset, 10*time.Minute)
Expand All @@ -65,6 +114,68 @@ func newController() *WorkflowController {
}
}

func newControllerWithDefaults() *WorkflowController {
wfclientset := fakewfclientset.NewSimpleClientset()
informerFactory := wfextv.NewSharedInformerFactory(wfclientset, 10*time.Minute)
wftmplInformer := informerFactory.Argoproj().V1alpha1().WorkflowTemplates()
ctx := context.Background()
go wftmplInformer.Informer().Run(ctx.Done())
if !cache.WaitForCacheSync(ctx.Done(), wftmplInformer.Informer().HasSynced) {
panic("Timed out waiting for caches to sync")
}
myBool := true
return &WorkflowController{
Config: config.WorkflowControllerConfig{
ExecutorImage: "executor:latest",
DefautWorkflowSpec: &wfv1.WorkflowSpec{
HostNetwork: &myBool,
},
},
kubeclientset: fake.NewSimpleClientset(),
wfclientset: wfclientset,
completedPods: make(chan string, 512),
wftmplInformer: wftmplInformer,
wfQueue: workqueue.NewRateLimitingQueue(workqueue.DefaultControllerRateLimiter()),
wfArchive: sqldb.NullWorkflowArchive,
}
}

func newControllerWithComplexDefaults() *WorkflowController {
wfclientset := fakewfclientset.NewSimpleClientset()
informerFactory := wfextv.NewSharedInformerFactory(wfclientset, 10*time.Minute)
wftmplInformer := informerFactory.Argoproj().V1alpha1().WorkflowTemplates()
ctx := context.Background()
go wftmplInformer.Informer().Run(ctx.Done())
if !cache.WaitForCacheSync(ctx.Done(), wftmplInformer.Informer().HasSynced) {
panic("Timed out waiting for caches to sync")
}
myBool := true
var ten int32 = 10
var seven int32 = 10
return &WorkflowController{
Config: config.WorkflowControllerConfig{
ExecutorImage: "executor:latest",
DefautWorkflowSpec: &wfv1.WorkflowSpec{
HostNetwork: &myBool,
Entrypoint: "good_entrypoint",
ServiceAccountName: "my_service_account",
TTLStrategy: &wfv1.TTLStrategy{
SecondsAfterCompletion: &ten,
SecondsAfterSuccess: &ten,
SecondsAfterFailure: &ten,
},
TTLSecondsAfterFinished: &seven,
},
},
kubeclientset: fake.NewSimpleClientset(),
wfclientset: wfclientset,
completedPods: make(chan string, 512),
wftmplInformer: wftmplInformer,
wfQueue: workqueue.NewRateLimitingQueue(workqueue.DefaultControllerRateLimiter()),
wfArchive: sqldb.NullWorkflowArchive,
}
}

func unmarshalWF(yamlStr string) *wfv1.Workflow {
var wf wfv1.Workflow
err := yaml.Unmarshal([]byte(yamlStr), &wf)
Expand Down Expand Up @@ -93,3 +204,52 @@ func makePodsRunning(t *testing.T, kubeclientset kubernetes.Interface, namespace
_, _ = podcs.Update(&pod)
}
}

func TestAddingWorkflowDefaultValueIfValueNotExist(t *testing.T) {
NikeNano marked this conversation as resolved.
Show resolved Hide resolved
assert.Equal(t, "hello", "hello")
ans := true
controller := newController()
workflow := unmarshalWF(helloWorldWf)
err := controller.addingWorkflowDefaultValueIfValueNotExist(workflow)
assert.NoError(t, err)
assert.Equal(t, workflow, unmarshalWF(helloWorldWf))
controllerDefaults := newControllerWithDefaults()
defautWorkflowSpec := unmarshalWF(helloWorldWf)
err = controllerDefaults.addingWorkflowDefaultValueIfValueNotExist(defautWorkflowSpec)
assert.NoError(t, err)
assert.Equal(t, defautWorkflowSpec.Spec.HostNetwork, &ans)
assert.NotEqual(t, defautWorkflowSpec, unmarshalWF(helloWorldWf))
assert.Equal(t, *defautWorkflowSpec.Spec.HostNetwork, true)
}

func TestAddingWorkflowDefaultComplex(t *testing.T) {
assert.Equal(t, "hello", "hello")
controller := newControllerWithComplexDefaults()
workflow := unmarshalWF(testDefaultWf)
var ten int32 = 10
assert.Equal(t, workflow.Spec.Entrypoint, "whalesay")
assert.Nil(t, workflow.Spec.TTLStrategy)
err := controller.addingWorkflowDefaultValueIfValueNotExist(workflow)
assert.NoError(t, err)
assert.NotEqual(t, workflow, unmarshalWF(testDefaultWf))
assert.Equal(t, workflow.Spec.Entrypoint, "whalesay")
assert.Equal(t, workflow.Spec.ServiceAccountName, "whalesay")
assert.Equal(t, *workflow.Spec.TTLStrategy.SecondsAfterFailure, ten)
}

func TestAddingWorkflowDefaultComplexTwo(t *testing.T) {
assert.Equal(t, "hello", "hello")
controller := newControllerWithComplexDefaults()
workflow := unmarshalWF(testDefaultWfTTL)
var ten int32 = 10
var seven int32 = 7
var five int32 = 5
err := controller.addingWorkflowDefaultValueIfValueNotExist(workflow)
assert.NoError(t, err)
assert.NotEqual(t, workflow, unmarshalWF(testDefaultWfTTL))
assert.Equal(t, workflow.Spec.Entrypoint, "whalesay")
assert.Equal(t, workflow.Spec.ServiceAccountName, "whalesay")
assert.Equal(t, *workflow.Spec.TTLStrategy.SecondsAfterCompletion, five)
assert.Equal(t, *workflow.Spec.TTLStrategy.SecondsAfterFailure, ten)
assert.Equal(t, *workflow.Spec.TTLSecondsAfterFinished, seven)
}