From 859fc6a0af964e1e7909f317fc81c38cc23b3e7f Mon Sep 17 00:00:00 2001 From: Alexander Matyushentsev Date: Fri, 28 Dec 2018 15:23:48 -0800 Subject: [PATCH] Issue #1104 - Remove container wait timeout from 'argo logs --follow' (#1142) --- cmd/argo/commands/logs.go | 152 ++++++++++++++++------------- workflow/executor/k8sapi/client.go | 4 +- 2 files changed, 88 insertions(+), 68 deletions(-) diff --git a/cmd/argo/commands/logs.go b/cmd/argo/commands/logs.go index e17be636efdb..a41ba6830b9c 100644 --- a/cmd/argo/commands/logs.go +++ b/cmd/argo/commands/logs.go @@ -2,8 +2,10 @@ package commands import ( "bufio" + "context" "fmt" "hash/fnv" + "math" "os" "strconv" @@ -11,16 +13,21 @@ import ( "sync" "time" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/fields" + "k8s.io/apimachinery/pkg/runtime" + pkgwatch "k8s.io/apimachinery/pkg/watch" + "k8s.io/client-go/kubernetes" + "k8s.io/client-go/tools/cache" + "k8s.io/client-go/tools/watch" + "github.com/argoproj/argo/pkg/apis/workflow/v1alpha1" - wfclientset "github.com/argoproj/argo/pkg/client/clientset/versioned" - wfinformers "github.com/argoproj/argo/pkg/client/informers/externalversions" + workflowv1 "github.com/argoproj/argo/pkg/client/clientset/versioned/typed/workflow/v1alpha1" "github.com/argoproj/pkg/errors" + log "github.com/sirupsen/logrus" "github.com/spf13/cobra" "k8s.io/api/core/v1" - metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" - "k8s.io/client-go/kubernetes" - "k8s.io/client-go/tools/cache" ) type logEntry struct { @@ -101,8 +108,8 @@ func (p *logPrinter) PrintWorkflowLogs(workflow string) error { return err } timeByPod := p.printRecentWorkflowLogs(wf) - if p.follow && wf.Status.Phase == v1alpha1.NodeRunning { - p.printLiveWorkflowLogs(wf, timeByPod) + if p.follow { + p.printLiveWorkflowLogs(wf.Name, wfClient, timeByPod) } return nil } @@ -114,7 +121,7 @@ func (p *logPrinter) PrintPodLogs(podName string) error { return err } var logs []logEntry - err = p.getPodLogs("", podName, namespace, p.follow, p.tail, p.sinceSeconds, p.sinceTime, func(entry logEntry) { + err = p.getPodLogs(context.Background(), "", podName, namespace, p.follow, p.tail, p.sinceSeconds, p.sinceTime, func(entry logEntry) { logs = append(logs, entry) }) if err != nil { @@ -144,7 +151,7 @@ func (p *logPrinter) printRecentWorkflowLogs(wf *v1alpha1.Workflow) map[string]* go func() { defer wg.Done() var podLogs []logEntry - err := p.getPodLogs(getDisplayName(node), node.ID, wf.Namespace, false, p.tail, p.sinceSeconds, p.sinceTime, func(entry logEntry) { + err := p.getPodLogs(context.Background(), getDisplayName(node), node.ID, wf.Namespace, false, p.tail, p.sinceSeconds, p.sinceTime, func(entry logEntry) { podLogs = append(podLogs, entry) }) @@ -178,33 +185,12 @@ func (p *logPrinter) printRecentWorkflowLogs(wf *v1alpha1.Workflow) map[string]* return timeByPod } -func (p *logPrinter) setupWorkflowInformer(namespace string, name string, callback func(wf *v1alpha1.Workflow, done bool)) cache.SharedIndexInformer { - wfcClientset := wfclientset.NewForConfigOrDie(restConfig) - wfInformerFactory := wfinformers.NewFilteredSharedInformerFactory(wfcClientset, 20*time.Minute, namespace, nil) - informer := wfInformerFactory.Argoproj().V1alpha1().Workflows().Informer() - informer.AddEventHandler( - cache.ResourceEventHandlerFuncs{ - UpdateFunc: func(old, new interface{}) { - updatedWf := new.(*v1alpha1.Workflow) - if updatedWf.Name == name { - callback(updatedWf, updatedWf.Status.Phase != v1alpha1.NodeRunning) - } - }, - DeleteFunc: func(obj interface{}) { - deletedWf := obj.(*v1alpha1.Workflow) - if deletedWf.Name == name { - callback(deletedWf, true) - } - }, - }, - ) - return informer -} - // Prints live logs for workflow pods, starting from time specified in timeByPod name. -func (p *logPrinter) printLiveWorkflowLogs(workflow *v1alpha1.Workflow, timeByPod map[string]*time.Time) { +func (p *logPrinter) printLiveWorkflowLogs(workflowName string, wfClient workflowv1.WorkflowInterface, timeByPod map[string]*time.Time) { logs := make(chan logEntry) streamedPods := make(map[string]bool) + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() processPods := func(wf *v1alpha1.Workflow) { for id := range wf.Status.Nodes { @@ -218,7 +204,7 @@ func (p *logPrinter) printLiveWorkflowLogs(workflow *v1alpha1.Workflow, timeByPo sinceTime := metav1.NewTime(podTime.Add(time.Second)) sinceTimePtr = &sinceTime } - err := p.getPodLogs(getDisplayName(node), node.ID, wf.Namespace, true, nil, nil, sinceTimePtr, func(entry logEntry) { + err := p.getPodLogs(ctx, getDisplayName(node), node.ID, wf.Namespace, true, nil, nil, sinceTimePtr, func(entry logEntry) { logs <- entry }) if err != nil { @@ -229,20 +215,31 @@ func (p *logPrinter) printLiveWorkflowLogs(workflow *v1alpha1.Workflow, timeByPo } } - processPods(workflow) - informer := p.setupWorkflowInformer(workflow.Namespace, workflow.Name, func(wf *v1alpha1.Workflow, done bool) { - if done { - close(logs) - } else { - processPods(wf) - } - }) - - stopChannel := make(chan struct{}) go func() { - informer.Run(stopChannel) + defer close(logs) + fieldSelector := fields.ParseSelectorOrDie(fmt.Sprintf("metadata.name=%s", workflowName)) + listOpts := metav1.ListOptions{FieldSelector: fieldSelector.String()} + lw := &cache.ListWatch{ + ListFunc: func(options metav1.ListOptions) (runtime.Object, error) { + return wfClient.List(listOpts) + }, + WatchFunc: func(options metav1.ListOptions) (pkgwatch.Interface, error) { + return wfClient.Watch(listOpts) + }, + } + _, err := watch.UntilWithSync(ctx, lw, &v1alpha1.Workflow{}, nil, func(event pkgwatch.Event) (b bool, e error) { + if wf, ok := event.Object.(*v1alpha1.Workflow); ok { + if !wf.Status.Completed() { + processPods(wf) + } + return wf.Status.Completed(), nil + } + return true, nil + }) + if err != nil { + log.Fatal(err) + } }() - defer close(stopChannel) for entry := range logs { p.printLogEntry(entry) @@ -273,35 +270,56 @@ func (p *logPrinter) printLogEntry(entry logEntry) { fmt.Println(line) } -func (p *logPrinter) ensureContainerStarted(podName string, podNamespace string, container string, retryCnt int, retryTimeout time.Duration) error { - for retryCnt > 0 { - pod, err := p.kubeClient.CoreV1().Pods(podNamespace).Get(podName, metav1.GetOptions{}) +func (p *logPrinter) hasContainerStarted(podName string, podNamespace string, container string) (bool, error) { + pod, err := p.kubeClient.CoreV1().Pods(podNamespace).Get(podName, metav1.GetOptions{}) + if err != nil { + return false, err + } + var containerStatus *v1.ContainerStatus + for _, status := range pod.Status.ContainerStatuses { + if status.Name == container { + containerStatus = &status + break + } + } + if containerStatus == nil { + return false, nil + } + + if containerStatus.State.Waiting != nil { + return false, nil + } + return true, nil +} + +func (p *logPrinter) getPodLogs( + ctx context.Context, + displayName string, + podName string, + podNamespace string, + follow bool, + tail *int64, + sinceSeconds *int64, + sinceTime *metav1.Time, + callback func(entry logEntry)) error { + + for ctx.Err() == nil { + hasStarted, err := p.hasContainerStarted(podName, podNamespace, p.container) + if err != nil { return err } - var containerStatus *v1.ContainerStatus - for _, status := range pod.Status.ContainerStatuses { - if status.Name == container { - containerStatus = &status - break + if !hasStarted { + if follow { + time.Sleep(1 * time.Second) + } else { + return nil } - } - if containerStatus == nil || containerStatus.State.Waiting != nil { - time.Sleep(retryTimeout) - retryCnt-- } else { - return nil + break } } - return fmt.Errorf("container '%s' of pod '%s' has not started within expected timeout", container, podName) -} -func (p *logPrinter) getPodLogs( - displayName string, podName string, podNamespace string, follow bool, tail *int64, sinceSeconds *int64, sinceTime *metav1.Time, callback func(entry logEntry)) error { - err := p.ensureContainerStarted(podName, podNamespace, p.container, 10, time.Second) - if err != nil { - return err - } stream, err := p.kubeClient.CoreV1().Pods(podNamespace).GetLogs(podName, &v1.PodLogOptions{ Container: p.container, Follow: follow, diff --git a/workflow/executor/k8sapi/client.go b/workflow/executor/k8sapi/client.go index 5a949595ac2f..025e69a86686 100644 --- a/workflow/executor/k8sapi/client.go +++ b/workflow/executor/k8sapi/client.go @@ -9,6 +9,8 @@ import ( "syscall" "time" + "github.com/argoproj/argo/util" + "github.com/argoproj/argo/errors" "github.com/argoproj/argo/workflow/common" execcommon "github.com/argoproj/argo/workflow/executor/common" @@ -100,7 +102,7 @@ func (c *k8sAPIClient) saveLogs(containerID, path string) error { if err != nil { return errors.InternalWrapError(err) } - defer outFile.Close() + defer util.Close(outFile) _, err = io.Copy(outFile, reader) if err != nil { return errors.InternalWrapError(err)