Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: Validate CronWorkflow before creation #2532

Merged
merged 3 commits into from
Mar 27, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
44 changes: 27 additions & 17 deletions server/cronworkflow/cron_workflow_server.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,15 +38,23 @@ func (c *cronWorkflowServiceServer) ListCronWorkflows(ctx context.Context, req *
if req.ListOptions != nil {
options = *req.ListOptions
}
return auth.GetWfClient(ctx).ArgoprojV1alpha1().CronWorkflows(req.Namespace).List(c.withInstanceID(options))
optsWithInstanceId := c.withInstanceID(options)
return auth.GetWfClient(ctx).ArgoprojV1alpha1().CronWorkflows(req.Namespace).List(optsWithInstanceId)
}

func (c *cronWorkflowServiceServer) CreateCronWorkflow(ctx context.Context, req *cronworkflowpkg.CreateCronWorkflowRequest) (*v1alpha1.CronWorkflow, error) {
c.setInstanceID(req)
return auth.GetWfClient(ctx).ArgoprojV1alpha1().CronWorkflows(req.Namespace).Create(req.CronWorkflow)
}
wfClient := auth.GetWfClient(ctx)
if req.CronWorkflow == nil {
return nil, fmt.Errorf("cron workflow was not found in the request body")
}

wftmplGetter := templateresolution.WrapWorkflowTemplateInterface(wfClient.ArgoprojV1alpha1().WorkflowTemplates(req.Namespace))

err := validate.ValidateCronWorkflow(wftmplGetter, req.CronWorkflow)
if err != nil {
return nil, err
}

func (c *cronWorkflowServiceServer) setInstanceID(req *cronworkflowpkg.CreateCronWorkflowRequest) {
if len(c.instanceID) > 0 {
labels := req.CronWorkflow.GetLabels()
if labels == nil {
Expand All @@ -55,26 +63,28 @@ func (c *cronWorkflowServiceServer) setInstanceID(req *cronworkflowpkg.CreateCro
labels[common.LabelKeyControllerInstanceID] = c.instanceID
req.CronWorkflow.SetLabels(labels)
}

return wfClient.ArgoprojV1alpha1().CronWorkflows(req.Namespace).Create(req.CronWorkflow)
}

func (c *cronWorkflowServiceServer) GetCronWorkflow(ctx context.Context, req *cronworkflowpkg.GetCronWorkflowRequest) (*v1alpha1.CronWorkflow, error) {
options := metav1.GetOptions{}
if req.GetOptions != nil {
options = *req.GetOptions
}
return c.getAndValidateCronWorkflow(ctx, req.Namespace, req.Name, options)
return c.getCronWorkflow(ctx, req.Namespace, req.Name, options)
}

func (c *cronWorkflowServiceServer) UpdateCronWorkflow(ctx context.Context, req *cronworkflowpkg.UpdateCronWorkflowRequest) (*v1alpha1.CronWorkflow, error) {
_, err := c.getAndValidateCronWorkflow(ctx, req.Namespace, req.Name, metav1.GetOptions{})
_, err := c.getCronWorkflow(ctx, req.Namespace, req.Name, metav1.GetOptions{})
if err != nil {
return nil, err
}
return auth.GetWfClient(ctx).ArgoprojV1alpha1().CronWorkflows(req.Namespace).Update(req.CronWorkflow)
}

func (c *cronWorkflowServiceServer) DeleteCronWorkflow(ctx context.Context, req *cronworkflowpkg.DeleteCronWorkflowRequest) (*cronworkflowpkg.CronWorkflowDeletedResponse, error) {
_, err := c.getAndValidateCronWorkflow(ctx, req.Namespace, req.Name, metav1.GetOptions{})
_, err := c.getCronWorkflow(ctx, req.Namespace, req.Name, metav1.GetOptions{})
if err != nil {
return nil, err
}
Expand All @@ -97,33 +107,33 @@ func (c *cronWorkflowServiceServer) withInstanceID(opt metav1.ListOptions) metav
return opt
}

func (c *cronWorkflowServiceServer) getAndValidateCronWorkflow(ctx context.Context, namespace string, name string, options metav1.GetOptions) (*v1alpha1.CronWorkflow, error) {
func (c *cronWorkflowServiceServer) getCronWorkflow(ctx context.Context, namespace string, name string, options metav1.GetOptions) (*v1alpha1.CronWorkflow, error) {
wfClient := auth.GetWfClient(ctx)
cronWf, err := wfClient.ArgoprojV1alpha1().CronWorkflows(namespace).Get(name, options)
if err != nil {
return nil, err
}
err = c.validateInstanceID(cronWf)
if err != nil {
return nil, err
ok := c.validateInstanceID(cronWf)
if !ok {
return nil, fmt.Errorf("CronWorkflow '%s' is not managed by the current Argo server", cronWf.Name)
}
return cronWf, nil
}

func (c *cronWorkflowServiceServer) validateInstanceID(cronWf *v1alpha1.CronWorkflow) error {
func (c *cronWorkflowServiceServer) validateInstanceID(cronWf *v1alpha1.CronWorkflow) bool {
if len(c.instanceID) == 0 {
if len(cronWf.Labels) == 0 {
return nil
return true
}
if _, ok := cronWf.Labels[common.LabelKeyControllerInstanceID]; !ok {
return nil
return true
}
} else if len(cronWf.Labels) > 0 {
if val, ok := cronWf.Labels[common.LabelKeyControllerInstanceID]; ok {
if val == c.instanceID {
return nil
return true
}
}
}
return fmt.Errorf("the CronWorkflow is not managed by current Argo server")
return false
}
40 changes: 32 additions & 8 deletions server/cronworkflow/cron_workflow_server_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,19 +4,43 @@ import (
"context"
"testing"

cronworkflowpkg "github.com/argoproj/argo/pkg/apiclient/cronworkflow"
"github.com/argoproj/argo/server/auth"

"github.com/ghodss/yaml"
"github.com/stretchr/testify/assert"
v1 "k8s.io/apimachinery/pkg/apis/meta/v1"

cronworkflowpkg "github.com/argoproj/argo/pkg/apiclient/cronworkflow"
wfv1 "github.com/argoproj/argo/pkg/apis/workflow/v1alpha1"
wftFake "github.com/argoproj/argo/pkg/client/clientset/versioned/fake"
"github.com/argoproj/argo/server/auth"
)

func Test_cronWorkflowServiceServer(t *testing.T) {
cronWf := &wfv1.CronWorkflow{
ObjectMeta: v1.ObjectMeta{Namespace: "my-ns", Name: "my-name"},
cronWfRaw := `apiVersion: argoproj.io/v1alpha1
kind: CronWorkflow
metadata:
name: my-name
namespace: my-ns
spec:
schedule: "* * * * *"
concurrencyPolicy: "Allow"
startingDeadlineSeconds: 0
successfulJobsHistoryLimit: 4
failedJobsHistoryLimit: 2
workflowSpec:
podGC:
strategy: OnPodCompletion
entrypoint: whalesay
templates:
- name: whalesay
container:
image: python:alpine3.6
imagePullPolicy: IfNotPresent
command: ["sh", -c]
args: ["echo hello"]`

var cronWf wfv1.CronWorkflow
err := yaml.Unmarshal([]byte(cronWfRaw), &cronWf)
if err != nil {
panic(err)
}
wfClientset := wftFake.NewSimpleClientset()
server := NewCronWorkflowServer("testinstanceid001")
Expand All @@ -25,7 +49,7 @@ func Test_cronWorkflowServiceServer(t *testing.T) {
t.Run("CreateCronWorkflow", func(t *testing.T) {
created, err := server.CreateCronWorkflow(ctx, &cronworkflowpkg.CreateCronWorkflowRequest{
Namespace: "my-ns",
CronWorkflow: cronWf,
CronWorkflow: &cronWf,
})
if assert.NoError(t, err) {
assert.NotNil(t, created)
Expand All @@ -44,7 +68,7 @@ func Test_cronWorkflowServiceServer(t *testing.T) {
}
})
t.Run("UpdateCronWorkflow", func(t *testing.T) {
cronWf, err := server.UpdateCronWorkflow(ctx, &cronworkflowpkg.UpdateCronWorkflowRequest{Namespace: "my-ns", Name: "my-name", CronWorkflow: cronWf})
cronWf, err := server.UpdateCronWorkflow(ctx, &cronworkflowpkg.UpdateCronWorkflowRequest{Namespace: "my-ns", Name: "my-name", CronWorkflow: &cronWf})
if assert.NoError(t, err) {
assert.NotNil(t, cronWf)
}
Expand Down
58 changes: 27 additions & 31 deletions server/workflow/workflow_server.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,17 @@ func NewWorkflowServer(instanceID string, offloadNodeStatusRepo sqldb.OffloadNod
}
}

func (s *workflowServer) setInstanceID(req *workflowpkg.WorkflowCreateRequest) {
func (s *workflowServer) CreateWorkflow(ctx context.Context, req *workflowpkg.WorkflowCreateRequest) (*v1alpha1.Workflow, error) {
wfClient := auth.GetWfClient(ctx)

if req.Workflow == nil {
return nil, fmt.Errorf("workflow body not specified")
}

if req.Workflow.Namespace == "" {
req.Workflow.Namespace = req.Namespace
}

if len(req.InstanceID) > 0 || len(s.instanceID) > 0 {
labels := req.Workflow.GetLabels()
if labels == nil {
Expand All @@ -46,20 +56,6 @@ func (s *workflowServer) setInstanceID(req *workflowpkg.WorkflowCreateRequest) {
}
req.Workflow.SetLabels(labels)
}
}

func (s *workflowServer) CreateWorkflow(ctx context.Context, req *workflowpkg.WorkflowCreateRequest) (*v1alpha1.Workflow, error) {
wfClient := auth.GetWfClient(ctx)

if req.Workflow == nil {
return nil, fmt.Errorf("workflow body not specified")
}

if req.Workflow.Namespace == "" {
req.Workflow.Namespace = req.Namespace
}

s.setInstanceID(req)

wftmplGetter := templateresolution.WrapWorkflowTemplateInterface(wfClient.ArgoprojV1alpha1().WorkflowTemplates(req.Namespace))

Expand Down Expand Up @@ -91,7 +87,7 @@ func (s *workflowServer) GetWorkflow(ctx context.Context, req *workflowpkg.Workf
if req.GetOptions != nil {
wfGetOption = *req.GetOptions
}
wf, err := s.getAndValidateWorkflow(ctx, req.Namespace, req.Name, wfGetOption)
wf, err := s.getWorkflow(ctx, req.Namespace, req.Name, wfGetOption)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -201,7 +197,7 @@ func (s *workflowServer) WatchWorkflows(req *workflowpkg.WatchWorkflowsRequest,
}

func (s *workflowServer) DeleteWorkflow(ctx context.Context, req *workflowpkg.WorkflowDeleteRequest) (*workflowpkg.WorkflowDeleteResponse, error) {
_, err := s.getAndValidateWorkflow(ctx, req.Namespace, req.Name, metav1.GetOptions{})
_, err := s.getWorkflow(ctx, req.Namespace, req.Name, metav1.GetOptions{})
if err != nil {
return nil, err
}
Expand All @@ -217,7 +213,7 @@ func (s *workflowServer) RetryWorkflow(ctx context.Context, req *workflowpkg.Wor
wfClient := auth.GetWfClient(ctx)
kubeClient := auth.GetKubeClient(ctx)

wf, err := s.getAndValidateWorkflow(ctx, req.Namespace, req.Name, metav1.GetOptions{})
wf, err := s.getWorkflow(ctx, req.Namespace, req.Name, metav1.GetOptions{})
if err != nil {
return nil, err
}
Expand All @@ -231,7 +227,7 @@ func (s *workflowServer) RetryWorkflow(ctx context.Context, req *workflowpkg.Wor

func (s *workflowServer) ResubmitWorkflow(ctx context.Context, req *workflowpkg.WorkflowResubmitRequest) (*v1alpha1.Workflow, error) {
wfClient := auth.GetWfClient(ctx)
wf, err := s.getAndValidateWorkflow(ctx, req.Namespace, req.Name, metav1.GetOptions{})
wf, err := s.getWorkflow(ctx, req.Namespace, req.Name, metav1.GetOptions{})
if err != nil {
return nil, err
}
Expand All @@ -251,7 +247,7 @@ func (s *workflowServer) ResubmitWorkflow(ctx context.Context, req *workflowpkg.
func (s *workflowServer) ResumeWorkflow(ctx context.Context, req *workflowpkg.WorkflowResumeRequest) (*v1alpha1.Workflow, error) {
wfClient := auth.GetWfClient(ctx)

_, err := s.getAndValidateWorkflow(ctx, req.Namespace, req.Name, metav1.GetOptions{})
_, err := s.getWorkflow(ctx, req.Namespace, req.Name, metav1.GetOptions{})
if err != nil {
return nil, err
}
Expand All @@ -273,7 +269,7 @@ func (s *workflowServer) ResumeWorkflow(ctx context.Context, req *workflowpkg.Wo
func (s *workflowServer) SuspendWorkflow(ctx context.Context, req *workflowpkg.WorkflowSuspendRequest) (*v1alpha1.Workflow, error) {
wfClient := auth.GetWfClient(ctx)

_, err := s.getAndValidateWorkflow(ctx, req.Namespace, req.Name, metav1.GetOptions{})
_, err := s.getWorkflow(ctx, req.Namespace, req.Name, metav1.GetOptions{})
if err != nil {
return nil, err
}
Expand All @@ -294,7 +290,7 @@ func (s *workflowServer) SuspendWorkflow(ctx context.Context, req *workflowpkg.W
func (s *workflowServer) TerminateWorkflow(ctx context.Context, req *workflowpkg.WorkflowTerminateRequest) (*v1alpha1.Workflow, error) {
wfClient := auth.GetWfClient(ctx)

_, err := s.getAndValidateWorkflow(ctx, req.Namespace, req.Name, metav1.GetOptions{})
_, err := s.getWorkflow(ctx, req.Namespace, req.Name, metav1.GetOptions{})
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -362,33 +358,33 @@ func (s *workflowServer) withInstanceID(opt metav1.ListOptions) metav1.ListOptio
return opt
}

func (s *workflowServer) getAndValidateWorkflow(ctx context.Context, namespace string, name string, options metav1.GetOptions) (*v1alpha1.Workflow, error) {
func (s *workflowServer) getWorkflow(ctx context.Context, namespace string, name string, options metav1.GetOptions) (*v1alpha1.Workflow, error) {
wfClient := auth.GetWfClient(ctx)
wf, err := wfClient.ArgoprojV1alpha1().Workflows(namespace).Get(name, options)
if err != nil {
return nil, err
}
err = s.validateInstanceID(wf)
if err != nil {
return nil, err
ok := s.validateInstanceID(wf)
if !ok {
return nil, fmt.Errorf("Workflow '%s' is not managed by the current Argo server", wf.Name)
}
return wf, nil
}

func (s *workflowServer) validateInstanceID(wf *v1alpha1.Workflow) error {
func (s *workflowServer) validateInstanceID(wf *v1alpha1.Workflow) bool {
if len(s.instanceID) == 0 {
if len(wf.Labels) == 0 {
return nil
return true
}
if _, ok := wf.Labels[common.LabelKeyControllerInstanceID]; !ok {
return nil
return true
}
} else if len(wf.Labels) > 0 {
if val, ok := wf.Labels[common.LabelKeyControllerInstanceID]; ok {
if val == s.instanceID {
return nil
return true
}
}
}
return fmt.Errorf("the workflow is not managed by current Argo server")
return false
}