From fe27bb70b51e23bd492e21cb4dbb1f9042c0e688 Mon Sep 17 00:00:00 2001 From: Dawid Rusnak Date: Tue, 13 Aug 2024 22:16:17 +0200 Subject: [PATCH] fix: controlling issues (#5756) * fix: continue paused container, when the abort is requested * fix: ensure the lightweight container watcher will get `finishedAt` timestamp * chore: add minor todos * fix: configure no preemption policy by default for Test Workflows * fix: allow Test Workflow status notifier to update "Aborted" status with details * fix: ensure the parallel workers will not end without result * fix: properly build timestamps and detect finished resul in the TestWorkflowResult model * fix: use Pod/Job StatusConditions for detecting the status, make watching more resilient to external problems, expose more Kubernetes error details * chore: do not require job/pod events when fetching logs of parallel workers and services * fixup unit tests * fix: delete preemption policy setup * fixup unit tests * fix: adjust resume time to avoid negative duration * fix: calibrate clocks * chore: use consts * fixup unit tests --- .../testworkflow-toolkit/commands/parallel.go | 8 +- cmd/testworkflow-init/main.go | 2 + .../orchestration/executions.go | 1 + .../model_test_workflow_result_extended.go | 53 ++++-- .../testworkflowcontroller/controller.go | 18 +- .../testworkflowcontroller/logs.go | 17 ++ .../testworkflowcontroller/notifier.go | 10 +- .../testworkflowcontroller/podstate.go | 159 +++++++++++++----- .../testworkflowcontroller/utils.go | 11 +- .../watchinstrumentedpod.go | 126 +++++++++++--- .../testworkflowprocessor/processor.go | 1 + 11 files changed, 307 insertions(+), 99 deletions(-) diff --git a/cmd/tcl/testworkflow-toolkit/commands/parallel.go b/cmd/tcl/testworkflow-toolkit/commands/parallel.go index 24405cf1565..d9831faacc6 100644 --- a/cmd/tcl/testworkflow-toolkit/commands/parallel.go +++ b/cmd/tcl/testworkflow-toolkit/commands/parallel.go @@ -263,6 +263,7 @@ func NewParallelCmd() *cobra.Command { prevStatus := testkube.QUEUED_TestWorkflowStatus prevStep := "" + prevIsFinished := false scheduled := false for v := range ctrl.WatchLightweight(ctx) { // Handle error @@ -283,14 +284,17 @@ func NewParallelCmd() *cobra.Command { } // Handle result change - if v.Status != prevStatus || v.Current != prevStep { + // TODO: the final status should always have the finishedAt too, + // there should be no need for checking isFinished diff + if v.Status != prevStatus || lastResult.IsFinished() != prevIsFinished || v.Current != prevStep { if v.Status != prevStatus { log(string(v.Status)) } updates <- Update{index: index, result: v.Result} prevStep = v.Current prevStatus = v.Status - if v.Result.IsFinished() { + prevIsFinished = lastResult.IsFinished() + if lastResult.IsFinished() { instructions.PrintOutput(env.Ref(), "parallel", ParallelStatus{Index: int(index), Status: v.Status, Result: v.Result}) ctxCancel() return v.Result.IsPassed() diff --git a/cmd/testworkflow-init/main.go b/cmd/testworkflow-init/main.go index 6d37c36437f..3c7be29e66d 100644 --- a/cmd/testworkflow-init/main.go +++ b/cmd/testworkflow-init/main.go @@ -110,6 +110,7 @@ func main() { orchestration.Pause(step, *step.StartedAt) for _, parentRef := range step.Parents { parent := state.GetStep(parentRef) + // TODO: What about parents of the parents? orchestration.Pause(parent, *step.StartedAt) } return err @@ -125,6 +126,7 @@ func main() { orchestration.Resume(step, ts) for _, parentRef := range step.Parents { parent := state.GetStep(parentRef) + // TODO: What about parents of the parents? orchestration.Resume(parent, ts) } return err diff --git a/cmd/testworkflow-init/orchestration/executions.go b/cmd/testworkflow-init/orchestration/executions.go index 98277c4e534..9cc2cb0e7b1 100644 --- a/cmd/testworkflow-init/orchestration/executions.go +++ b/cmd/testworkflow-init/orchestration/executions.go @@ -133,6 +133,7 @@ func (e *executionGroup) Kill() (err error) { func (e *executionGroup) Abort() { e.aborted.Store(true) _ = e.Kill() + _ = e.Resume() } func (e *executionGroup) IsAborted() bool { diff --git a/pkg/api/v1/testkube/model_test_workflow_result_extended.go b/pkg/api/v1/testkube/model_test_workflow_result_extended.go index 5b8f22dc227..4909d47a1c3 100644 --- a/pkg/api/v1/testkube/model_test_workflow_result_extended.go +++ b/pkg/api/v1/testkube/model_test_workflow_result_extended.go @@ -8,7 +8,7 @@ import ( ) func (r *TestWorkflowResult) IsFinished() bool { - return !r.IsStatus(QUEUED_TestWorkflowStatus) && !r.IsStatus(RUNNING_TestWorkflowStatus) && !r.IsStatus(PAUSED_TestWorkflowStatus) + return !r.FinishedAt.IsZero() && !r.IsStatus(QUEUED_TestWorkflowStatus) && !r.IsStatus(RUNNING_TestWorkflowStatus) && !r.IsStatus(PAUSED_TestWorkflowStatus) } func (r *TestWorkflowResult) IsStatus(s TestWorkflowStatus) bool { @@ -90,7 +90,7 @@ func (r *TestWorkflowResult) Fatal(err error, aborted bool, ts time.Time) { if r.FinishedAt.IsZero() { r.FinishedAt = ts.UTC() } - if r.Initialization.Status == nil || (*r.Initialization.Status == QUEUED_TestWorkflowStepStatus) || (*r.Initialization.Status == RUNNING_TestWorkflowStepStatus) { + if r.Initialization.Status == nil || !(*r.Initialization.Status).Finished() { r.Initialization.Status = common.Ptr(FAILED_TestWorkflowStepStatus) if aborted { r.Initialization.Status = common.Ptr(ABORTED_TestWorkflowStepStatus) @@ -158,16 +158,26 @@ func (r *TestWorkflowResult) RecomputeDuration() { if !r.FinishedAt.IsZero() { r.PausedMs = 0 + // Finalize pauses + for i := range r.Pauses { + step := r.Steps[r.Pauses[i].Ref] + if !step.FinishedAt.IsZero() { + if r.Pauses[i].ResumedAt.IsZero() { + r.Pauses[i].ResumedAt = step.FinishedAt + } + if r.Pauses[i].PausedAt.Before(step.StartedAt) { + r.Pauses[i].PausedAt = step.StartedAt + } + if r.Pauses[i].ResumedAt.Before(r.Pauses[i].PausedAt) { + r.Pauses[i].PausedAt = r.Pauses[i].ResumedAt + } + } + } + // Get unique pause periods pauses := make([]TestWorkflowPause, 0) loop: for _, p := range r.Pauses { - // Finalize the pause if it's not - step := r.Steps[p.Ref] - if !step.FinishedAt.IsZero() && p.ResumedAt.IsZero() { - p.ResumedAt = step.FinishedAt - } - for i := range pauses { // They don't overlap if p.PausedAt.After(pauses[i].ResumedAt) || p.ResumedAt.Before(pauses[i].PausedAt) { @@ -326,6 +336,14 @@ func (r *TestWorkflowResult) Recompute(sig []TestWorkflowSignature, scheduledAt r.Steps[s.ref] = s.result } + // Ensure initialization timestamps + if !r.Initialization.FinishedAt.IsZero() && r.Initialization.StartedAt.IsZero() { + r.Initialization.StartedAt = r.Initialization.FinishedAt + } + if !r.Initialization.StartedAt.IsZero() && r.Initialization.QueuedAt.IsZero() { + r.Initialization.QueuedAt = r.Initialization.StartedAt + } + // Calibrate the clock for group steps walkSteps(sig, func(s TestWorkflowSignature) { if len(s.Children) == 0 { @@ -386,8 +404,12 @@ func (r *TestWorkflowResult) Recompute(sig []TestWorkflowSignature, scheduledAt r.Status = common.Ptr(RUNNING_TestWorkflowStatus) } - if r.FinishedAt.IsZero() && r.Status != nil && *r.Status == ABORTED_TestWorkflowStatus { - r.FinishedAt = r.LatestTimestamp() + // Ensure the finish time is after all other timestamps + if !r.FinishedAt.IsZero() || (r.Status != nil && *r.Status == ABORTED_TestWorkflowStatus) { + lastTs := r.LatestTimestamp() + if r.FinishedAt.Before(lastTs) { + r.FinishedAt = lastTs + } } // Compute the duration @@ -543,7 +565,16 @@ func recomputeTestWorkflowStepResult(v TestWorkflowStepResult, sig TestWorkflowS // Ensure there is a start time if the step is already finished if v.StartedAt.IsZero() && !v.FinishedAt.IsZero() { - v.StartedAt = v.QueuedAt + if v.QueuedAt.IsZero() { + v.StartedAt = v.FinishedAt + } else { + v.StartedAt = v.QueuedAt + } + } + + // Ensure there is a queued time if the step is already finished + if v.QueuedAt.IsZero() && !v.StartedAt.IsZero() { + v.QueuedAt = v.StartedAt } // Compute children diff --git a/pkg/testworkflows/testworkflowcontroller/controller.go b/pkg/testworkflows/testworkflowcontroller/controller.go index 93cbf05de4e..843c959ac6c 100644 --- a/pkg/testworkflows/testworkflowcontroller/controller.go +++ b/pkg/testworkflows/testworkflowcontroller/controller.go @@ -231,6 +231,7 @@ func (c *controller) WatchLightweight(parentCtx context.Context) <-chan Lightwei prevNodeName := "" prevPodIP := "" prevStatus := testkube.QUEUED_TestWorkflowStatus + prevIsFinished := false sig := stage.MapSignatureListToInternal(c.signature) ch := make(chan LightweightNotification) go func() { @@ -245,6 +246,7 @@ func (c *controller) WatchLightweight(parentCtx context.Context) <-chan Lightwei podIP, _ := c.PodIP(parentCtx) current := prevCurrent status := prevStatus + isFinished := prevIsFinished if v.Value.Result != nil { if v.Value.Result.Status != nil { status = *v.Value.Result.Status @@ -252,13 +254,17 @@ func (c *controller) WatchLightweight(parentCtx context.Context) <-chan Lightwei status = testkube.QUEUED_TestWorkflowStatus } current = v.Value.Result.Current(sig) + isFinished = v.Value.Result.IsFinished() } - if nodeName != prevNodeName || podIP != prevPodIP || prevStatus != status || prevCurrent != current { + // TODO: the final status should always have the finishedAt too, + // there should be no need for checking isFinished diff + if nodeName != prevNodeName || isFinished != prevIsFinished || podIP != prevPodIP || prevStatus != status || prevCurrent != current { prevNodeName = nodeName prevPodIP = podIP prevStatus = status prevCurrent = current + prevIsFinished = isFinished ch <- LightweightNotification{NodeName: nodeName, PodIP: podIP, Status: status, Current: current, Result: v.Value.Result} } } @@ -271,16 +277,6 @@ func (c *controller) Logs(parentCtx context.Context, follow bool) io.Reader { go func() { defer writer.Close() ref := "" - // Wait until there will be events fetched first - alignTimeoutCh := time.After(alignmentTimeout) - select { - case <-c.jobEvents.Peek(parentCtx): - case <-alignTimeoutCh: - } - select { - case <-c.podEvents.Peek(parentCtx): - case <-alignTimeoutCh: - } ch, err := WatchInstrumentedPod(parentCtx, c.clientSet, c.signature, c.scheduledAt, c.pod, c.podEvents, WatchInstrumentedPodOptions{ JobEvents: c.jobEvents, Job: c.job, diff --git a/pkg/testworkflows/testworkflowcontroller/logs.go b/pkg/testworkflows/testworkflowcontroller/logs.go index ea04f770fe1..bcd303924a9 100644 --- a/pkg/testworkflows/testworkflowcontroller/logs.go +++ b/pkg/testworkflows/testworkflowcontroller/logs.go @@ -40,6 +40,23 @@ type ContainerLog struct { Output *instructions.Instruction } +type ContainerLogType string + +const ( + ContainerLogTypeHint ContainerLogType = "hint" + ContainerLogTypeOutput ContainerLogType = "output" + ContainerLogTypeLog ContainerLogType = "" +) + +func (c *ContainerLog) Type() ContainerLogType { + if c.Hint != nil { + return ContainerLogTypeHint + } else if c.Output != nil { + return ContainerLogTypeOutput + } + return ContainerLogTypeLog +} + // getContainerLogsStream is getting logs stream, and tries to reinitialize the stream on EOF. // EOF may happen not only on the actual container end, but also in case of the log rotation. // @see {@link https://stackoverflow.com/a/68673451} diff --git a/pkg/testworkflows/testworkflowcontroller/notifier.go b/pkg/testworkflows/testworkflowcontroller/notifier.go index 83e2985aeb1..567355efbdb 100644 --- a/pkg/testworkflows/testworkflowcontroller/notifier.go +++ b/pkg/testworkflows/testworkflowcontroller/notifier.go @@ -292,11 +292,11 @@ func (n *notifier) Output(ref string, ts time.Time, output *instructions.Instruc } func (n *notifier) Finish(ts time.Time) { - n.resultMu.Lock() - defer n.resultMu.Unlock() - if !n.result.FinishedAt.Before(ts) { + if ts.IsZero() { return } + n.resultMu.Lock() + defer n.resultMu.Unlock() n.result.FinishedAt = ts n.emit() } @@ -312,7 +312,7 @@ func (n *notifier) UpdateStepStatus(ref string, status testkube.TestWorkflowStep } func (n *notifier) finishInit(status ContainerResultStep) { - if n.result.Initialization.FinishedAt.Equal(status.FinishedAt) && n.result.Initialization.Status != nil && *n.result.Initialization.Status == status.Status { + if n.result.Initialization.FinishedAt.Equal(status.FinishedAt) && n.result.Initialization.Status != nil && *n.result.Initialization.Status == status.Status && (status.Status != testkube.ABORTED_TestWorkflowStepStatus || n.result.Initialization.ErrorMessage == status.Details) { return } n.result.Initialization.FinishedAt = status.FinishedAt.UTC() @@ -352,7 +352,7 @@ func (n *notifier) FinishStep(ref string, status ContainerResultStep) { n.finishInit(status) return } - if n.result.Steps[ref].FinishedAt.Equal(status.FinishedAt) && n.result.Steps[ref].Status != nil && *n.result.Steps[ref].Status == status.Status { + if n.result.Steps[ref].FinishedAt.Equal(status.FinishedAt) && n.result.Steps[ref].Status != nil && *n.result.Steps[ref].Status == status.Status && (status.Status != testkube.ABORTED_TestWorkflowStepStatus || n.result.Steps[ref].ErrorMessage == status.Details) { return } s := n.result.Steps[ref] diff --git a/pkg/testworkflows/testworkflowcontroller/podstate.go b/pkg/testworkflows/testworkflowcontroller/podstate.go index ee1a267a5b0..cd925c8a7ce 100644 --- a/pkg/testworkflows/testworkflowcontroller/podstate.go +++ b/pkg/testworkflows/testworkflowcontroller/podstate.go @@ -2,6 +2,7 @@ package testworkflowcontroller import ( "context" + "fmt" "regexp" "slices" "strconv" @@ -85,6 +86,9 @@ func newPodState(parentCtx context.Context) *podState { func (p *podState) preStartWatcher(name string) *channel[podStateUpdate] { if _, ok := p.prestart[name]; !ok { p.prestart[name] = newChannel[podStateUpdate](p.ctx, eventBufferSize) + if p.ctx.Err() != nil || p.unsafeIsStarted(name) || p.unsafeIsFinished(name) { + p.prestart[name].Close() + } } return p.prestart[name] } @@ -278,6 +282,13 @@ func (p *podState) RegisterJob(job *batchv1.Job) { p.setQueuedAt("", job.CreationTimestamp.Time) if job.Status.CompletionTime != nil { p.setFinishedAt("", job.Status.CompletionTime.Time) + } else if slices.ContainsFunc(job.Status.Conditions, isJobConditionEnd) { + for i := range job.Status.Conditions { + if isJobConditionEnd(job.Status.Conditions[i]) { + p.setFinishedAt("", job.Status.Conditions[i].LastTransitionTime.Time) + break + } + } } else if job.DeletionTimestamp != nil { p.setFinishedAt("", job.DeletionTimestamp.Time) } @@ -293,11 +304,18 @@ func (p *podState) PreStart(name string) <-chan ChannelMessage[podStateUpdate] { return p.preStartWatcher(name).Channel() } +func (p *podState) unsafeIsStarted(name string) bool { + return !p.started[name].IsZero() +} + +func (p *podState) unsafeIsFinished(name string) bool { + return !p.finished[name].IsZero() +} + func (p *podState) IsFinished(name string) bool { p.mu.Lock() defer p.mu.Unlock() - _, ok := p.finished[name] - return ok && p.ctx.Err() == nil + return p.unsafeIsFinished(name) } func (p *podState) Finished(name string) chan struct{} { @@ -331,9 +349,21 @@ func (p *podState) FinishedAt(name string) time.Time { func (p *podState) containerResult(name string) (ContainerResult, error) { status := p.containerStatus(name) if status == nil || status.State.Terminated == nil { - // TODO: Handle it nicer if p.pod != nil && IsPodDone(p.pod) { - return UnknownContainerResult, nil + result := UnknownContainerResult + for _, c := range p.pod.Status.Conditions { + if c.Type == corev1.DisruptionTarget && c.Status == corev1.ConditionTrue { + if c.Reason == "EvictionByEvictionAPI" { + result.Details = "Pod has been requested for deletion using the Kubernetes API" + } else if c.Message == "" { + result.Details = c.Reason + } else { + result.Details = fmt.Sprintf("%s: %s", c.Reason, c.Message) + } + break + } + } + return result, nil } return UnknownContainerResult, ErrNotTerminatedYet } @@ -381,58 +411,97 @@ func (p *podState) ContainerResult(name string) (ContainerResult, error) { func initializePodState(parentCtx context.Context, pod Channel[*corev1.Pod], podEvents Channel[*corev1.Event], job Channel[*batchv1.Job], jobEvents Channel[*corev1.Event], errorHandler func(error)) *podState { ctx, ctxCancel := context.WithCancel(parentCtx) state := newPodState(ctx) + + // Fill optional channels + if job == nil { + job = newChannel[*batchv1.Job](ctx, 0) + job.Close() + } + if jobEvents == nil { + jobEvents = newChannel[*corev1.Event](ctx, 0) + jobEvents.Close() + } + go func() { defer ctxCancel() - var wg sync.WaitGroup - if job != nil { - wg.Add(1) - go func() { - for v := range job.Channel() { - if v.Error != nil { - errorHandler(v.Error) - } else { - state.RegisterJob(v.Value) - } + + // Build channels for the streams + left := 4 + jobCh := job.Channel() + jobEventsCh := jobEvents.Channel() + podCh := pod.Channel() + podEventsCh := podEvents.Channel() + + // Loop for the data + for { + if left == 0 { + return + } + + // Prioritize pod & events + select { + case <-parentCtx.Done(): + return + case v, ok := <-podCh: + if !ok { + podCh = nil + left-- + continue } - wg.Done() - }() - } - if jobEvents != nil { - wg.Add(1) - go func() { - for v := range jobEvents.Channel() { - if v.Error != nil { - errorHandler(v.Error) - } else { - state.RegisterEvent(v.Value) - } + if v.Error != nil { + errorHandler(v.Error) + continue + } + state.RegisterPod(v.Value) + case v, ok := <-jobEventsCh: + if !ok { + jobEventsCh = nil + left-- + continue } - wg.Done() - }() - } - wg.Add(1) - go func() { - for v := range podEvents.Channel() { if v.Error != nil { errorHandler(v.Error) - } else { - state.RegisterEvent(v.Value) + continue + } + state.RegisterEvent(v.Value) + case v, ok := <-podEventsCh: + if !ok { + podEventsCh = nil + left-- + continue + } + if v.Error != nil { + errorHandler(v.Error) + continue + } + state.RegisterEvent(v.Value) + case v, ok := <-jobCh: + if !ok { + jobCh = nil + left-- + continue } - } - wg.Done() - }() - wg.Add(1) - go func() { - for v := range pod.Channel() { if v.Error != nil { errorHandler(v.Error) - } else { - state.RegisterPod(v.Value) + continue } + + // Try to firstly finish with the Pod information when it's possible + if IsJobDone(v.Value) && state.FinishedAt("").IsZero() && HadPodScheduled(v.Value) { + select { + case p, ok := <-podCh: + if p.Error != nil { + errorHandler(p.Error) + } else if ok { + state.RegisterPod(p.Value) + } + case <-time.After(alignmentTimeout): + // Continue - likely we won't receive Pod status + } + } + state.RegisterJob(v.Value) } - wg.Done() - }() - wg.Wait() + } }() return state } diff --git a/pkg/testworkflows/testworkflowcontroller/utils.go b/pkg/testworkflows/testworkflowcontroller/utils.go index bfe7fd2a6ee..064bd6147eb 100644 --- a/pkg/testworkflows/testworkflowcontroller/utils.go +++ b/pkg/testworkflows/testworkflowcontroller/utils.go @@ -2,6 +2,7 @@ package testworkflowcontroller import ( "regexp" + "slices" "time" batchv1 "k8s.io/api/batch/v1" @@ -29,8 +30,16 @@ func IsPodDone(pod *corev1.Pod) bool { return (pod.Status.Phase != corev1.PodPending && pod.Status.Phase != corev1.PodRunning) || pod.ObjectMeta.DeletionTimestamp != nil } +func isJobConditionEnd(condition batchv1.JobCondition) bool { + return (condition.Type == batchv1.JobFailed || condition.Type == batchv1.JobComplete) && condition.Status == corev1.ConditionTrue && !condition.LastTransitionTime.IsZero() +} + func IsJobDone(job *batchv1.Job) bool { - return (job.Status.Active == 0 && (job.Status.Succeeded > 0 || job.Status.Failed > 0)) || job.ObjectMeta.DeletionTimestamp != nil + return (job.Status.Active == 0 && (job.Status.Succeeded > 0 || job.Status.Failed > 0)) || job.ObjectMeta.DeletionTimestamp != nil || job.Status.CompletionTime != nil || slices.ContainsFunc(job.Status.Conditions, isJobConditionEnd) +} + +func HadPodScheduled(job *batchv1.Job) bool { + return job.Status.Active > 0 || job.Status.Succeeded > 0 || job.Status.Failed > 0 } type ContainerResultStep struct { diff --git a/pkg/testworkflows/testworkflowcontroller/watchinstrumentedpod.go b/pkg/testworkflows/testworkflowcontroller/watchinstrumentedpod.go index 1485d593e7b..f0a2ec88e97 100644 --- a/pkg/testworkflows/testworkflowcontroller/watchinstrumentedpod.go +++ b/pkg/testworkflows/testworkflowcontroller/watchinstrumentedpod.go @@ -7,6 +7,7 @@ import ( "strconv" "time" + "github.com/gookit/color" "github.com/pkg/errors" batchv1 "k8s.io/api/batch/v1" corev1 "k8s.io/api/core/v1" @@ -21,8 +22,9 @@ import ( ) const ( - InitContainerName = "tktw-init" - IdleTimeout = 100 * time.Millisecond + InitContainerName = "tktw-init" + IdleTimeout = 100 * time.Millisecond + ExpectedDelayTimeout = 1 * time.Second ) type WatchInstrumentedPodOptions struct { @@ -65,13 +67,27 @@ func WatchInstrumentedPod(parentCtx context.Context, clientSet kubernetes.Interf } // Ensure the queue/start time has been saved - if s.result.QueuedAt.IsZero() || s.result.StartedAt.IsZero() { + if (s.result.QueuedAt.IsZero() || s.result.StartedAt.IsZero()) && state.FinishedAt("").IsZero() { s.Error(errors.New("missing information about scheduled pod")) return } // Load the namespace information - podObj := <-pod.Peek(ctx) + var podObj *corev1.Pod + select { + // Obtain the Pod information for further execution + case p := <-pod.Peek(ctx): + podObj = p + // Handle when the execution have been finished, but there may be no Pod + case <-state.Finished(""): + select { + case <-time.After(ExpectedDelayTimeout): + s.Error(fmt.Errorf("the execution is finished, but failed to get pod")) + return + case p := <-pod.Peek(ctx): + podObj = p + } + } // Load the references var refs, endRefs [][]string @@ -113,8 +129,30 @@ func WatchInstrumentedPod(parentCtx context.Context, clientSet kubernetes.Interf // Update queue time s.Queue(initialRef, lastTs) - // Watch the container events - for v := range state.PreStart(containerName) { + // Watch the container events, along with final finish too + preStartCh := state.PreStart(containerName) + finishedCh := state.Finished("") + loop: + for { + var v ChannelMessage[podStateUpdate] + select { + case vv, ok := <-preStartCh: + if !ok { + break loop + } + v = vv + default: + select { + case vv, ok := <-preStartCh: + if !ok { + break loop + } + v = vv + case <-finishedCh: + break loop + } + } + if v.Value.Queued != nil { s.Queue(initialRef, state.QueuedAt(containerName)) } else if v.Value.Started != nil { @@ -127,22 +165,27 @@ func WatchInstrumentedPod(parentCtx context.Context, clientSet kubernetes.Interf } // Ensure the queue/start time has been saved - if s.GetStepResult(initialRef).QueuedAt.IsZero() || s.GetStepResult(initialRef).StartedAt.IsZero() { + if (s.GetStepResult(initialRef).QueuedAt.IsZero() || s.GetStepResult(initialRef).StartedAt.IsZero()) && state.FinishedAt("").IsZero() { s.Error(fmt.Errorf("missing information about scheduled '%s' step in '%s' container", initialRef, container.Name)) return } // Watch the container logs - follow := common.ResolvePtr(opts.Follow, true) && !state.IsFinished(containerName) + follow := common.ResolvePtr(opts.Follow, true) && !state.IsFinished(containerName) && !state.IsFinished("") aborted := false lastStarted := initialRef executionStatuses := map[string]constants.ExecutionResult{} for v := range WatchContainerLogs(ctx, clientSet, podObj.Namespace, podObj.Name, containerName, follow, 10, pod).Channel() { if v.Error != nil { s.Error(v.Error) - } else if v.Value.Output != nil { + } + + switch v.Value.Type() { + case ContainerLogTypeLog: + s.Raw(lastStarted, v.Value.Time, string(v.Value.Log), false) + case ContainerLogTypeOutput: s.Output(v.Value.Output.Ref, v.Value.Time, v.Value.Output) - } else if v.Value.Hint != nil { + case ContainerLogTypeHint: if v.Value.Hint.Ref == constants2.RootOperationName { continue } @@ -189,18 +232,22 @@ func WatchInstrumentedPod(parentCtx context.Context, clientSet kubernetes.Interf } s.Resume(v.Value.Hint.Ref, end) } - } else { - s.Raw(lastStarted, v.Value.Time, string(v.Value.Log), false) } } if aborted { // Don't wait for any other statuses if we already know that some task has been aborted } else if follow { - <-state.Finished(container.Name) + select { + case <-state.Finished(container.Name): + case <-state.Finished(""): + // Finish fast when the whole execution has been finished + } } else { select { case <-state.Finished(container.Name): + case <-state.Finished(""): + // Finish fast when the whole execution has been finished case <-time.After(IdleTimeout): return } @@ -222,48 +269,79 @@ func WatchInstrumentedPod(parentCtx context.Context, clientSet kubernetes.Interf status := ContainerResultStep{ Status: testkube.ABORTED_TestWorkflowStepStatus, ExitCode: -1, - Details: "", - FinishedAt: result.FinishedAt, + Details: result.Details, + FinishedAt: s.GetStepResult(ref).FinishedAt, + } + if status.FinishedAt.IsZero() { + status.FinishedAt = result.FinishedAt } + if status.FinishedAt.IsZero() { + status.FinishedAt = state.FinishedAt("") + } + if status.FinishedAt.IsZero() { + status.FinishedAt = s.GetLastTimestamp(lastStarted) + } + if len(result.Steps) > i { status = result.Steps[i] + if status.Details == "" { + status.Details = result.Details + } + finishedAt := s.GetStepResult(ref).FinishedAt + if !finishedAt.IsZero() { + status.FinishedAt = finishedAt + } } - if !s.IsFinished(ref) { - s.FinishStep(ref, status) + s.FinishStep(ref, status) + if status.Status == testkube.ABORTED_TestWorkflowStepStatus { + lastStarted = ref + break } } } // Update the last timestamp - lastTs = s.GetLastTimestamp(lastStarted) + nextLastTs := s.GetLastTimestamp(lastStarted) + if nextLastTs.After(lastTs) { + lastTs = nextLastTs + } // Break the function if the step has been aborted. // Breaking only to the loop is not enough, // because due to GKE bug, the Job is still pending, // so it will get stuck there. if s.IsAnyAborted() { + result, _ := state.ContainerResult(container.Name) reason := s.result.Steps[lastStarted].ErrorMessage + if reason == "" { + reason = result.Details + } message := "Aborted" if reason == "" { - message = fmt.Sprintf("\n%s Aborted", s.GetLastTimestamp(lastStarted).Format(KubernetesLogTimeFormat)) + message = fmt.Sprintf("\n%s Aborted", lastTs.Format(KubernetesLogTimeFormat)) } else { - message = fmt.Sprintf("\n%s Aborted (%s)", s.GetLastTimestamp(lastStarted).Format(KubernetesLogTimeFormat), reason) + message = fmt.Sprintf("\n%s Aborted (%s)", lastTs.Format(KubernetesLogTimeFormat), reason) } - s.Raw(lastStarted, s.GetLastTimestamp(lastStarted), message, false) + s.Raw(lastStarted, lastTs, message, false) // Mark all not started steps as skipped for ref := range s.result.Steps { if !s.IsFinished(ref) { status := testkube.SKIPPED_TestWorkflowStepStatus - details := "The execution was aborted before." + details := "The execution was aborted before" if s.result.Steps[ref].Status != nil && *s.result.Steps[ref].Status != testkube.QUEUED_TestWorkflowStepStatus { status = testkube.ABORTED_TestWorkflowStepStatus - details = "" + details = result.Details + } else if result.Details != "" { + details = fmt.Sprintf("The execution was aborted before %s", color.FgDarkGray.Render("("+result.Details+")")) + } + if details != "" { + s.Raw(ref, lastTs, fmt.Sprintf("%s %s", lastTs.Format(KubernetesLogTimeFormat), details), false) } s.FinishStep(ref, ContainerResultStep{ Status: status, ExitCode: -1, - Details: details, + Details: "", FinishedAt: lastTs, }) } diff --git a/pkg/testworkflows/testworkflowprocessor/processor.go b/pkg/testworkflows/testworkflowprocessor/processor.go index f941003205f..66c863327ee 100644 --- a/pkg/testworkflows/testworkflowprocessor/processor.go +++ b/pkg/testworkflows/testworkflowprocessor/processor.go @@ -361,6 +361,7 @@ func (p *processor) Bundle(ctx context.Context, workflow *testworkflowsv1.TestWo podSpec.Spec.Containers = containers[len(containers)-1:] // Build job spec + // TODO: Add ownerReferences in case of parent pod? jobSpec := batchv1.Job{ TypeMeta: metav1.TypeMeta{ Kind: "Job",