Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: controlling issues #5756

Merged
merged 16 commits into from
Aug 13, 2024
Merged
Show file tree
Hide file tree
Changes from 11 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 6 additions & 2 deletions cmd/tcl/testworkflow-toolkit/commands/parallel.go
Original file line number Diff line number Diff line change
Expand Up @@ -263,6 +263,7 @@ func NewParallelCmd() *cobra.Command {

prevStatus := testkube.QUEUED_TestWorkflowStatus
prevStep := ""
prevIsFinished := false
scheduled := false
for v := range ctrl.WatchLightweight(ctx) {
// Handle error
Expand All @@ -283,14 +284,17 @@ func NewParallelCmd() *cobra.Command {
}

// Handle result change
if v.Status != prevStatus || v.Current != prevStep {
// TODO: the final status should always have the finishedAt too,
// there should be no need for checking isFinished diff
if v.Status != prevStatus || lastResult.IsFinished() != prevIsFinished || v.Current != prevStep {
if v.Status != prevStatus {
log(string(v.Status))
}
updates <- Update{index: index, result: v.Result}
prevStep = v.Current
prevStatus = v.Status
if v.Result.IsFinished() {
prevIsFinished = lastResult.IsFinished()
if lastResult.IsFinished() {
instructions.PrintOutput(env.Ref(), "parallel", ParallelStatus{Index: int(index), Status: v.Status, Result: v.Result})
ctxCancel()
return v.Result.IsPassed()
Expand Down
2 changes: 2 additions & 0 deletions cmd/testworkflow-init/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -110,6 +110,7 @@ func main() {
orchestration.Pause(step, *step.StartedAt)
for _, parentRef := range step.Parents {
parent := state.GetStep(parentRef)
// TODO: What about parents of the parents?
orchestration.Pause(parent, *step.StartedAt)
}
return err
Expand All @@ -125,6 +126,7 @@ func main() {
orchestration.Resume(step, ts)
for _, parentRef := range step.Parents {
parent := state.GetStep(parentRef)
// TODO: What about parents of the parents?
orchestration.Resume(parent, ts)
}
return err
Expand Down
1 change: 1 addition & 0 deletions cmd/testworkflow-init/orchestration/executions.go
Original file line number Diff line number Diff line change
Expand Up @@ -133,6 +133,7 @@ func (e *executionGroup) Kill() (err error) {
func (e *executionGroup) Abort() {
e.aborted.Store(true)
_ = e.Kill()
_ = e.Resume()
}

func (e *executionGroup) IsAborted() bool {
Expand Down
42 changes: 33 additions & 9 deletions pkg/api/v1/testkube/model_test_workflow_result_extended.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ import (
)

func (r *TestWorkflowResult) IsFinished() bool {
return !r.IsStatus(QUEUED_TestWorkflowStatus) && !r.IsStatus(RUNNING_TestWorkflowStatus) && !r.IsStatus(PAUSED_TestWorkflowStatus)
return !r.FinishedAt.IsZero() && !r.IsStatus(QUEUED_TestWorkflowStatus) && !r.IsStatus(RUNNING_TestWorkflowStatus) && !r.IsStatus(PAUSED_TestWorkflowStatus)
}

func (r *TestWorkflowResult) IsStatus(s TestWorkflowStatus) bool {
Expand Down Expand Up @@ -90,7 +90,7 @@ func (r *TestWorkflowResult) Fatal(err error, aborted bool, ts time.Time) {
if r.FinishedAt.IsZero() {
r.FinishedAt = ts.UTC()
}
if r.Initialization.Status == nil || (*r.Initialization.Status == QUEUED_TestWorkflowStepStatus) || (*r.Initialization.Status == RUNNING_TestWorkflowStepStatus) {
if r.Initialization.Status == nil || !(*r.Initialization.Status).Finished() {
r.Initialization.Status = common.Ptr(FAILED_TestWorkflowStepStatus)
if aborted {
r.Initialization.Status = common.Ptr(ABORTED_TestWorkflowStepStatus)
Expand Down Expand Up @@ -158,16 +158,23 @@ func (r *TestWorkflowResult) RecomputeDuration() {
if !r.FinishedAt.IsZero() {
r.PausedMs = 0

// Finalize pauses
for i := range r.Pauses {
step := r.Steps[r.Pauses[i].Ref]
if !step.FinishedAt.IsZero() {
if r.Pauses[i].ResumedAt.IsZero() {
r.Pauses[i].ResumedAt = step.FinishedAt
}
if r.Pauses[i].PausedAt.Before(step.StartedAt) {
r.Pauses[i].PausedAt = step.StartedAt
}
}
}

// Get unique pause periods
pauses := make([]TestWorkflowPause, 0)
loop:
for _, p := range r.Pauses {
// Finalize the pause if it's not
step := r.Steps[p.Ref]
if !step.FinishedAt.IsZero() && p.ResumedAt.IsZero() {
p.ResumedAt = step.FinishedAt
}

for i := range pauses {
// They don't overlap
if p.PausedAt.After(pauses[i].ResumedAt) || p.ResumedAt.Before(pauses[i].PausedAt) {
Expand Down Expand Up @@ -326,6 +333,14 @@ func (r *TestWorkflowResult) Recompute(sig []TestWorkflowSignature, scheduledAt
r.Steps[s.ref] = s.result
}

// Ensure initialization timestamps
if !r.Initialization.FinishedAt.IsZero() && r.Initialization.StartedAt.IsZero() {
r.Initialization.StartedAt = r.Initialization.FinishedAt
}
if !r.Initialization.StartedAt.IsZero() && r.Initialization.QueuedAt.IsZero() {
r.Initialization.QueuedAt = r.Initialization.StartedAt
}

// Calibrate the clock for group steps
walkSteps(sig, func(s TestWorkflowSignature) {
if len(s.Children) == 0 {
Expand Down Expand Up @@ -543,7 +558,16 @@ func recomputeTestWorkflowStepResult(v TestWorkflowStepResult, sig TestWorkflowS

// Ensure there is a start time if the step is already finished
if v.StartedAt.IsZero() && !v.FinishedAt.IsZero() {
v.StartedAt = v.QueuedAt
if v.QueuedAt.IsZero() {
v.StartedAt = v.FinishedAt
} else {
v.StartedAt = v.QueuedAt
}
}

// Ensure there is a queued time if the step is already finished
if v.QueuedAt.IsZero() && !v.StartedAt.IsZero() {
v.QueuedAt = v.StartedAt
}

// Compute children
Expand Down
18 changes: 7 additions & 11 deletions pkg/testworkflows/testworkflowcontroller/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -231,6 +231,7 @@ func (c *controller) WatchLightweight(parentCtx context.Context) <-chan Lightwei
prevNodeName := ""
prevPodIP := ""
prevStatus := testkube.QUEUED_TestWorkflowStatus
prevIsFinished := false
sig := stage.MapSignatureListToInternal(c.signature)
ch := make(chan LightweightNotification)
go func() {
Expand All @@ -245,20 +246,25 @@ func (c *controller) WatchLightweight(parentCtx context.Context) <-chan Lightwei
podIP, _ := c.PodIP(parentCtx)
current := prevCurrent
status := prevStatus
isFinished := prevIsFinished
if v.Value.Result != nil {
if v.Value.Result.Status != nil {
status = *v.Value.Result.Status
} else {
status = testkube.QUEUED_TestWorkflowStatus
}
current = v.Value.Result.Current(sig)
isFinished = v.Value.Result.IsFinished()
}

if nodeName != prevNodeName || podIP != prevPodIP || prevStatus != status || prevCurrent != current {
// TODO: the final status should always have the finishedAt too,
// there should be no need for checking isFinished diff
if nodeName != prevNodeName || isFinished != prevIsFinished || podIP != prevPodIP || prevStatus != status || prevCurrent != current {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

such code will not be simple, to maintain (

Copy link
Member Author

@rangoo94 rangoo94 Aug 13, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I fully agree, although this code is rather not meant to be maintained. This PR, along actual bugfixes, is meant to "enable" the new orchestration with similar watching system.

The (mainly) WatchInstrumentedPod and TestWorkflowResult have so many edge cases handled, clock calibration, and auto-healing mechanisms implemented, that at this point it's probably better to just rewrite them based on the observations that we have for the last few months of running Test Workflows. After that, we will likely not need conditions like these at all.

I'm guessing that half of the healing mechanisms and edge cases handlers are no longer needed, considering iterations of orchestration improvements.

prevNodeName = nodeName
prevPodIP = podIP
prevStatus = status
prevCurrent = current
prevIsFinished = isFinished
ch <- LightweightNotification{NodeName: nodeName, PodIP: podIP, Status: status, Current: current, Result: v.Value.Result}
}
}
Expand All @@ -271,16 +277,6 @@ func (c *controller) Logs(parentCtx context.Context, follow bool) io.Reader {
go func() {
defer writer.Close()
ref := ""
// Wait until there will be events fetched first
alignTimeoutCh := time.After(alignmentTimeout)
select {
case <-c.jobEvents.Peek(parentCtx):
case <-alignTimeoutCh:
}
select {
case <-c.podEvents.Peek(parentCtx):
case <-alignTimeoutCh:
}
ch, err := WatchInstrumentedPod(parentCtx, c.clientSet, c.signature, c.scheduledAt, c.pod, c.podEvents, WatchInstrumentedPodOptions{
JobEvents: c.jobEvents,
Job: c.job,
Expand Down
17 changes: 17 additions & 0 deletions pkg/testworkflows/testworkflowcontroller/logs.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,23 @@ type ContainerLog struct {
Output *instructions.Instruction
}

type ContainerLogType string

const (
ContainerLogTypeHint ContainerLogType = "hint"
ContainerLogTypeOutput ContainerLogType = "output"
ContainerLogTypeLog ContainerLogType = ""
)

func (c *ContainerLog) Type() ContainerLogType {
if c.Hint != nil {
return ContainerLogTypeHint
} else if c.Output != nil {
return ContainerLogTypeOutput
}
return ContainerLogTypeLog
}

// getContainerLogsStream is getting logs stream, and tries to reinitialize the stream on EOF.
// EOF may happen not only on the actual container end, but also in case of the log rotation.
// @see {@link https://stackoverflow.com/a/68673451}
Expand Down
4 changes: 2 additions & 2 deletions pkg/testworkflows/testworkflowcontroller/notifier.go
Original file line number Diff line number Diff line change
Expand Up @@ -312,7 +312,7 @@ func (n *notifier) UpdateStepStatus(ref string, status testkube.TestWorkflowStep
}

func (n *notifier) finishInit(status ContainerResultStep) {
if n.result.Initialization.FinishedAt.Equal(status.FinishedAt) && n.result.Initialization.Status != nil && *n.result.Initialization.Status == status.Status {
if n.result.Initialization.FinishedAt.Equal(status.FinishedAt) && n.result.Initialization.Status != nil && *n.result.Initialization.Status == status.Status && (status.Status != testkube.ABORTED_TestWorkflowStepStatus || n.result.Initialization.ErrorMessage == status.Details) {
return
}
n.result.Initialization.FinishedAt = status.FinishedAt.UTC()
Expand Down Expand Up @@ -352,7 +352,7 @@ func (n *notifier) FinishStep(ref string, status ContainerResultStep) {
n.finishInit(status)
return
}
if n.result.Steps[ref].FinishedAt.Equal(status.FinishedAt) && n.result.Steps[ref].Status != nil && *n.result.Steps[ref].Status == status.Status {
if n.result.Steps[ref].FinishedAt.Equal(status.FinishedAt) && n.result.Steps[ref].Status != nil && *n.result.Steps[ref].Status == status.Status && (status.Status != testkube.ABORTED_TestWorkflowStepStatus || n.result.Steps[ref].ErrorMessage == status.Details) {
return
}
s := n.result.Steps[ref]
Expand Down
Loading
Loading