Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: instanceID support for argo server. Closes #2004 #2365

Merged
merged 14 commits into from
Mar 13, 2020
33 changes: 33 additions & 0 deletions persist/sqldb/migrate.go
Original file line number Diff line number Diff line change
Expand Up @@ -215,6 +215,39 @@ func (m migrate) Exec(ctx context.Context) error {
ansiSQLChange(`alter table `+m.tableName+` modify column nodes json not null`),
ansiSQLChange(`alter table `+m.tableName+` alter column nodes type json using nodes::json`),
),
// add instanceid column to table argo_archived_workflows and argo_archived_workflows_labels
ansiSQLChange(`alter table argo_archived_workflows add column instanceid varchar(64)`),
ansiSQLChange(`update argo_archived_workflows set instanceid = '' where instanceid is null`),
whynowy marked this conversation as resolved.
Show resolved Hide resolved
ternary(dbType == MySQL,
ansiSQLChange(`alter table argo_archived_workflows modify column instanceid varchar(64) not null`),
ansiSQLChange(`alter table argo_archived_workflows alter column instanceid set not null`),
),
ansiSQLChange(`alter table argo_archived_workflows_labels add column instanceid varchar(64)`),
ansiSQLChange(`update argo_archived_workflows_labels set instanceid = '' where instanceid is null`),
ternary(dbType == MySQL,
ansiSQLChange(`alter table argo_archived_workflows_labels modify column instanceid varchar(64) not null`),
ansiSQLChange(`alter table argo_archived_workflows_labels alter column instanceid set not null`),
),
ternary(dbType == MySQL,
ansiSQLChange(`alter table argo_archived_workflows_labels drop foreign key argo_archived_workflows_labels_ibfk_1`),
ansiSQLChange(`alter table argo_archived_workflows_labels drop constraint argo_archived_workflows_labels_clustername_uid_fkey`),
),
ternary(dbType == MySQL,
ansiSQLChange(`alter table argo_archived_workflows drop primary key`),
ansiSQLChange(`alter table argo_archived_workflows drop constraint argo_archived_workflows_pkey`),
),
ansiSQLChange(`alter table argo_archived_workflows add primary key(clustername,instanceid,uid)`),
alexec marked this conversation as resolved.
Show resolved Hide resolved
ternary(dbType == MySQL,
ansiSQLChange(`alter table argo_archived_workflows_labels drop primary key`),
ansiSQLChange(`alter table argo_archived_workflows_labels drop constraint argo_archived_workflows_labels_pkey`),
),
ansiSQLChange(`alter table argo_archived_workflows_labels add primary key(clustername,instanceid,uid,name)`),
alexec marked this conversation as resolved.
Show resolved Hide resolved
ansiSQLChange(`alter table argo_archived_workflows_labels add constraint argo_archived_workflows_labels_fk1 foreign key (clustername,instanceid,uid) references argo_archived_workflows(clustername, instanceid, uid) on delete cascade`),
ternary(dbType == MySQL,
ansiSQLChange(`drop index argo_archived_workflows_i1 on argo_archived_workflows`),
ansiSQLChange(`drop index argo_archived_workflows_i1`),
),
ansiSQLChange(`create index argo_archived_workflows_i1 on argo_archived_workflows (clustername,instanceid,namespace)`),
whynowy marked this conversation as resolved.
Show resolved Hide resolved
} {
err := m.applyChange(ctx, changeSchemaVersion, change)
if err != nil {
Expand Down
15 changes: 13 additions & 2 deletions persist/sqldb/workflow_archive.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ const archiveLabelsTableName = archiveTableName + "_labels"

type archivedWorkflowMetadata struct {
ClusterName string `db:"clustername"`
InstanceID string `db:"instanceid"`
UID string `db:"uid"`
Name string `db:"name"`
Namespace string `db:"namespace"`
Expand All @@ -36,6 +37,7 @@ type archivedWorkflowRecord struct {

type archivedWorkflowLabelRecord struct {
ClusterName string `db:"clustername"`
InstanceID string `db:"instanceid"`
UID string `db:"uid"`
// Why is this called "name" not "key"? Key is an SQL reserved word.
Key string `db:"name"`
Expand All @@ -52,11 +54,13 @@ type WorkflowArchive interface {
type workflowArchive struct {
session sqlbuilder.Database
clusterName string
instanceID string
dbType dbType
}

func NewWorkflowArchive(session sqlbuilder.Database, clusterName string) WorkflowArchive {
return &workflowArchive{session: session, clusterName: clusterName, dbType: dbTypeFor(session)}
// NewWorkflowArchive returns a new workflowArchive
func NewWorkflowArchive(session sqlbuilder.Database, clusterName string, instanceID string) WorkflowArchive {
return &workflowArchive{session: session, clusterName: clusterName, instanceID: instanceID, dbType: dbTypeFor(session)}
}

func (r *workflowArchive) ArchiveWorkflow(wf *wfv1.Workflow) error {
Expand All @@ -74,6 +78,7 @@ func (r *workflowArchive) ArchiveWorkflow(wf *wfv1.Workflow) error {
Insert(&archivedWorkflowRecord{
archivedWorkflowMetadata: archivedWorkflowMetadata{
ClusterName: r.clusterName,
InstanceID: r.instanceID,
UID: string(wf.UID),
Name: wf.Name,
Namespace: wf.Namespace,
Expand All @@ -92,6 +97,7 @@ func (r *workflowArchive) ArchiveWorkflow(wf *wfv1.Workflow) error {
Set("startedat", wf.Status.StartedAt.Time).
Set("finishedat", wf.Status.FinishedAt.Time).
Where(db.Cond{"clustername": r.clusterName}).
And(db.Cond{"instanceid": r.instanceID}).
And(db.Cond{"uid": wf.UID}).
Exec()
if err != nil {
Expand All @@ -114,6 +120,7 @@ func (r *workflowArchive) ArchiveWorkflow(wf *wfv1.Workflow) error {
_, err := sess.Collection(archiveLabelsTableName).
Insert(&archivedWorkflowLabelRecord{
ClusterName: r.clusterName,
InstanceID: r.instanceID,
UID: string(wf.UID),
Key: key,
Value: value,
Expand All @@ -124,6 +131,7 @@ func (r *workflowArchive) ArchiveWorkflow(wf *wfv1.Workflow) error {
Update(archiveLabelsTableName).
Set("value", value).
Where(db.Cond{"clustername": r.clusterName}).
And(db.Cond{"instanceid": r.instanceID}).
And(db.Cond{"uid": wf.UID}).
And(db.Cond{"name": key}).
Exec()
Expand All @@ -150,6 +158,7 @@ func (r *workflowArchive) ListWorkflows(namespace string, labelRequirements labe
Select("name", "namespace", "uid", "phase", "startedat", "finishedat").
From(archiveTableName).
Where(db.Cond{"clustername": r.clusterName}).
And(db.Cond{"instanceid": r.instanceID}).
And(namespaceEqual(namespace)).
And(clause).
OrderBy("-startedat").
Expand Down Expand Up @@ -192,6 +201,7 @@ func (r *workflowArchive) GetWorkflow(uid string) (*wfv1.Workflow, error) {
Select("workflow").
From(archiveTableName).
Where(db.Cond{"clustername": r.clusterName}).
And(db.Cond{"instanceid": r.instanceID}).
And(db.Cond{"uid": uid}).
One(archivedWf)
if err != nil {
Expand All @@ -212,6 +222,7 @@ func (r *workflowArchive) DeleteWorkflow(uid string) error {
rs, err := r.session.
DeleteFrom(archiveTableName).
Where(db.Cond{"clustername": r.clusterName}).
And(db.Cond{"instanceid": r.instanceID}).
And(db.Cond{"uid": uid}).
Exec()
if err != nil {
Expand Down
4 changes: 2 additions & 2 deletions pkg/apiclient/argo-kube-client.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,11 +47,11 @@ func newArgoKubeClient(clientConfig clientcmd.ClientConfig) (context.Context, Cl
}

func (a *argoKubeClient) NewWorkflowServiceClient() workflowpkg.WorkflowServiceClient {
return &argoKubeWorkflowServiceClient{workflowserver.NewWorkflowServer(argoKubeOffloadNodeStatusRepo)}
return &argoKubeWorkflowServiceClient{workflowserver.NewWorkflowServer("", argoKubeOffloadNodeStatusRepo)}
}

func (a *argoKubeClient) NewCronWorkflowServiceClient() cronworkflow.CronWorkflowServiceClient {
return &argoKubeCronWorkflowServiceClient{cronworkflowserver.NewCronWorkflowServer()}
return &argoKubeCronWorkflowServiceClient{cronworkflowserver.NewCronWorkflowServer("")}
}
func (a *argoKubeClient) NewWorkflowTemplateServiceClient() workflowtemplate.WorkflowTemplateServiceClient {
return &argoKubeWorkflowTemplateServiceClient{workflowtemplateserver.NewWorkflowTemplateServer()}
Expand Down
10 changes: 5 additions & 5 deletions server/apiserver/argoserver.go
Original file line number Diff line number Diff line change
Expand Up @@ -130,10 +130,10 @@ func (as *argoServer) Run(ctx context.Context, port int, browserOpenFunc func(st
}
// we always enable the archive for the Argo Server, as the Argo Server does not write records, so you can
// disable the archiving - and still read old records
wfArchive = sqldb.NewWorkflowArchive(session, persistence.GetClusterName())
wfArchive = sqldb.NewWorkflowArchive(session, persistence.GetClusterName(), configMap.InstanceID)
}
artifactServer := artifacts.NewArtifactServer(as.authenticator, offloadRepo, wfArchive)
grpcServer := as.newGRPCServer(offloadRepo, wfArchive)
grpcServer := as.newGRPCServer(configMap.InstanceID, offloadRepo, wfArchive)
httpServer := as.newHTTPServer(ctx, port, artifactServer)

// Start listener
Expand Down Expand Up @@ -169,7 +169,7 @@ func (as *argoServer) Run(ctx context.Context, port int, browserOpenFunc func(st
<-as.stopCh
}

func (as *argoServer) newGRPCServer(offloadNodeStatusRepo sqldb.OffloadNodeStatusRepo, wfArchive sqldb.WorkflowArchive) *grpc.Server {
func (as *argoServer) newGRPCServer(instanceID string, offloadNodeStatusRepo sqldb.OffloadNodeStatusRepo, wfArchive sqldb.WorkflowArchive) *grpc.Server {
serverLog := log.NewEntry(log.StandardLogger())

sOpts := []grpc.ServerOption{
Expand All @@ -196,9 +196,9 @@ func (as *argoServer) newGRPCServer(offloadNodeStatusRepo sqldb.OffloadNodeStatu
grpcServer := grpc.NewServer(sOpts...)

infopkg.RegisterInfoServiceServer(grpcServer, info.NewInfoServer(as.managedNamespace))
workflowpkg.RegisterWorkflowServiceServer(grpcServer, workflow.NewWorkflowServer(offloadNodeStatusRepo))
workflowpkg.RegisterWorkflowServiceServer(grpcServer, workflow.NewWorkflowServer(instanceID, offloadNodeStatusRepo))
workflowtemplatepkg.RegisterWorkflowTemplateServiceServer(grpcServer, workflowtemplate.NewWorkflowTemplateServer())
cronworkflowpkg.RegisterCronWorkflowServiceServer(grpcServer, cronworkflow.NewCronWorkflowServer())
cronworkflowpkg.RegisterCronWorkflowServiceServer(grpcServer, cronworkflow.NewCronWorkflowServer(instanceID))
workflowarchivepkg.RegisterArchivedWorkflowServiceServer(grpcServer, workflowarchive.NewWorkflowArchiveServer(wfArchive))

return grpcServer
Expand Down
73 changes: 67 additions & 6 deletions server/cronworkflow/cron_workflow_server.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,21 +2,26 @@ package cronworkflow

import (
"context"
"fmt"

metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
v1 "k8s.io/apimachinery/pkg/apis/meta/v1"

cronworkflowpkg "github.com/argoproj/argo/pkg/apiclient/cronworkflow"
"github.com/argoproj/argo/pkg/apis/workflow/v1alpha1"
"github.com/argoproj/argo/server/auth"
"github.com/argoproj/argo/workflow/common"
"github.com/argoproj/argo/workflow/templateresolution"
"github.com/argoproj/argo/workflow/validate"
)

type cronWorkflowServiceServer struct {
instanceID string
}

func NewCronWorkflowServer() cronworkflowpkg.CronWorkflowServiceServer {
return &cronWorkflowServiceServer{}
// NewCronWorkflowServer returns a new cronWorkflowServiceServer
func NewCronWorkflowServer(instanceID string) cronworkflowpkg.CronWorkflowServiceServer {
whynowy marked this conversation as resolved.
Show resolved Hide resolved
return &cronWorkflowServiceServer{instanceID: instanceID}
}

func (c *cronWorkflowServiceServer) LintCronWorkflow(ctx context.Context, req *cronworkflowpkg.LintCronWorkflowRequest) (*v1alpha1.CronWorkflow, error) {
Expand All @@ -34,10 +39,19 @@ func (c *cronWorkflowServiceServer) ListCronWorkflows(ctx context.Context, req *
if req.ListOptions != nil {
options = *req.ListOptions
}
options = c.withInstanceID(options)
return auth.GetWfClient(ctx).ArgoprojV1alpha1().CronWorkflows(req.Namespace).List(options)
}

func (c *cronWorkflowServiceServer) CreateCronWorkflow(ctx context.Context, req *cronworkflowpkg.CreateCronWorkflowRequest) (*v1alpha1.CronWorkflow, error) {
if len(c.instanceID) > 0 {
labels := req.CronWorkflow.GetLabels()
whynowy marked this conversation as resolved.
Show resolved Hide resolved
if labels == nil {
labels = make(map[string]string)
}
labels[common.LabelKeyControllerInstanceID] = c.instanceID
req.CronWorkflow.SetLabels(labels)
}
return auth.GetWfClient(ctx).ArgoprojV1alpha1().CronWorkflows(req.Namespace).Create(req.CronWorkflow)
}

Expand All @@ -46,21 +60,68 @@ func (c *cronWorkflowServiceServer) GetCronWorkflow(ctx context.Context, req *cr
if req.GetOptions != nil {
options = *req.GetOptions
}
return auth.GetWfClient(ctx).ArgoprojV1alpha1().CronWorkflows(req.Namespace).Get(req.Name, options)
return c.getCronWorkflow(ctx, req.Namespace, req.Name, options)
}

func (c *cronWorkflowServiceServer) UpdateCronWorkflow(ctx context.Context, req *cronworkflowpkg.UpdateCronWorkflowRequest) (*v1alpha1.CronWorkflow, error) {
cronWf, err := auth.GetWfClient(ctx).ArgoprojV1alpha1().CronWorkflows(req.Namespace).Update(req.CronWorkflow)
_, err := c.getCronWorkflow(ctx, req.Namespace, req.Name, metav1.GetOptions{})
if err != nil {
return nil, err
}
return cronWf, nil
return auth.GetWfClient(ctx).ArgoprojV1alpha1().CronWorkflows(req.Namespace).Update(req.CronWorkflow)
}

func (c *cronWorkflowServiceServer) DeleteCronWorkflow(ctx context.Context, req *cronworkflowpkg.DeleteCronWorkflowRequest) (*cronworkflowpkg.CronWorkflowDeletedResponse, error) {
err := auth.GetWfClient(ctx).ArgoprojV1alpha1().CronWorkflows(req.Namespace).Delete(req.Name, req.DeleteOptions)
_, err := c.getCronWorkflow(ctx, req.Namespace, req.Name, metav1.GetOptions{})
if err != nil {
return nil, err
}
err = auth.GetWfClient(ctx).ArgoprojV1alpha1().CronWorkflows(req.Namespace).Delete(req.Name, req.DeleteOptions)
if err != nil {
return nil, err
}
return &cronworkflowpkg.CronWorkflowDeletedResponse{}, nil
}

func (c *cronWorkflowServiceServer) withInstanceID(opt metav1.ListOptions) metav1.ListOptions {
if len(opt.LabelSelector) > 0 {
opt.LabelSelector += ","
}
if len(c.instanceID) == 0 {
opt.LabelSelector += fmt.Sprintf("!%s", common.LabelKeyControllerInstanceID)
return opt
}
opt.LabelSelector += fmt.Sprintf("%s=%s", common.LabelKeyControllerInstanceID, c.instanceID)
return opt
}

func (c *cronWorkflowServiceServer) getCronWorkflow(ctx context.Context, namespace string, name string, options v1.GetOptions) (*v1alpha1.CronWorkflow, error) {
wfClient := auth.GetWfClient(ctx)
cronWf, err := wfClient.ArgoprojV1alpha1().CronWorkflows(namespace).Get(name, options)
if err != nil {
return nil, err
}
err = c.validateInstanceID(cronWf)
if err != nil {
return nil, err
}
return cronWf, nil
}

func (c *cronWorkflowServiceServer) validateInstanceID(cronWf *v1alpha1.CronWorkflow) error {
if len(c.instanceID) == 0 {
if len(cronWf.Labels) == 0 {
return nil
}
if _, ok := cronWf.Labels[common.LabelKeyControllerInstanceID]; !ok {
return nil
}
} else if len(cronWf.Labels) > 0 {
if val, ok := cronWf.Labels[common.LabelKeyControllerInstanceID]; ok {
if val == c.instanceID {
return nil
}
}
}
return fmt.Errorf("the CronWorkflow is not managed by current Argo server")
}
2 changes: 1 addition & 1 deletion server/cronworkflow/cron_workflow_server_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ func Test_cronWorkflowServiceServer(t *testing.T) {
ObjectMeta: v1.ObjectMeta{Namespace: "my-ns", Name: "my-name"},
}
wfClientset := wftFake.NewSimpleClientset()
server := NewCronWorkflowServer()
server := NewCronWorkflowServer("")
ctx := context.WithValue(context.TODO(), auth.WfKey, wfClientset)

t.Run("CreateCronWorkflow", func(t *testing.T) {
Expand Down
Loading