Skip to content

Commit

Permalink
Merge branch 'master' into release-2.5
Browse files Browse the repository at this point in the history
  • Loading branch information
alexec committed Feb 7, 2020
2 parents 073216c + d6f5953 commit f8b8efc
Show file tree
Hide file tree
Showing 21 changed files with 427 additions and 283 deletions.
1 change: 1 addition & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,7 @@ Currently **officially** using Argo:
1. [Cyrus Biotechnology](https://cyrusbio.com/)
1. [Datadog](https://www.datadoghq.com/)
1. [DataStax](https://www.datastax.com/)
1. [EBSCO Information Services](https://www.ebsco.com/)
1. [Equinor](https://www.equinor.com/)
1. [Fairwinds](https://fairwinds.com/)
1. [Gardener](https://gardener.cloud/)
Expand Down
87 changes: 67 additions & 20 deletions cmd/argo/commands/submit.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,14 +3,17 @@ package commands
import (
"log"
"os"
"strings"

"github.com/argoproj/pkg/errors"
argoJson "github.com/argoproj/pkg/json"
"github.com/spf13/cobra"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"

"github.com/argoproj/argo/cmd/argo/commands/client"
"github.com/argoproj/argo/pkg/apiclient/cronworkflow"
workflowpkg "github.com/argoproj/argo/pkg/apiclient/workflow"
"github.com/argoproj/argo/pkg/apis/workflow"
wfv1 "github.com/argoproj/argo/pkg/apis/workflow/v1alpha1"
"github.com/argoproj/argo/workflow/common"
"github.com/argoproj/argo/workflow/util"
Expand All @@ -30,20 +33,33 @@ func NewSubmitCommand() *cobra.Command {
submitOpts util.SubmitOpts
cliSubmitOpts cliSubmitOpts
priority int32
from string
)
var command = &cobra.Command{
Use: "submit FILE1 FILE2...",
Use: "submit [FILE... | --from `kind/name]",
Short: "submit a workflow",
Example: `# Submit multiple workflows from files:
argo submit my-wf.yaml
# Submit a single workflow from an existing resource
argo submit --from cronwf/my-cron-wf
`,
Run: func(cmd *cobra.Command, args []string) {
if len(args) == 0 {
cmd.HelpFunc()(cmd, args)
os.Exit(1)
}
if cmd.Flag("priority").Changed {
cliSubmitOpts.priority = &priority
}

SubmitWorkflows(args, &submitOpts, &cliSubmitOpts)
if from != "" {
if len(args) != 0 {
cmd.HelpFunc()(cmd, args)
os.Exit(1)
}
submitWorkflowFromResource(from, &submitOpts, &cliSubmitOpts)
} else {
submitWorkflowsFromFile(args, &submitOpts, &cliSubmitOpts)
}
},
}
command.Flags().StringVar(&submitOpts.Name, "name", "", "override metadata.name")
Expand All @@ -61,6 +77,7 @@ func NewSubmitCommand() *cobra.Command {
command.Flags().Int32Var(&priority, "priority", 0, "workflow priority")
command.Flags().StringVarP(&submitOpts.ParameterFile, "parameter-file", "f", "", "pass a file containing all input parameters")
command.Flags().StringVarP(&submitOpts.Labels, "labels", "l", "", "Comma separated labels to apply to the workflow. Will override previous values.")
command.Flags().StringVar(&from, "from", "", "Submit from an existing `kind/name` E.g., --from=cronwf/hello-world-cwf")
// Only complete files with appropriate extension.
err := command.Flags().SetAnnotation("parameter-file", cobra.BashCompFilenameExt, []string{"json", "yaml", "yml"})
if err != nil {
Expand All @@ -69,28 +86,58 @@ func NewSubmitCommand() *cobra.Command {
return command
}

func SubmitWorkflows(filePaths []string, submitOpts *util.SubmitOpts, cliOpts *cliSubmitOpts) {
if submitOpts == nil {
submitOpts = &util.SubmitOpts{}
func submitWorkflowsFromFile(filePaths []string, submitOpts *util.SubmitOpts, cliOpts *cliSubmitOpts) {
fileContents, err := util.ReadManifest(filePaths...)
errors.CheckError(err)

var workflows []wfv1.Workflow
for _, body := range fileContents {
wfs := unmarshalWorkflows(body, cliOpts.strict)
workflows = append(workflows, wfs...)
}
if cliOpts == nil {
cliOpts = &cliSubmitOpts{}

submitWorkflows(workflows, submitOpts, cliOpts)
}

func submitWorkflowFromResource(resourceIdentifier string, submitOpts *util.SubmitOpts, cliOpts *cliSubmitOpts) {

parts := strings.SplitN(resourceIdentifier, "/", 2)
if len(parts) != 2 {
log.Fatalf("resource identifier '%s' is malformed. Should be `kind/name`, e.g. cronwf/hello-world-cwf", resourceIdentifier)
}
kind := parts[0]
name := parts[1]

ctx, apiClient := client.NewAPIClient()
serviceClient := apiClient.NewWorkflowServiceClient()
serviceClient := apiClient.NewCronWorkflowServiceClient()
namespace := client.Namespace()

fileContents, err := util.ReadManifest(filePaths...)
if err != nil {
log.Fatal(err)
var workflowToSubmit *wfv1.Workflow
switch kind {
case workflow.CronWorkflowKind, workflow.CronWorkflowSingular, workflow.CronWorkflowPlural, workflow.CronWorkflowShortName:
cronWf, err := serviceClient.GetCronWorkflow(ctx, &cronworkflow.GetCronWorkflowRequest{
Name: name,
Namespace: namespace,
})
if err != nil {
log.Fatalf("Unable to get CronWorkflow '%s': %s", name, err)
}
workflowToSubmit, err = common.ConvertCronWorkflowToWorkflow(cronWf)
if err != nil {
log.Fatalf("Unable to create Workflow from CronWorkflow '%s': %s", name, err)
}
default:
log.Fatalf("Resource kind '%s' is not supported with --from", kind)
}

var workflows []wfv1.Workflow
for _, body := range fileContents {
wfs := unmarshalWorkflows(body, cliOpts.strict)
workflows = append(workflows, wfs...)
}
submitWorkflows([]wfv1.Workflow{*workflowToSubmit}, submitOpts, cliOpts)
}

func submitWorkflows(workflows []wfv1.Workflow, submitOpts *util.SubmitOpts, cliOpts *cliSubmitOpts) {

ctx, apiClient := client.NewAPIClient()
serviceClient := apiClient.NewWorkflowServiceClient()
namespace := client.Namespace()

if cliOpts.watch {
if len(workflows) > 1 {
Expand Down
2 changes: 2 additions & 0 deletions pkg/apiclient/apiclient.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,13 +5,15 @@ import (

"k8s.io/client-go/tools/clientcmd"

cronworkflowpkg "github.com/argoproj/argo/pkg/apiclient/cronworkflow"
workflowpkg "github.com/argoproj/argo/pkg/apiclient/workflow"
workflowarchivepkg "github.com/argoproj/argo/pkg/apiclient/workflowarchive"
)

type Client interface {
NewArchivedWorkflowServiceClient() (workflowarchivepkg.ArchivedWorkflowServiceClient, error)
NewWorkflowServiceClient() workflowpkg.WorkflowServiceClient
NewCronWorkflowServiceClient() cronworkflowpkg.CronWorkflowServiceClient
}

func NewClient(argoServer string, authSupplier func() string, clientConfig clientcmd.ClientConfig) (context.Context, Client, error) {
Expand Down
5 changes: 5 additions & 0 deletions pkg/apiclient/argo-server-client.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (
"google.golang.org/grpc"
"google.golang.org/grpc/metadata"

cronworkflowpkg "github.com/argoproj/argo/pkg/apiclient/cronworkflow"
workflowpkg "github.com/argoproj/argo/pkg/apiclient/workflow"
workflowarchivepkg "github.com/argoproj/argo/pkg/apiclient/workflowarchive"
)
Expand All @@ -26,6 +27,10 @@ func (a *argoServerClient) NewWorkflowServiceClient() workflowpkg.WorkflowServic
return workflowpkg.NewWorkflowServiceClient(a.ClientConn)
}

func (a *argoServerClient) NewCronWorkflowServiceClient() cronworkflowpkg.CronWorkflowServiceClient {
return cronworkflowpkg.NewCronWorkflowServiceClient(a.ClientConn)
}

func (a *argoServerClient) NewArchivedWorkflowServiceClient() (workflowarchivepkg.ArchivedWorkflowServiceClient, error) {
return workflowarchivepkg.NewArchivedWorkflowServiceClient(a.ClientConn), nil
}
Expand Down
5 changes: 5 additions & 0 deletions pkg/apiclient/classic-client.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (

"k8s.io/client-go/tools/clientcmd"

"github.com/argoproj/argo/pkg/apiclient/cronworkflow"
workflowpkg "github.com/argoproj/argo/pkg/apiclient/workflow"
workflowarchivepkg "github.com/argoproj/argo/pkg/apiclient/workflowarchive"
"github.com/argoproj/argo/pkg/client/clientset/versioned"
Expand All @@ -32,6 +33,10 @@ func (a *classicClient) NewWorkflowServiceClient() workflowpkg.WorkflowServiceCl
return &classicWorkflowServiceClient{a.Interface}
}

func (a *classicClient) NewCronWorkflowServiceClient() cronworkflow.CronWorkflowServiceClient {
return &classicCronWorkflowServiceClient{a.Interface}
}

func (a *classicClient) NewArchivedWorkflowServiceClient() (workflowarchivepkg.ArchivedWorkflowServiceClient, error) {
return nil, fmt.Errorf("it is impossible to interact with the workflow archive if you are not using the Argo Server, see " + help.CLI)
}
40 changes: 40 additions & 0 deletions pkg/apiclient/classic-cron-workflow-service-client.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
package apiclient

import (
"context"

"google.golang.org/grpc"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"

"github.com/argoproj/argo/pkg/apiclient/cronworkflow"
"github.com/argoproj/argo/pkg/apis/workflow/v1alpha1"
"github.com/argoproj/argo/pkg/client/clientset/versioned"
)

type classicCronWorkflowServiceClient struct {
versioned.Interface
}

func (c *classicCronWorkflowServiceClient) CreateCronWorkflow(_ context.Context, _ *cronworkflow.CreateCronWorkflowRequest, opts ...grpc.CallOption) (*v1alpha1.CronWorkflow, error) {
panic("implement me")
}

func (c *classicCronWorkflowServiceClient) ListCronWorkflows(_ context.Context, _ *cronworkflow.ListCronWorkflowsRequest, opts ...grpc.CallOption) (*v1alpha1.CronWorkflowList, error) {
panic("implement me")
}

func (c *classicCronWorkflowServiceClient) GetCronWorkflow(_ context.Context, req *cronworkflow.GetCronWorkflowRequest, opts ...grpc.CallOption) (*v1alpha1.CronWorkflow, error) {
options := metav1.GetOptions{}
if req.GetOptions != nil {
options = *req.GetOptions
}
return c.Interface.ArgoprojV1alpha1().CronWorkflows(req.GetNamespace()).Get(req.GetName(), options)
}

func (c *classicCronWorkflowServiceClient) UpdateCronWorkflow(_ context.Context, _ *cronworkflow.UpdateCronWorkflowRequest, opts ...grpc.CallOption) (*v1alpha1.CronWorkflow, error) {
panic("implement me")
}

func (c *classicCronWorkflowServiceClient) DeleteCronWorkflow(_ context.Context, _ *cronworkflow.DeleteCronWorkflowRequest, opts ...grpc.CallOption) (*cronworkflow.CronWorkflowDeletedResponse, error) {
panic("implement me")
}
Loading

0 comments on commit f8b8efc

Please sign in to comment.