Skip to content

Commit

Permalink
feat: instanceID support for argo server
Browse files Browse the repository at this point in the history
  • Loading branch information
whynowy committed Mar 5, 2020
1 parent 4714c88 commit 1a6fcd2
Show file tree
Hide file tree
Showing 9 changed files with 156 additions and 20 deletions.
33 changes: 33 additions & 0 deletions persist/sqldb/migrate.go
Original file line number Diff line number Diff line change
Expand Up @@ -215,6 +215,39 @@ func (m migrate) Exec(ctx context.Context) error {
ansiSQLChange(`alter table `+m.tableName+` modify column nodes json not null`),
ansiSQLChange(`alter table `+m.tableName+` alter column nodes type json using nodes::json`),
),
// add instanceid column to table argo_archived_workflows and argo_archived_workflows_labels
ansiSQLChange(`alter table argo_archived_workflows add column instanceid varchar(64)`),
ansiSQLChange(`update argo_archived_workflows set instanceid = '' where instanceid is null`),
ternary(dbType == MySQL,
ansiSQLChange(`alter table argo_archived_workflows modify column instanceid varchar(64) not null`),
ansiSQLChange(`alter table argo_archived_workflows alter column instanceid set not null`),
),
ansiSQLChange(`alter table argo_archived_workflows_labels add column instanceid varchar(64)`),
ansiSQLChange(`update argo_archived_workflows_labels set instanceid = '' where instanceid is null`),
ternary(dbType == MySQL,
ansiSQLChange(`alter table argo_archived_workflows_labels modify column instanceid varchar(64) not null`),
ansiSQLChange(`alter table argo_archived_workflows_labels alter column instanceid set not null`),
),
ternary(dbType == MySQL,
ansiSQLChange(`alter table argo_archived_workflows_labels drop foreign key argo_archived_workflows_labels_ibfk_1`),
ansiSQLChange(`alter table argo_archived_workflows_labels drop constraint argo_archived_workflows_labels_clustername_uid_fkey`),
),
ternary(dbType == MySQL,
ansiSQLChange(`alter table argo_archived_workflows drop primary key`),
ansiSQLChange(`alter table argo_archived_workflows drop constraint argo_archived_workflows_pkey`),
),
ansiSQLChange(`alter table argo_archived_workflows add primary key(clustername,instanceid,uid)`),
ternary(dbType == MySQL,
ansiSQLChange(`alter table argo_archived_workflows_labels drop primary key`),
ansiSQLChange(`alter table argo_archived_workflows_labels drop constraint argo_archived_workflows_labels_pkey`),
),
ansiSQLChange(`alter table argo_archived_workflows_labels add primary key(clustername,instanceid,uid,name)`),
ansiSQLChange(`alter table argo_archived_workflows_labels add constraint argo_archived_workflows_labels_fk1 foreign key (clustername,instanceid,uid) references argo_archived_workflows(clustername, instanceid, uid) on delete cascade`),
ternary(dbType == MySQL,
ansiSQLChange(`drop index argo_archived_workflows_i1 on argo_archived_workflows`),
ansiSQLChange(`drop index argo_archived_workflows_i1`),
),
ansiSQLChange(`create index argo_archived_workflows_i1 on argo_archived_workflows (clustername,instanceid,namespace)`),
} {
err := m.applyChange(ctx, changeSchemaVersion, change)
if err != nil {
Expand Down
15 changes: 13 additions & 2 deletions persist/sqldb/workflow_archive.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ const archiveLabelsTableName = archiveTableName + "_labels"

type archivedWorkflowMetadata struct {
ClusterName string `db:"clustername"`
InstanceID string `db:"instanceid"`
UID string `db:"uid"`
Name string `db:"name"`
Namespace string `db:"namespace"`
Expand All @@ -36,6 +37,7 @@ type archivedWorkflowRecord struct {

type archivedWorkflowLabelRecord struct {
ClusterName string `db:"clustername"`
InstanceID string `db:"instanceid"`
UID string `db:"uid"`
// Why is this called "name" not "key"? Key is an SQL reserved word.
Key string `db:"name"`
Expand All @@ -52,11 +54,13 @@ type WorkflowArchive interface {
type workflowArchive struct {
session sqlbuilder.Database
clusterName string
instanceID string
dbType dbType
}

func NewWorkflowArchive(session sqlbuilder.Database, clusterName string) WorkflowArchive {
return &workflowArchive{session: session, clusterName: clusterName, dbType: dbTypeFor(session)}
// NewWorkflowArchive returns a new workflowArchive
func NewWorkflowArchive(session sqlbuilder.Database, clusterName string, instanceID string) WorkflowArchive {
return &workflowArchive{session: session, clusterName: clusterName, instanceID: instanceID, dbType: dbTypeFor(session)}
}

func (r *workflowArchive) ArchiveWorkflow(wf *wfv1.Workflow) error {
Expand All @@ -74,6 +78,7 @@ func (r *workflowArchive) ArchiveWorkflow(wf *wfv1.Workflow) error {
Insert(&archivedWorkflowRecord{
archivedWorkflowMetadata: archivedWorkflowMetadata{
ClusterName: r.clusterName,
InstanceID: r.instanceID,
UID: string(wf.UID),
Name: wf.Name,
Namespace: wf.Namespace,
Expand All @@ -92,6 +97,7 @@ func (r *workflowArchive) ArchiveWorkflow(wf *wfv1.Workflow) error {
Set("startedat", wf.Status.StartedAt.Time).
Set("finishedat", wf.Status.FinishedAt.Time).
Where(db.Cond{"clustername": r.clusterName}).
And(db.Cond{"instanceid": r.instanceID}).
And(db.Cond{"uid": wf.UID}).
Exec()
if err != nil {
Expand All @@ -114,6 +120,7 @@ func (r *workflowArchive) ArchiveWorkflow(wf *wfv1.Workflow) error {
_, err := sess.Collection(archiveLabelsTableName).
Insert(&archivedWorkflowLabelRecord{
ClusterName: r.clusterName,
InstanceID: r.instanceID,
UID: string(wf.UID),
Key: key,
Value: value,
Expand All @@ -124,6 +131,7 @@ func (r *workflowArchive) ArchiveWorkflow(wf *wfv1.Workflow) error {
Update(archiveLabelsTableName).
Set("value", value).
Where(db.Cond{"clustername": r.clusterName}).
And(db.Cond{"instanceid": r.instanceID}).
And(db.Cond{"uid": wf.UID}).
And(db.Cond{"name": key}).
Exec()
Expand All @@ -150,6 +158,7 @@ func (r *workflowArchive) ListWorkflows(namespace string, labelRequirements labe
Select("name", "namespace", "uid", "phase", "startedat", "finishedat").
From(archiveTableName).
Where(db.Cond{"clustername": r.clusterName}).
And(db.Cond{"instanceid": r.instanceID}).
And(namespaceEqual(namespace)).
And(clause).
OrderBy("-startedat").
Expand Down Expand Up @@ -192,6 +201,7 @@ func (r *workflowArchive) GetWorkflow(uid string) (*wfv1.Workflow, error) {
Select("workflow").
From(archiveTableName).
Where(db.Cond{"clustername": r.clusterName}).
And(db.Cond{"instanceid": r.instanceID}).
And(db.Cond{"uid": uid}).
One(archivedWf)
if err != nil {
Expand All @@ -212,6 +222,7 @@ func (r *workflowArchive) DeleteWorkflow(uid string) error {
rs, err := r.session.
DeleteFrom(archiveTableName).
Where(db.Cond{"clustername": r.clusterName}).
And(db.Cond{"instanceid": r.instanceID}).
And(db.Cond{"uid": uid}).
Exec()
if err != nil {
Expand Down
2 changes: 1 addition & 1 deletion pkg/apiclient/argo-kube-client.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ func newArgoKubeClient(clientConfig clientcmd.ClientConfig) (context.Context, Cl
}

func (a *argoKubeClient) NewWorkflowServiceClient() workflowpkg.WorkflowServiceClient {
return &argoKubeWorkflowServiceClient{workflowserver.NewWorkflowServer(argoKubeOffloadNodeStatusRepo)}
return &argoKubeWorkflowServiceClient{workflowserver.NewWorkflowServer("", argoKubeOffloadNodeStatusRepo)}
}

func (a *argoKubeClient) NewCronWorkflowServiceClient() cronworkflow.CronWorkflowServiceClient {
Expand Down
8 changes: 4 additions & 4 deletions server/apiserver/argoserver.go
Original file line number Diff line number Diff line change
Expand Up @@ -130,10 +130,10 @@ func (as *argoServer) Run(ctx context.Context, port int, browserOpenFunc func(st
}
// we always enable the archive for the Argo Server, as the Argo Server does not write records, so you can
// disable the archiving - and still read old records
wfArchive = sqldb.NewWorkflowArchive(session, persistence.GetClusterName())
wfArchive = sqldb.NewWorkflowArchive(session, persistence.GetClusterName(), configMap.InstanceID)
}
artifactServer := artifacts.NewArtifactServer(as.authenticator, offloadRepo, wfArchive)
grpcServer := as.newGRPCServer(offloadRepo, wfArchive)
grpcServer := as.newGRPCServer(configMap.InstanceID, offloadRepo, wfArchive)
httpServer := as.newHTTPServer(ctx, port, artifactServer)

// Start listener
Expand Down Expand Up @@ -169,7 +169,7 @@ func (as *argoServer) Run(ctx context.Context, port int, browserOpenFunc func(st
<-as.stopCh
}

func (as *argoServer) newGRPCServer(offloadNodeStatusRepo sqldb.OffloadNodeStatusRepo, wfArchive sqldb.WorkflowArchive) *grpc.Server {
func (as *argoServer) newGRPCServer(instanceID string, offloadNodeStatusRepo sqldb.OffloadNodeStatusRepo, wfArchive sqldb.WorkflowArchive) *grpc.Server {
serverLog := log.NewEntry(log.StandardLogger())

sOpts := []grpc.ServerOption{
Expand All @@ -196,7 +196,7 @@ func (as *argoServer) newGRPCServer(offloadNodeStatusRepo sqldb.OffloadNodeStatu
grpcServer := grpc.NewServer(sOpts...)

infopkg.RegisterInfoServiceServer(grpcServer, info.NewInfoServer(as.managedNamespace))
workflowpkg.RegisterWorkflowServiceServer(grpcServer, workflow.NewWorkflowServer(offloadNodeStatusRepo))
workflowpkg.RegisterWorkflowServiceServer(grpcServer, workflow.NewWorkflowServer(instanceID, offloadNodeStatusRepo))
workflowtemplatepkg.RegisterWorkflowTemplateServiceServer(grpcServer, workflowtemplate.NewWorkflowTemplateServer())
cronworkflowpkg.RegisterCronWorkflowServiceServer(grpcServer, cronworkflow.NewCronWorkflowServer())
workflowarchivepkg.RegisterArchivedWorkflowServiceServer(grpcServer, workflowarchive.NewWorkflowArchiveServer(wfArchive))
Expand Down
111 changes: 101 additions & 10 deletions server/workflow/workflow_server.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,11 +21,14 @@ import (
)

type workflowServer struct {
instanceID string
offloadNodeStatusRepo sqldb.OffloadNodeStatusRepo
}

func NewWorkflowServer(offloadNodeStatusRepo sqldb.OffloadNodeStatusRepo) workflowpkg.WorkflowServiceServer {
// NewWorkflowServer returns a new workflowServer
func NewWorkflowServer(instanceID string, offloadNodeStatusRepo sqldb.OffloadNodeStatusRepo) workflowpkg.WorkflowServiceServer {
return &workflowServer{
instanceID: instanceID,
offloadNodeStatusRepo: offloadNodeStatusRepo,
}
}
Expand All @@ -41,12 +44,16 @@ func (s *workflowServer) CreateWorkflow(ctx context.Context, req *workflowpkg.Wo
req.Workflow.Namespace = req.Namespace
}

if req.InstanceID != "" {
if len(req.InstanceID) > 0 || len(s.instanceID) > 0 {
labels := req.Workflow.GetLabels()
if labels == nil {
labels = make(map[string]string)
}
labels[common.LabelKeyControllerInstanceID] = req.InstanceID
if len(req.InstanceID) > 0 {
labels[common.LabelKeyControllerInstanceID] = req.InstanceID
} else {
labels[common.LabelKeyControllerInstanceID] = s.instanceID
}
req.Workflow.SetLabels(labels)
}

Expand Down Expand Up @@ -87,6 +94,11 @@ func (s *workflowServer) GetWorkflow(ctx context.Context, req *workflowpkg.Workf
return nil, err
}

err = s.validateInstanceID(wf)
if err != nil {
return nil, err
}

if wf.Status.IsOffloadNodeStatus() {
if s.offloadNodeStatusRepo.IsEnabled() {
offloadedNodes, err := s.offloadNodeStatusRepo.Get(string(wf.UID), wf.GetOffloadNodeStatusVersion())
Expand All @@ -112,6 +124,7 @@ func (s *workflowServer) ListWorkflows(ctx context.Context, req *workflowpkg.Wor
if req.ListOptions != nil {
listOption = *req.ListOptions
}
listOption = s.withInstanceID(listOption)

wfList, err := wfClient.ArgoprojV1alpha1().Workflows(req.Namespace).List(listOption)
if err != nil {
Expand Down Expand Up @@ -142,6 +155,7 @@ func (s *workflowServer) WatchWorkflows(req *workflowpkg.WatchWorkflowsRequest,
if req.ListOptions != nil {
opts = *req.ListOptions
}
opts = s.withInstanceID(opts)
watch, err := wfClient.ArgoprojV1alpha1().Workflows(req.Namespace).Watch(opts)
if err != nil {
return err
Expand Down Expand Up @@ -194,7 +208,16 @@ func (s *workflowServer) WatchWorkflows(req *workflowpkg.WatchWorkflowsRequest,
func (s *workflowServer) DeleteWorkflow(ctx context.Context, req *workflowpkg.WorkflowDeleteRequest) (*workflowpkg.WorkflowDeleteResponse, error) {
wfClient := auth.GetWfClient(ctx)

err := wfClient.ArgoprojV1alpha1().Workflows(req.Namespace).Delete(req.Name, &metav1.DeleteOptions{})
wf, err := wfClient.ArgoprojV1alpha1().Workflows(req.Namespace).Get(req.Name, metav1.GetOptions{})
if err != nil {
return nil, err
}
err = s.validateInstanceID(wf)
if err != nil {
return nil, err
}

err = wfClient.ArgoprojV1alpha1().Workflows(req.Namespace).Delete(req.Name, &metav1.DeleteOptions{})
if err != nil {
return nil, err
}
Expand All @@ -209,6 +232,10 @@ func (s *workflowServer) RetryWorkflow(ctx context.Context, req *workflowpkg.Wor
if err != nil {
return nil, err
}
err = s.validateInstanceID(wf)
if err != nil {
return nil, err
}

wf, err = util.RetryWorkflow(kubeClient, wfClient.ArgoprojV1alpha1().Workflows(req.Namespace), wf)
if err != nil {
Expand All @@ -223,6 +250,10 @@ func (s *workflowServer) ResubmitWorkflow(ctx context.Context, req *workflowpkg.
if err != nil {
return nil, err
}
err = s.validateInstanceID(wf)
if err != nil {
return nil, err
}

newWF, err := util.FormulateResubmitWorkflow(wf, req.Memoized)
if err != nil {
Expand All @@ -238,13 +269,23 @@ func (s *workflowServer) ResubmitWorkflow(ctx context.Context, req *workflowpkg.

func (s *workflowServer) ResumeWorkflow(ctx context.Context, req *workflowpkg.WorkflowResumeRequest) (*v1alpha1.Workflow, error) {
wfClient := auth.GetWfClient(ctx)
err := util.ResumeWorkflow(wfClient.ArgoprojV1alpha1().Workflows(req.Namespace), req.Name)

wf, err := wfClient.ArgoprojV1alpha1().Workflows(req.Namespace).Get(req.Name, metav1.GetOptions{})
if err != nil {
return nil, err
}
err = s.validateInstanceID(wf)
if err != nil {
return nil, err
}

err = util.ResumeWorkflow(wfClient.ArgoprojV1alpha1().Workflows(req.Namespace), req.Name)
if err != nil {
log.Warnf("Failed to resume %s: %+v", req.Name, err)
return nil, err
}

wf, err := wfClient.ArgoprojV1alpha1().Workflows(req.Namespace).Get(req.Name, metav1.GetOptions{})
wf, err = wfClient.ArgoprojV1alpha1().Workflows(req.Namespace).Get(req.Name, metav1.GetOptions{})
if err != nil {
return nil, err
}
Expand All @@ -254,12 +295,22 @@ func (s *workflowServer) ResumeWorkflow(ctx context.Context, req *workflowpkg.Wo

func (s *workflowServer) SuspendWorkflow(ctx context.Context, req *workflowpkg.WorkflowSuspendRequest) (*v1alpha1.Workflow, error) {
wfClient := auth.GetWfClient(ctx)
err := util.SuspendWorkflow(wfClient.ArgoprojV1alpha1().Workflows(req.Namespace), req.Name)

wf, err := wfClient.ArgoprojV1alpha1().Workflows(req.Namespace).Get(req.Name, metav1.GetOptions{})
if err != nil {
return nil, err
}
err = s.validateInstanceID(wf)
if err != nil {
return nil, err
}

wf, err := wfClient.ArgoprojV1alpha1().Workflows(req.Namespace).Get(req.Name, metav1.GetOptions{})
err = util.SuspendWorkflow(wfClient.ArgoprojV1alpha1().Workflows(req.Namespace), req.Name)
if err != nil {
return nil, err
}

wf, err = wfClient.ArgoprojV1alpha1().Workflows(req.Namespace).Get(req.Name, metav1.GetOptions{})
if err != nil {
return nil, err
}
Expand All @@ -269,12 +320,22 @@ func (s *workflowServer) SuspendWorkflow(ctx context.Context, req *workflowpkg.W

func (s *workflowServer) TerminateWorkflow(ctx context.Context, req *workflowpkg.WorkflowTerminateRequest) (*v1alpha1.Workflow, error) {
wfClient := auth.GetWfClient(ctx)
err := util.TerminateWorkflow(wfClient.ArgoprojV1alpha1().Workflows(req.Namespace), req.Name)

wf, err := wfClient.ArgoprojV1alpha1().Workflows(req.Namespace).Get(req.Name, metav1.GetOptions{})
if err != nil {
return nil, err
}
err = s.validateInstanceID(wf)
if err != nil {
return nil, err
}

wf, err := wfClient.ArgoprojV1alpha1().Workflows(req.Namespace).Get(req.Name, metav1.GetOptions{})
err = util.TerminateWorkflow(wfClient.ArgoprojV1alpha1().Workflows(req.Namespace), req.Name)
if err != nil {
return nil, err
}

wf, err = wfClient.ArgoprojV1alpha1().Workflows(req.Namespace).Get(req.Name, metav1.GetOptions{})
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -305,3 +366,33 @@ func (s *workflowServer) PodLogs(req *workflowpkg.WorkflowLogRequest, ws workflo
logger.Run(ctx)
return nil
}

func (s *workflowServer) withInstanceID(opt metav1.ListOptions) metav1.ListOptions {
if len(opt.LabelSelector) > 0 {
opt.LabelSelector += ","
}
if len(s.instanceID) == 0 {
opt.LabelSelector += fmt.Sprintf("!%s", common.LabelKeyControllerInstanceID)
return opt
}
opt.LabelSelector += fmt.Sprintf("%s=%s", common.LabelKeyControllerInstanceID, s.instanceID)
return opt
}

func (s *workflowServer) validateInstanceID(wf *v1alpha1.Workflow) error {
if len(s.instanceID) == 0 {
if len(wf.Labels) == 0 {
return nil
}
if _, ok := wf.Labels[common.LabelKeyControllerInstanceID]; !ok {
return nil
}
} else if len(wf.Labels) > 0 {
if val, ok := wf.Labels[common.LabelKeyControllerInstanceID]; ok {
if val == s.instanceID {
return nil
}
}
}
return fmt.Errorf("the workflow is not managed by current Argo server")
}
2 changes: 1 addition & 1 deletion server/workflow/workflow_server_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -368,7 +368,7 @@ func getWorkflowServer() (workflowpkg.WorkflowServiceServer, context.Context) {
offloadNodeStatusRepo := &mocks.OffloadNodeStatusRepo{}
offloadNodeStatusRepo.On("IsEnabled", mock.Anything).Return(true)
offloadNodeStatusRepo.On("List", mock.Anything).Return(map[sqldb.UUIDVersion]v1alpha1.Nodes{}, nil)
server := NewWorkflowServer(offloadNodeStatusRepo)
server := NewWorkflowServer("", offloadNodeStatusRepo)
kubeClientSet := fake.NewSimpleClientset()
wfClientset := v1alpha.NewSimpleClientset(&wfObj1, &wfObj2, &wfObj3, &wfObj4, &wfObj5)
wfClientset.PrependReactor("create", "workflows", generateNameReactor)
Expand Down
1 change: 1 addition & 0 deletions server/workflowarchive/archived_workflow_server.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ type archivedWorkflowServer struct {
wfArchive sqldb.WorkflowArchive
}

// NewWorkflowArchiveServer returns a new archivedWorkflowServer
func NewWorkflowArchiveServer(wfArchive sqldb.WorkflowArchive) workflowarchivepkg.ArchivedWorkflowServiceServer {
return &archivedWorkflowServer{wfArchive: wfArchive}
}
Expand Down
Loading

0 comments on commit 1a6fcd2

Please sign in to comment.