Skip to content

Commit

Permalink
fix(server): Re-establish watch on v1.Status errors. Fixes #3608 (#3609)
Browse files Browse the repository at this point in the history
  • Loading branch information
alexec committed Jul 28, 2020
1 parent c063f9f commit dea03a9
Show file tree
Hide file tree
Showing 3 changed files with 47 additions and 43 deletions.
1 change: 1 addition & 0 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -316,6 +316,7 @@ test-results/junit.xml: $(GOPATH)/bin/go-junit-report test-results/test.out
cat test-results/test.out | go-junit-report > test-results/junit.xml

dist/$(PROFILE).yaml: $(MANIFESTS) $(E2E_MANIFESTS)
mkdir -p dist
kustomize build --load_restrictor=none test/e2e/manifests/$(PROFILE) | sed 's/:latest/:$(VERSION)/' | sed 's/pns/$(E2E_EXECUTOR)/' > dist/$(PROFILE).yaml

.PHONY: install
Expand Down
57 changes: 29 additions & 28 deletions server/workflow/workflow_server.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,12 +2,8 @@ package workflow

import (
"fmt"
"reflect"
"sort"

"github.com/argoproj/argo/pkg/client/clientset/versioned"
"github.com/argoproj/argo/workflow/creator"

log "github.com/sirupsen/logrus"
"golang.org/x/net/context"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
Expand All @@ -16,12 +12,14 @@ import (
"github.com/argoproj/argo/persist/sqldb"
workflowpkg "github.com/argoproj/argo/pkg/apiclient/workflow"
"github.com/argoproj/argo/pkg/apis/workflow"
"github.com/argoproj/argo/pkg/apis/workflow/v1alpha1"
wfv1 "github.com/argoproj/argo/pkg/apis/workflow/v1alpha1"
"github.com/argoproj/argo/pkg/client/clientset/versioned"
"github.com/argoproj/argo/server/auth"
argoutil "github.com/argoproj/argo/util"
"github.com/argoproj/argo/util/instanceid"
"github.com/argoproj/argo/util/logs"
"github.com/argoproj/argo/workflow/common"
"github.com/argoproj/argo/workflow/creator"
"github.com/argoproj/argo/workflow/hydrator"
"github.com/argoproj/argo/workflow/templateresolution"
"github.com/argoproj/argo/workflow/util"
Expand All @@ -41,7 +39,7 @@ func NewWorkflowServer(instanceIDService instanceid.Service, offloadNodeStatusRe
return &workflowServer{instanceIDService, offloadNodeStatusRepo, hydrator.New(offloadNodeStatusRepo)}
}

func (s *workflowServer) CreateWorkflow(ctx context.Context, req *workflowpkg.WorkflowCreateRequest) (*v1alpha1.Workflow, error) {
func (s *workflowServer) CreateWorkflow(ctx context.Context, req *workflowpkg.WorkflowCreateRequest) (*wfv1.Workflow, error) {
wfClient := auth.GetWfClient(ctx)

if req.Workflow == nil {
Expand Down Expand Up @@ -82,7 +80,7 @@ func (s *workflowServer) CreateWorkflow(ctx context.Context, req *workflowpkg.Wo
return wf, nil
}

func (s *workflowServer) GetWorkflow(ctx context.Context, req *workflowpkg.WorkflowGetRequest) (*v1alpha1.Workflow, error) {
func (s *workflowServer) GetWorkflow(ctx context.Context, req *workflowpkg.WorkflowGetRequest) (*wfv1.Workflow, error) {
wfGetOption := metav1.GetOptions{}
if req.GetOptions != nil {
wfGetOption = *req.GetOptions
Expand All @@ -103,7 +101,7 @@ func (s *workflowServer) GetWorkflow(ctx context.Context, req *workflowpkg.Workf
return wf, err
}

func (s *workflowServer) ListWorkflows(ctx context.Context, req *workflowpkg.WorkflowListRequest) (*v1alpha1.WorkflowList, error) {
func (s *workflowServer) ListWorkflows(ctx context.Context, req *workflowpkg.WorkflowListRequest) (*wfv1.WorkflowList, error) {
wfClient := auth.GetWfClient(ctx)

var listOption = &metav1.ListOptions{}
Expand Down Expand Up @@ -134,7 +132,7 @@ func (s *workflowServer) ListWorkflows(ctx context.Context, req *workflowpkg.Wor
// we make no promises about the overall list sorting, we just sort each page
sort.Sort(wfList.Items)

return &v1alpha1.WorkflowList{ListMeta: metav1.ListMeta{Continue: wfList.Continue, ResourceVersion: wfList.ResourceVersion}, Items: wfList.Items}, nil
return &wfv1.WorkflowList{ListMeta: metav1.ListMeta{Continue: wfList.Continue, ResourceVersion: wfList.ResourceVersion}, Items: wfList.Items}, nil
}

func (s *workflowServer) WatchWorkflows(req *workflowpkg.WatchWorkflowsRequest, ws workflowpkg.WorkflowService_WatchWorkflowsServer) error {
Expand Down Expand Up @@ -170,20 +168,21 @@ func (s *workflowServer) WatchWorkflows(req *workflowpkg.WatchWorkflowsRequest,
select {
case <-ctx.Done():
return nil
case event, open := <-watch.ResultChan():
if !open {
case event, ok := <-watch.ResultChan():
var wf *wfv1.Workflow
if ok {
wf, ok = event.Object.(*wfv1.Workflow)
}
if !ok {
log.Debug("Re-establishing workflow watch")
watch.Stop()
watch, err = wfIf.Watch(*opts)
if err != nil {
return err
}
continue
}
log.Debug("Received event")
wf, ok := event.Object.(*v1alpha1.Workflow)
if !ok {
return fmt.Errorf("watch object was not a workflow %v", reflect.TypeOf(event.Object))
}
logCtx := log.WithFields(log.Fields{"workflow": wf.Name, "type": event.Type, "phase": wf.Status.Phase})
err := s.hydrator.Hydrate(wf)
if err != nil {
Expand All @@ -194,6 +193,8 @@ func (s *workflowServer) WatchWorkflows(req *workflowpkg.WatchWorkflowsRequest,
if err != nil {
return err
}
// when we re-establish, we want to start at the same place
opts.ResourceVersion = wf.ResourceVersion
}
}
}
Expand All @@ -215,7 +216,7 @@ func (s *workflowServer) DeleteWorkflow(ctx context.Context, req *workflowpkg.Wo
return &workflowpkg.WorkflowDeleteResponse{}, nil
}

func (s *workflowServer) RetryWorkflow(ctx context.Context, req *workflowpkg.WorkflowRetryRequest) (*v1alpha1.Workflow, error) {
func (s *workflowServer) RetryWorkflow(ctx context.Context, req *workflowpkg.WorkflowRetryRequest) (*wfv1.Workflow, error) {
wfClient := auth.GetWfClient(ctx)
kubeClient := auth.GetKubeClient(ctx)

Expand All @@ -236,7 +237,7 @@ func (s *workflowServer) RetryWorkflow(ctx context.Context, req *workflowpkg.Wor
return wf, nil
}

func (s *workflowServer) ResubmitWorkflow(ctx context.Context, req *workflowpkg.WorkflowResubmitRequest) (*v1alpha1.Workflow, error) {
func (s *workflowServer) ResubmitWorkflow(ctx context.Context, req *workflowpkg.WorkflowResubmitRequest) (*wfv1.Workflow, error) {
wfClient := auth.GetWfClient(ctx)
wf, err := s.getWorkflow(wfClient, req.Namespace, req.Name, metav1.GetOptions{})
if err != nil {
Expand All @@ -253,14 +254,14 @@ func (s *workflowServer) ResubmitWorkflow(ctx context.Context, req *workflowpkg.
return nil, err
}

created, err := util.SubmitWorkflow(wfClient.ArgoprojV1alpha1().Workflows(req.Namespace), wfClient, req.Namespace, newWF, &v1alpha1.SubmitOpts{})
created, err := util.SubmitWorkflow(wfClient.ArgoprojV1alpha1().Workflows(req.Namespace), wfClient, req.Namespace, newWF, &wfv1.SubmitOpts{})
if err != nil {
return nil, err
}
return created, nil
}

func (s *workflowServer) ResumeWorkflow(ctx context.Context, req *workflowpkg.WorkflowResumeRequest) (*v1alpha1.Workflow, error) {
func (s *workflowServer) ResumeWorkflow(ctx context.Context, req *workflowpkg.WorkflowResumeRequest) (*wfv1.Workflow, error) {
wfClient := auth.GetWfClient(ctx)
wf, err := s.getWorkflow(wfClient, req.Namespace, req.Name, metav1.GetOptions{})
if err != nil {
Expand All @@ -286,7 +287,7 @@ func (s *workflowServer) ResumeWorkflow(ctx context.Context, req *workflowpkg.Wo
return wf, nil
}

func (s *workflowServer) SuspendWorkflow(ctx context.Context, req *workflowpkg.WorkflowSuspendRequest) (*v1alpha1.Workflow, error) {
func (s *workflowServer) SuspendWorkflow(ctx context.Context, req *workflowpkg.WorkflowSuspendRequest) (*wfv1.Workflow, error) {
wfClient := auth.GetWfClient(ctx)

wf, err := s.getWorkflow(wfClient, req.Namespace, req.Name, metav1.GetOptions{})
Expand All @@ -312,7 +313,7 @@ func (s *workflowServer) SuspendWorkflow(ctx context.Context, req *workflowpkg.W
return wf, nil
}

func (s *workflowServer) TerminateWorkflow(ctx context.Context, req *workflowpkg.WorkflowTerminateRequest) (*v1alpha1.Workflow, error) {
func (s *workflowServer) TerminateWorkflow(ctx context.Context, req *workflowpkg.WorkflowTerminateRequest) (*wfv1.Workflow, error) {
wfClient := auth.GetWfClient(ctx)

wf, err := s.getWorkflow(wfClient, req.Namespace, req.Name, metav1.GetOptions{})
Expand All @@ -337,7 +338,7 @@ func (s *workflowServer) TerminateWorkflow(ctx context.Context, req *workflowpkg
return wf, nil
}

func (s *workflowServer) StopWorkflow(ctx context.Context, req *workflowpkg.WorkflowStopRequest) (*v1alpha1.Workflow, error) {
func (s *workflowServer) StopWorkflow(ctx context.Context, req *workflowpkg.WorkflowStopRequest) (*wfv1.Workflow, error) {
wfClient := auth.GetWfClient(ctx)
wf, err := s.getWorkflow(wfClient, req.Namespace, req.Name, metav1.GetOptions{})
if err != nil {
Expand All @@ -359,7 +360,7 @@ func (s *workflowServer) StopWorkflow(ctx context.Context, req *workflowpkg.Work
return wf, nil
}

func (s *workflowServer) LintWorkflow(ctx context.Context, req *workflowpkg.WorkflowLintRequest) (*v1alpha1.Workflow, error) {
func (s *workflowServer) LintWorkflow(ctx context.Context, req *workflowpkg.WorkflowLintRequest) (*wfv1.Workflow, error) {
wfClient := auth.GetWfClient(ctx)
wftmplGetter := templateresolution.WrapWorkflowTemplateInterface(wfClient.ArgoprojV1alpha1().WorkflowTemplates(req.Namespace))
cwftmplGetter := templateresolution.WrapClusterWorkflowTemplateInterface(wfClient.ArgoprojV1alpha1().ClusterWorkflowTemplates())
Expand Down Expand Up @@ -391,7 +392,7 @@ func (s *workflowServer) PodLogs(req *workflowpkg.WorkflowLogRequest, ws workflo
return logs.WorkflowLogs(ctx, wfClient, kubeClient, req, ws)
}

func (s *workflowServer) getWorkflow(wfClient versioned.Interface, namespace string, name string, options metav1.GetOptions) (*v1alpha1.Workflow, error) {
func (s *workflowServer) getWorkflow(wfClient versioned.Interface, namespace string, name string, options metav1.GetOptions) (*wfv1.Workflow, error) {
if name == latestAlias {
latest, err := getLatestWorkflow(wfClient, namespace)
if err != nil {
Expand All @@ -407,11 +408,11 @@ func (s *workflowServer) getWorkflow(wfClient versioned.Interface, namespace str
return wf, nil
}

func (s *workflowServer) validateWorkflow(wf *v1alpha1.Workflow) error {
func (s *workflowServer) validateWorkflow(wf *wfv1.Workflow) error {
return s.instanceIDService.Validate(wf)
}

func getLatestWorkflow(wfClient versioned.Interface, namespace string) (*v1alpha1.Workflow, error) {
func getLatestWorkflow(wfClient versioned.Interface, namespace string) (*wfv1.Workflow, error) {
wfList, err := wfClient.ArgoprojV1alpha1().Workflows(namespace).List(metav1.ListOptions{})
if err != nil {
return nil, err
Expand All @@ -428,9 +429,9 @@ func getLatestWorkflow(wfClient versioned.Interface, namespace string) (*v1alpha
return &latest, nil
}

func (s *workflowServer) SubmitWorkflow(ctx context.Context, req *workflowpkg.WorkflowSubmitRequest) (*v1alpha1.Workflow, error) {
func (s *workflowServer) SubmitWorkflow(ctx context.Context, req *workflowpkg.WorkflowSubmitRequest) (*wfv1.Workflow, error) {
wfClient := auth.GetWfClient(ctx)
var wf *v1alpha1.Workflow
var wf *wfv1.Workflow
switch req.ResourceKind {
case workflow.CronWorkflowKind, workflow.CronWorkflowSingular, workflow.CronWorkflowPlural, workflow.CronWorkflowShortName:
cronWf, err := wfClient.ArgoprojV1alpha1().CronWorkflows(req.Namespace).Get(req.ResourceName, metav1.GetOptions{})
Expand Down
32 changes: 17 additions & 15 deletions util/logs/workflow-logger.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@ package logs
import (
"bufio"
"context"
"reflect"
"sort"
"strings"
"sync"
Expand Down Expand Up @@ -164,25 +163,27 @@ func WorkflowLogs(ctx context.Context, wfClient versioned.Interface, kubeClient
select {
case <-ctx.Done():
return
case event, open := <-wfWatch.ResultChan():
if !open {
case event, ok := <-wfWatch.ResultChan():
var wf *wfv1.Workflow
if ok {
wf, ok = event.Object.(*wfv1.Workflow)
}
if !ok {
logCtx.Debug("Re-establishing workflow watch")
wfWatch.Stop()
wfWatch, err = wfInterface.Watch(wfListOptions)
if err != nil {
logCtx.Error(err)
return
}
continue
}
wf, ok := event.Object.(*wfv1.Workflow)
if !ok {
logCtx.Errorf("watch object was not a workflow %v", reflect.TypeOf(event.Object))
return
}
logCtx.WithFields(log.Fields{"eventType": event.Type, "completed": wf.Status.Fulfilled()}).Debug("Workflow event")
if event.Type == watch.Deleted || wf.Status.Fulfilled() {
return
}
// in case we re-establish the watch, make sure we start at the same place
wfListOptions.ResourceVersion = wf.ResourceVersion
}
}
}()
Expand All @@ -197,25 +198,26 @@ func WorkflowLogs(ctx context.Context, wfClient versioned.Interface, kubeClient
select {
case <-stopWatchingPods:
return
case event, open := <-podWatch.ResultChan():
if !open {
case event, ok := <-podWatch.ResultChan():
var pod *corev1.Pod
if ok {
pod, ok = event.Object.(*corev1.Pod)
}
if !ok {
logCtx.Info("Re-establishing pod watch")
podWatch.Stop()
podWatch, err = podInterface.Watch(podListOptions)
if err != nil {
logCtx.Error(err)
return
}
continue
}
pod, ok := event.Object.(*corev1.Pod)
if !ok {
logCtx.Errorf("watch object was not a pod %v", reflect.TypeOf(event.Object))
return
}
logCtx.WithFields(log.Fields{"eventType": event.Type, "podName": pod.GetName(), "phase": pod.Status.Phase}).Debug("Pod event")
if pod.Status.Phase == corev1.PodRunning {
ensureWeAreStreaming(pod)
}
podListOptions.ResourceVersion = pod.ResourceVersion
}
}
}()
Expand Down

0 comments on commit dea03a9

Please sign in to comment.