diff --git a/pkg/testworkflows/testworkflowcontroller/logs.go b/pkg/testworkflows/testworkflowcontroller/logs.go index d94caf8ca7..d98c40b808 100644 --- a/pkg/testworkflows/testworkflowcontroller/logs.go +++ b/pkg/testworkflows/testworkflowcontroller/logs.go @@ -107,7 +107,7 @@ func getContainerLogsStream(ctx context.Context, clientSet kubernetes.Interface, return stream, nil } -func WatchContainerLogs(parentCtx context.Context, clientSet kubernetes.Interface, namespace, podName, containerName string, bufferSize int, isDone func() bool) <-chan ChannelMessage[ContainerLog] { +func WatchContainerLogs(parentCtx context.Context, clientSet kubernetes.Interface, namespace, podName, containerName string, bufferSize int, isDone func() bool, isLastHint func(*instructions.Instruction) bool) <-chan ChannelMessage[ContainerLog] { ctx, ctxCancel := context.WithCancel(parentCtx) ch := make(chan ChannelMessage[ContainerLog], bufferSize) var mu sync.Mutex @@ -253,6 +253,7 @@ func WatchContainerLogs(parentCtx context.Context, clientSet kubernetes.Interfac readerAnyContent := false tsReader := newTimestampReader() lastTs := time.Now() + completed := false hasNewLine := false @@ -286,8 +287,10 @@ func WatchContainerLogs(parentCtx context.Context, clientSet kubernetes.Interfac // If the stream is finished, // either the logfile has been rotated, or the container actually finished. - // Assume that only if there was EOF without any logs since, the container is done. - if err == io.EOF && !readerAnyContent { + // Consider the container is done only when either: + // - there was EOF without any logs since, or + // - the last expected instruction was already delivered + if err == io.EOF && (!readerAnyContent || completed) { return } @@ -361,6 +364,9 @@ func WatchContainerLogs(parentCtx context.Context, clientSet kubernetes.Interfac item := ContainerLog{Time: lastTs} if isHint { item.Hint = instruction + if !completed && isLastHint(instruction) { + completed = true + } } else { item.Output = instruction } diff --git a/pkg/testworkflows/testworkflowcontroller/watchinstrumentedpod.go b/pkg/testworkflows/testworkflowcontroller/watchinstrumentedpod.go index 315f5bdbce..bd8bf5749b 100644 --- a/pkg/testworkflows/testworkflowcontroller/watchinstrumentedpod.go +++ b/pkg/testworkflows/testworkflowcontroller/watchinstrumentedpod.go @@ -10,6 +10,7 @@ import ( "github.com/kubeshop/testkube/cmd/testworkflow-init/constants" "github.com/kubeshop/testkube/cmd/testworkflow-init/data" + "github.com/kubeshop/testkube/cmd/testworkflow-init/instructions" "github.com/kubeshop/testkube/pkg/api/v1/testkube" "github.com/kubeshop/testkube/pkg/testworkflows/testworkflowcontroller/watchers" "github.com/kubeshop/testkube/pkg/testworkflows/testworkflowprocessor/stage" @@ -96,7 +97,7 @@ func WatchInstrumentedPod(parentCtx context.Context, clientSet kubernetes.Interf notifier.Error(fmt.Errorf("cannot read execution instructions: %v", err)) return } - refs, _ := ExtractRefsFromActionGroup(actions) + refs, endRefs := ExtractRefsFromActionGroup(actions) initialRefs := make([]string, len(actions)) for i := range refs { for j := range refs[i] { @@ -119,6 +120,12 @@ func WatchInstrumentedPod(parentCtx context.Context, clientSet kubernetes.Interf aborted := false container := fmt.Sprintf("%d", containerIndex+1) + // Determine the last ref in this container, so we can confirm that the logs have been read until end + lastRef := endRefs[containerIndex][len(endRefs[containerIndex])-1] + if lastRef == "" && len(endRefs[containerIndex]) > 1 { + lastRef = endRefs[containerIndex][len(endRefs[containerIndex])-2] + } + // Wait until the Container is started currentPodEventsIndex = 0 for ok := true; ok; _, ok = <-updatesCh { @@ -148,10 +155,13 @@ func WatchInstrumentedPod(parentCtx context.Context, clientSet kubernetes.Interf lastStarted = refs[containerIndex][0] // Read the Container logs + isLastHint := func(hint *instructions.Instruction) bool { + return hint != nil && hint.Ref == lastRef && hint.Name == constants.InstructionEnd + } isDone := func() bool { return opts.DisableFollow || watcher.State().ContainerFinished(container) || watcher.State().Completed() } - for v := range WatchContainerLogs(ctx, clientSet, watcher.State().Namespace(), watcher.State().PodName(), container, 10, isDone) { + for v := range WatchContainerLogs(ctx, clientSet, watcher.State().Namespace(), watcher.State().PodName(), container, 10, isDone, isLastHint) { if v.Error != nil { notifier.Error(v.Error) continue