Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: avoid retrieving container logs again, when expected information is complete #5868

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 9 additions & 3 deletions pkg/testworkflows/testworkflowcontroller/logs.go
Original file line number Diff line number Diff line change
Expand Up @@ -107,7 +107,7 @@ func getContainerLogsStream(ctx context.Context, clientSet kubernetes.Interface,
return stream, nil
}

func WatchContainerLogs(parentCtx context.Context, clientSet kubernetes.Interface, namespace, podName, containerName string, bufferSize int, isDone func() bool) <-chan ChannelMessage[ContainerLog] {
func WatchContainerLogs(parentCtx context.Context, clientSet kubernetes.Interface, namespace, podName, containerName string, bufferSize int, isDone func() bool, isLastHint func(*instructions.Instruction) bool) <-chan ChannelMessage[ContainerLog] {
ctx, ctxCancel := context.WithCancel(parentCtx)
ch := make(chan ChannelMessage[ContainerLog], bufferSize)
var mu sync.Mutex
Expand Down Expand Up @@ -253,6 +253,7 @@ func WatchContainerLogs(parentCtx context.Context, clientSet kubernetes.Interfac
readerAnyContent := false
tsReader := newTimestampReader()
lastTs := time.Now()
completed := false

hasNewLine := false

Expand Down Expand Up @@ -286,8 +287,10 @@ func WatchContainerLogs(parentCtx context.Context, clientSet kubernetes.Interfac

// If the stream is finished,
// either the logfile has been rotated, or the container actually finished.
// Assume that only if there was EOF without any logs since, the container is done.
if err == io.EOF && !readerAnyContent {
// Consider the container is done only when either:
// - there was EOF without any logs since, or
// - the last expected instruction was already delivered
if err == io.EOF && (!readerAnyContent || completed) {
return
}

Expand Down Expand Up @@ -361,6 +364,9 @@ func WatchContainerLogs(parentCtx context.Context, clientSet kubernetes.Interfac
item := ContainerLog{Time: lastTs}
if isHint {
item.Hint = instruction
if !completed && isLastHint(instruction) {
completed = true
}
} else {
item.Output = instruction
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import (

"github.com/kubeshop/testkube/cmd/testworkflow-init/constants"
"github.com/kubeshop/testkube/cmd/testworkflow-init/data"
"github.com/kubeshop/testkube/cmd/testworkflow-init/instructions"
"github.com/kubeshop/testkube/pkg/api/v1/testkube"
"github.com/kubeshop/testkube/pkg/testworkflows/testworkflowcontroller/watchers"
"github.com/kubeshop/testkube/pkg/testworkflows/testworkflowprocessor/stage"
Expand Down Expand Up @@ -96,7 +97,7 @@ func WatchInstrumentedPod(parentCtx context.Context, clientSet kubernetes.Interf
notifier.Error(fmt.Errorf("cannot read execution instructions: %v", err))
return
}
refs, _ := ExtractRefsFromActionGroup(actions)
refs, endRefs := ExtractRefsFromActionGroup(actions)
initialRefs := make([]string, len(actions))
for i := range refs {
for j := range refs[i] {
Expand All @@ -119,6 +120,12 @@ func WatchInstrumentedPod(parentCtx context.Context, clientSet kubernetes.Interf
aborted := false
container := fmt.Sprintf("%d", containerIndex+1)

// Determine the last ref in this container, so we can confirm that the logs have been read until end
lastRef := endRefs[containerIndex][len(endRefs[containerIndex])-1]
if lastRef == "" && len(endRefs[containerIndex]) > 1 {
lastRef = endRefs[containerIndex][len(endRefs[containerIndex])-2]
}

// Wait until the Container is started
currentPodEventsIndex = 0
for ok := true; ok; _, ok = <-updatesCh {
Expand Down Expand Up @@ -148,10 +155,13 @@ func WatchInstrumentedPod(parentCtx context.Context, clientSet kubernetes.Interf
lastStarted = refs[containerIndex][0]

// Read the Container logs
isLastHint := func(hint *instructions.Instruction) bool {
return hint != nil && hint.Ref == lastRef && hint.Name == constants.InstructionEnd
}
isDone := func() bool {
return opts.DisableFollow || watcher.State().ContainerFinished(container) || watcher.State().Completed()
}
for v := range WatchContainerLogs(ctx, clientSet, watcher.State().Namespace(), watcher.State().PodName(), container, 10, isDone) {
for v := range WatchContainerLogs(ctx, clientSet, watcher.State().Namespace(), watcher.State().PodName(), container, 10, isDone, isLastHint) {
if v.Error != nil {
notifier.Error(v.Error)
continue
Expand Down
Loading