Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: Add ability to submit CronWorkflow from CLI #2003

Merged
merged 19 commits into from
Feb 7, 2020
Merged
78 changes: 59 additions & 19 deletions cmd/argo/commands/submit.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,15 +3,19 @@ package commands
import (
"log"
"os"
"strings"

workflowpkg "github.com/argoproj/argo/pkg/apiclient/workflow"
simster7 marked this conversation as resolved.
Show resolved Hide resolved
"github.com/argoproj/argo/pkg/apis/workflow"
wfv1 "github.com/argoproj/argo/pkg/apis/workflow/v1alpha1"
"github.com/argoproj/pkg/errors"
argoJson "github.com/argoproj/pkg/json"
"github.com/spf13/cobra"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
v1 "k8s.io/apimachinery/pkg/apis/meta/v1"

"github.com/argoproj/argo/cmd/argo/commands/client"
workflowpkg "github.com/argoproj/argo/pkg/apiclient/workflow"
wfv1 "github.com/argoproj/argo/pkg/apis/workflow/v1alpha1"
"github.com/argoproj/argo/cmd/argo/commands/cron"
"github.com/argoproj/argo/workflow/common"
"github.com/argoproj/argo/workflow/util"
)
Expand All @@ -30,20 +34,25 @@ func NewSubmitCommand() *cobra.Command {
submitOpts util.SubmitOpts
cliSubmitOpts cliSubmitOpts
priority int32
from string
)
var command = &cobra.Command{
alexec marked this conversation as resolved.
Show resolved Hide resolved
Use: "submit FILE1 FILE2...",
Short: "submit a workflow",
Run: func(cmd *cobra.Command, args []string) {
if len(args) == 0 {
cmd.HelpFunc()(cmd, args)
os.Exit(1)
}
if cmd.Flag("priority").Changed {
cliSubmitOpts.priority = &priority
}

SubmitWorkflows(args, &submitOpts, &cliSubmitOpts)
if from != "" {
SubmitWorkflowFromResource(from, &submitOpts, &cliSubmitOpts)
} else {
if len(args) == 0 {
cmd.HelpFunc()(cmd, args)
os.Exit(1)
}
SubmitWorkflowsFromFile(args, &submitOpts, &cliSubmitOpts)
}
},
}
command.Flags().StringVar(&submitOpts.Name, "name", "", "override metadata.name")
Expand All @@ -61,6 +70,7 @@ func NewSubmitCommand() *cobra.Command {
command.Flags().Int32Var(&priority, "priority", 0, "workflow priority")
command.Flags().StringVarP(&submitOpts.ParameterFile, "parameter-file", "f", "", "pass a file containing all input parameters")
command.Flags().StringVarP(&submitOpts.Labels, "labels", "l", "", "Comma separated labels to apply to the workflow. Will override previous values.")
command.Flags().StringVar(&from, "from", "", "Submit from a WorkflowTemplate or CronWorkflow. E.g., --from=CronWorkflow/hello-world-cwf")
alexec marked this conversation as resolved.
Show resolved Hide resolved
// Only complete files with appropriate extension.
err := command.Flags().SetAnnotation("parameter-file", cobra.BashCompFilenameExt, []string{"json", "yaml", "yml"})
if err != nil {
Expand All @@ -69,18 +79,7 @@ func NewSubmitCommand() *cobra.Command {
return command
}

func SubmitWorkflows(filePaths []string, submitOpts *util.SubmitOpts, cliOpts *cliSubmitOpts) {
if submitOpts == nil {
submitOpts = &util.SubmitOpts{}
}
if cliOpts == nil {
cliOpts = &cliSubmitOpts{}
}

ctx, apiClient := client.NewAPIClient()
serviceClient := apiClient.NewWorkflowServiceClient()
namespace := client.Namespace()

func SubmitWorkflowsFromFile(filePaths []string, submitOpts *util.SubmitOpts, cliOpts *cliSubmitOpts) {
alexec marked this conversation as resolved.
Show resolved Hide resolved
fileContents, err := util.ReadManifest(filePaths...)
if err != nil {
log.Fatal(err)
Expand All @@ -92,6 +91,47 @@ func SubmitWorkflows(filePaths []string, submitOpts *util.SubmitOpts, cliOpts *c
workflows = append(workflows, wfs...)
}

submitWorkflows(workflows, submitOpts, cliOpts)
}

func SubmitWorkflowFromResource(resourceIdentifier string, submitOpts *util.SubmitOpts, cliOpts *cliSubmitOpts) {
alexec marked this conversation as resolved.
Show resolved Hide resolved

resIdSplit := strings.Split(resourceIdentifier, "/")
if len(resIdSplit) != 2 {
log.Fatalf("resource identifier '%s' is malformed. Expected is KIND/NAME, e.g. CronWorkflow/hello-world-cwf", resourceIdentifier)
}

var workflowToSubmit *wfv1.Workflow
switch resIdSplit[0] {
case workflow.CronWorkflowKind:
cwfIf := cron.InitCronWorkflowClient()
alexec marked this conversation as resolved.
Show resolved Hide resolved
alexec marked this conversation as resolved.
Show resolved Hide resolved
cwf, err := cwfIf.Get(resIdSplit[1], v1.GetOptions{})
if err != nil {
log.Fatalf("Unable to get CronWorkflow '%s': %s", resIdSplit[1], err)
}
workflowToSubmit, err = common.ConvertCronWorkflowToWorkflow(cwf)
if err != nil {
log.Fatalf("Unable to create Workflow from CronWorkflow '%s': %s", resIdSplit[1], err)
}
default:
log.Fatalf("Resource Kind '%s' is not supported with --from", resIdSplit[0])
}

submitWorkflows([]wfv1.Workflow{*workflowToSubmit}, submitOpts, cliOpts)
}

func submitWorkflows(workflows []wfv1.Workflow, submitOpts *util.SubmitOpts, cliOpts *cliSubmitOpts) {
if submitOpts == nil {
alexec marked this conversation as resolved.
Show resolved Hide resolved
submitOpts = &util.SubmitOpts{}
}
if cliOpts == nil {
alexec marked this conversation as resolved.
Show resolved Hide resolved
cliOpts = &cliSubmitOpts{}
}

ctx, apiClient := client.NewAPIClient()
serviceClient := apiClient.NewWorkflowServiceClient()
namespace := client.Namespace()

if cliOpts.watch {
if len(workflows) > 1 {
log.Fatalf("Cannot watch more than one workflow")
Expand Down
4 changes: 2 additions & 2 deletions test/e2e/argo_server_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -670,7 +670,7 @@ func (s *ArgoServerSuite) TestArtifactServer() {
SubmitWorkflow().
WaitForWorkflow(15 * time.Second).
Then().
Expect(func(t *testing.T, metadata *metav1.ObjectMeta, status *wfv1.WorkflowStatus) {
ExpectWorkflow(func(t *testing.T, metadata *metav1.ObjectMeta, status *wfv1.WorkflowStatus) {
uid = metadata.UID
})

Expand Down Expand Up @@ -800,7 +800,7 @@ func (s *ArgoServerSuite) TestArchivedWorkflowService() {
SubmitWorkflow().
WaitForWorkflow(20 * time.Second).
Then().
Expect(func(t *testing.T, metadata *metav1.ObjectMeta, status *wfv1.WorkflowStatus) {
ExpectWorkflow(func(t *testing.T, metadata *metav1.ObjectMeta, status *wfv1.WorkflowStatus) {
uid = metadata.UID
})
s.Run("List", func(t *testing.T) {
Expand Down
24 changes: 24 additions & 0 deletions test/e2e/cli_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,11 +2,14 @@ package e2e

import (
"os"
"regexp"
"testing"
"time"

"github.com/argoproj/argo/pkg/apis/workflow/v1alpha1"
simster7 marked this conversation as resolved.
Show resolved Hide resolved
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/suite"
v1 "k8s.io/apimachinery/pkg/apis/meta/v1"

"github.com/argoproj/argo/test/e2e/fixtures"
)
Expand Down Expand Up @@ -106,6 +109,26 @@ func (s *CLISuite) TestRoot() {
}
})
})

var createdWorkflowName string
s.Given(s.T()).CronWorkflow("@testdata/basic.yaml").
When().
CreateCronWorkflow().
RunCli([]string{"submit", "--from", "CronWorkflow/test-cron-wf-basic"}, func(t *testing.T, output string, err error) {
assert.NoError(t, err)
alexec marked this conversation as resolved.
Show resolved Hide resolved
assert.Contains(t, output, "Name: test-cron-wf-basic-")
r := regexp.MustCompile(`Name:\s+?(test-cron-wf-basic-[a-z0-9]+)`)
res := r.FindStringSubmatch(output)
if len(res) != 2 {
assert.Fail(t, "Internal test error, please report a bug")
}
createdWorkflowName = res[1]
}).
WaitForWorkflowName(createdWorkflowName, 15*time.Second).
Then().
ExpectWorkflowName(createdWorkflowName, func(t *testing.T, metadata *v1.ObjectMeta, status *v1alpha1.WorkflowStatus) {
assert.Equal(t, v1alpha1.NodeSucceeded, status.Phase)
})
}

func (s *CLISuite) TestWorkflowDelete() {
Expand Down Expand Up @@ -218,6 +241,7 @@ func (s *CLISuite) TestTemplate() {
})
}


func (s *CLISuite) TestCron() {
s.Run("Create", func(t *testing.T) {
s.Given(t).RunCli([]string{"cron", "create", "testdata/basic.yaml"}, func(t *testing.T, output string, err error) {
Expand Down
2 changes: 1 addition & 1 deletion test/e2e/cli_with_server_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@ func (s *CLIWithServerSuite) TestArchive() {
SubmitWorkflow().
WaitForWorkflow(30 * time.Second).
Then().
Expect(func(t *testing.T, metadata *metav1.ObjectMeta, status *wfv1.WorkflowStatus) {
ExpectWorkflow(func(t *testing.T, metadata *metav1.ObjectMeta, status *wfv1.WorkflowStatus) {
uid = metadata.UID
})
s.Run("List", func(t *testing.T) {
Expand Down
2 changes: 1 addition & 1 deletion test/e2e/fixtures/given.go
Original file line number Diff line number Diff line change
Expand Up @@ -152,7 +152,7 @@ func (g *Given) CronWorkflow(text string) *Given {
return g
}

func (g *Given) RunCli(args []string, block func(*testing.T, string, error)) *Given {
func (g *Given) RunCli(args []string, block func(t *testing.T, output string, err error)) *Given {
output, err := runCli(g.diagnostics, args)
block(g.t, output, err)
return g
Expand Down
17 changes: 13 additions & 4 deletions test/e2e/fixtures/then.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,12 +25,20 @@ type Then struct {
kubeClient kubernetes.Interface
}

func (t *Then) Expect(block func(t *testing.T, metadata *metav1.ObjectMeta, status *wfv1.WorkflowStatus)) *Then {
if t.workflowName == "" {
func (t *Then) ExpectWorkflow(block func(t *testing.T, metadata *metav1.ObjectMeta, status *wfv1.WorkflowStatus)) *Then {
return t.expectWorkflow(t.workflowName, block)
}

func (t *Then) ExpectWorkflowName(workflowName string, block func(t *testing.T, metadata *metav1.ObjectMeta, status *wfv1.WorkflowStatus)) *Then {
return t.expectWorkflow(workflowName, block)
}

func (t *Then) expectWorkflow(workflowName string, block func(t *testing.T, metadata *metav1.ObjectMeta, status *wfv1.WorkflowStatus)) *Then {
if workflowName == "" {
t.t.Fatal("No workflow to test")
}
log.WithFields(log.Fields{"workflow": t.workflowName}).Info("Checking expectation")
wf, err := t.client.Get(t.workflowName, metav1.GetOptions{})
log.WithFields(log.Fields{"test": t.t.Name(), "workflow": workflowName}).Info("Checking expectation")
wf, err := t.client.Get(workflowName, metav1.GetOptions{})
if err != nil {
t.t.Fatal(err)
}
Expand All @@ -43,6 +51,7 @@ func (t *Then) Expect(block func(t *testing.T, metadata *metav1.ObjectMeta, stat
}
block(t.t, &wf.ObjectMeta, &wf.Status)
return t

}

func (t *Then) ExpectCron(block func(t *testing.T, cronWf *wfv1.CronWorkflow)) *Then {
Expand Down
22 changes: 17 additions & 5 deletions test/e2e/fixtures/when.go
Original file line number Diff line number Diff line change
Expand Up @@ -81,10 +81,10 @@ func (w *When) CreateCronWorkflow() *When {
return w
}

func (w *When) WaitForWorkflowCondition(test func(wf *wfv1.Workflow) bool, condition string, timeout time.Duration) *When {
logCtx := log.WithFields(log.Fields{"workflow": w.workflowName, "condition": condition, "timeout": timeout})
func (w *When) waitForWorkflow(workflowName string, test func(wf *wfv1.Workflow) bool, condition string, timeout time.Duration) *When {
logCtx := log.WithFields(log.Fields{"workflow": workflowName, "condition": condition, "timeout": timeout})
logCtx.Info("Waiting for condition")
opts := metav1.ListOptions{FieldSelector: fields.ParseSelectorOrDie(fmt.Sprintf("metadata.name=%s", w.workflowName)).String()}
opts := metav1.ListOptions{FieldSelector: fields.ParseSelectorOrDie(fmt.Sprintf("metadata.name=%s", workflowName)).String()}
watch, err := w.client.Watch(opts)
if err != nil {
w.t.Fatal(err)
Expand Down Expand Up @@ -129,13 +129,19 @@ func (w *When) hydrateWorkflow(wf *wfv1.Workflow) {
}
}
func (w *When) WaitForWorkflowToStart(timeout time.Duration) *When {
return w.WaitForWorkflowCondition(func(wf *wfv1.Workflow) bool {
return w.waitForWorkflow(w.workflowName, func(wf *wfv1.Workflow) bool {
return !wf.Status.StartedAt.IsZero()
}, "to start", timeout)
}

func (w *When) WaitForWorkflow(timeout time.Duration) *When {
return w.WaitForWorkflowCondition(func(wf *wfv1.Workflow) bool {
return w.waitForWorkflow(w.workflowName, func(wf *wfv1.Workflow) bool {
return !wf.Status.FinishedAt.IsZero()
}, "to finish", timeout)
}

func (w *When) WaitForWorkflowName(workflowName string, timeout time.Duration) *When {
return w.waitForWorkflow(workflowName, func(wf *wfv1.Workflow) bool {
return !wf.Status.FinishedAt.IsZero()
}, "to finish", timeout)
}
Expand All @@ -157,6 +163,12 @@ func (w *When) DeleteWorkflow() *When {
return w
}

func (w *When) RunCli(args []string, block func(t *testing.T, output string, err error)) *When {
output, err := runCli(w.diagnostics, args)
block(w.t, output, err)
return w
}

func (w *When) Then() *Then {
return &Then{
t: w.t,
Expand Down
4 changes: 2 additions & 2 deletions test/e2e/functional_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,7 @@ spec:
SubmitWorkflow().
WaitForWorkflow(30 * time.Second).
Then().
Expect(func(t *testing.T, _ *metav1.ObjectMeta, status *wfv1.WorkflowStatus) {
ExpectWorkflow(func(t *testing.T, _ *metav1.ObjectMeta, status *wfv1.WorkflowStatus) {
assert.Equal(t, wfv1.NodeSucceeded, status.Phase)
assert.Len(t, status.Nodes, 7)
nodeStatus := status.Nodes.FindByDisplayName("B")
Expand All @@ -88,7 +88,7 @@ func (s *FunctionalSuite) TestFastFailOnPodTermination() {
SubmitWorkflow().
WaitForWorkflow(120 * time.Second).
Then().
Expect(func(t *testing.T, _ *metav1.ObjectMeta, status *wfv1.WorkflowStatus) {
ExpectWorkflow(func(t *testing.T, _ *metav1.ObjectMeta, status *wfv1.WorkflowStatus) {
assert.Equal(t, wfv1.NodeFailed, status.Phase)
assert.Len(t, status.Nodes, 4)
nodeStatus := status.Nodes.FindByDisplayName("sleep")
Expand Down
6 changes: 3 additions & 3 deletions test/e2e/smoke_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ func (s *SmokeSuite) TestBasicWorkflow() {
SubmitWorkflow().
WaitForWorkflow(15 * time.Second).
Then().
Expect(func(t *testing.T, _ *metav1.ObjectMeta, wf *wfv1.WorkflowStatus) {
ExpectWorkflow(func(t *testing.T, _ *metav1.ObjectMeta, wf *wfv1.WorkflowStatus) {
assert.Equal(t, wfv1.NodeSucceeded, wf.Phase)
assert.NotEmpty(t, wf.Nodes)
})
Expand All @@ -36,7 +36,7 @@ func (s *SmokeSuite) TestArtifactPassing() {
SubmitWorkflow().
WaitForWorkflow(30 * time.Second).
Then().
Expect(func(t *testing.T, _ *metav1.ObjectMeta, status *wfv1.WorkflowStatus) {
ExpectWorkflow(func(t *testing.T, _ *metav1.ObjectMeta, status *wfv1.WorkflowStatus) {
assert.Equal(t, wfv1.NodeSucceeded, status.Phase)
})
}
Expand All @@ -50,7 +50,7 @@ func (s *SmokeSuite) TestWorkflowTemplateBasic() {
SubmitWorkflow().
WaitForWorkflow(60 * time.Second).
Then().
Expect(func(t *testing.T, _ *metav1.ObjectMeta, status *wfv1.WorkflowStatus) {
ExpectWorkflow(func(t *testing.T, _ *metav1.ObjectMeta, status *wfv1.WorkflowStatus) {
assert.Equal(t, wfv1.NodeSucceeded, status.Phase)
})
}
Expand Down
2 changes: 1 addition & 1 deletion test/e2e/workflow_template_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ spec:
SubmitWorkflow().
WaitForWorkflow(30 * time.Second).
Then().
Expect(func(t *testing.T, metadata *v1.ObjectMeta, status *v1alpha1.WorkflowStatus) {
ExpectWorkflow(func(t *testing.T, metadata *v1.ObjectMeta, status *v1alpha1.WorkflowStatus) {
assert.Equal(t, status.Phase, v1alpha1.NodeSucceeded)
})

Expand Down
2 changes: 1 addition & 1 deletion workflow/common/util.go
Original file line number Diff line number Diff line change
Expand Up @@ -681,7 +681,7 @@ func GetTemplateHolderString(tmplHolder wfv1.TemplateHolder) string {
}
}

func ConvertToWorkflow(cronWf *wfv1.CronWorkflow) (*wfv1.Workflow, error) {
func ConvertCronWorkflowToWorkflow(cronWf *wfv1.CronWorkflow) (*wfv1.Workflow, error) {
newTypeMeta := metav1.TypeMeta{
Kind: workflow.WorkflowKind,
APIVersion: cronWf.TypeMeta.APIVersion,
Expand Down
2 changes: 1 addition & 1 deletion workflow/cron/operator.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ func (woc *cronWfOperationCtx) Run() {
return
}

wf, err := common.ConvertToWorkflow(woc.cronWf)
wf, err := common.ConvertCronWorkflowToWorkflow(woc.cronWf)
if err != nil {
log.Errorf("Unable to create Workflow for CronWorkflow %s", woc.name)
return
Expand Down
2 changes: 1 addition & 1 deletion workflow/validate/validate.go
Original file line number Diff line number Diff line change
Expand Up @@ -192,7 +192,7 @@ func ValidateCronWorkflow(wftmplGetter templateresolution.WorkflowTemplateNamesp
return errors.Errorf(errors.CodeBadRequest, "startingDeadlineSeconds must be positive")
}

wf, err := common.ConvertToWorkflow(cronWf)
wf, err := common.ConvertCronWorkflowToWorkflow(cronWf)
if err != nil {
return errors.Errorf(errors.CodeBadRequest, "cannot convert to Workflow: %s", err)
}
Expand Down