Skip to content

Commit

Permalink
Fix: fix suspend judgement (#198)
Browse files Browse the repository at this point in the history
fix: fix suspend judgement

Signed-off-by: FogDong <fog@bentoml.com>
  • Loading branch information
FogDong authored Sep 24, 2024
1 parent 97c5853 commit fc01033
Show file tree
Hide file tree
Showing 4 changed files with 32 additions and 27 deletions.
4 changes: 2 additions & 2 deletions makefiles/dependency.mk
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ goimports:
ifeq (, $(shell which goimports))
@{ \
set -e ;\
go install golang.org/x/tools/cmd/goimports@latest ;\
go install golang.org/x/tools/cmd/goimports@v0.1.12 ;\
}
GOIMPORTS=$(GOBIN)/goimports
else
Expand Down Expand Up @@ -99,4 +99,4 @@ $(CONTROLLER_GEN): $(LOCALBIN)
.PHONY: envtest
envtest: $(ENVTEST) ## Download envtest-setup locally if necessary.
$(ENVTEST): $(LOCALBIN)
GOBIN=$(LOCALBIN) go install sigs.k8s.io/controller-runtime/tools/setup-envtest@latest
GOBIN=$(LOCALBIN) go install sigs.k8s.io/controller-runtime/tools/setup-envtest@v0.0.0-20230216140739-c98506dc3b8e
17 changes: 9 additions & 8 deletions pkg/executor/workflow.go
Original file line number Diff line number Diff line change
Expand Up @@ -115,12 +115,6 @@ func (w *workflowExecutor) ExecuteRunners(ctx monitorContext.Context, taskRunner
}
return v1alpha1.WorkflowStateFailed, nil
}
if checkWorkflowSuspended(status) {
return v1alpha1.WorkflowStateSuspending, nil
}
if allRunnersSucceeded {
return v1alpha1.WorkflowStateSucceeded, nil
}

wfCtx, err := w.makeContext(ctx, w.instance.Name)
if err != nil {
Expand All @@ -129,6 +123,13 @@ func (w *workflowExecutor) ExecuteRunners(ctx monitorContext.Context, taskRunner
}
w.wfCtx = wfCtx

if checkWorkflowSuspended(status) {
return v1alpha1.WorkflowStateSuspending, nil
}
if allRunnersSucceeded {
return v1alpha1.WorkflowStateSucceeded, nil
}

if cacheValue, ok := StepStatusCache.Load(cacheKey); ok {
// handle cache resource
if len(status.Steps) < cacheValue.(int) {
Expand Down Expand Up @@ -175,11 +176,11 @@ func checkWorkflowSuspended(status *v1alpha1.WorkflowRunStatus) bool {
// if workflow is suspended and the suspended step is still running, return false to run the suspended step
if status.Suspend {
for _, step := range status.Steps {
if step.Phase == v1alpha1.WorkflowStepPhaseSuspending {
if step.Reason == types.StatusReasonSuspend && step.Phase == v1alpha1.WorkflowStepPhaseSuspending {
return false
}
for _, sub := range step.SubStepsStatus {
if sub.Phase == v1alpha1.WorkflowStepPhaseSuspending {
if sub.Reason == types.StatusReasonSuspend && sub.Phase == v1alpha1.WorkflowStepPhaseSuspending {
return false
}
}
Expand Down
36 changes: 20 additions & 16 deletions pkg/executor/workflow_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1772,10 +1772,11 @@ var _ = Describe("Test Workflow", func() {
},
}, {
StepStatus: v1alpha1.StepStatus{
Name: "s2",
ID: "s2",
Type: "suspend",
Phase: v1alpha1.WorkflowStepPhaseSuspending,
Name: "s2",
ID: "s2",
Type: "suspend",
Reason: types.StatusReasonSuspend,
Phase: v1alpha1.WorkflowStepPhaseSuspending,
},
}},
})).Should(BeEquivalentTo(""))
Expand Down Expand Up @@ -1805,10 +1806,11 @@ var _ = Describe("Test Workflow", func() {
},
}, {
StepStatus: v1alpha1.StepStatus{
Name: "s2",
ID: "s2",
Type: "suspend",
Phase: v1alpha1.WorkflowStepPhaseSucceeded,
Name: "s2",
ID: "s2",
Type: "suspend",
Reason: types.StatusReasonSuspend,
Phase: v1alpha1.WorkflowStepPhaseSucceeded,
},
}, {
StepStatus: v1alpha1.StepStatus{
Expand Down Expand Up @@ -1884,10 +1886,11 @@ var _ = Describe("Test Workflow", func() {
Type: "success",
Phase: v1alpha1.WorkflowStepPhaseSucceeded,
}, {
Name: "s2-sub2",
ID: "s2-sub2",
Type: "suspend",
Phase: v1alpha1.WorkflowStepPhaseSuspending,
Name: "s2-sub2",
ID: "s2-sub2",
Type: "suspend",
Reason: types.StatusReasonSuspend,
Phase: v1alpha1.WorkflowStepPhaseSuspending,
},
},
}},
Expand Down Expand Up @@ -2234,10 +2237,11 @@ func makeRunner(step v1alpha1.WorkflowStep, subTaskRunners []types.TaskRunner) t
}
}
return v1alpha1.StepStatus{
Name: step.Name,
Type: "suspend",
ID: step.Name,
Phase: v1alpha1.WorkflowStepPhaseSuspending,
Name: step.Name,
Type: "suspend",
ID: step.Name,
Reason: types.StatusReasonSuspend,
Phase: v1alpha1.WorkflowStepPhaseSuspending,
}, &types.Operation{
Suspend: true,
}, nil
Expand Down
2 changes: 1 addition & 1 deletion pkg/tasks/custom/task.go
Original file line number Diff line number Diff line change
Expand Up @@ -437,7 +437,7 @@ func (exec *executor) Terminate(message string) {
// Wait let workflow wait.
func (exec *executor) Wait(message string) {
exec.wait = true
if exec.wfStatus.Phase != v1alpha1.WorkflowStepPhaseFailed {
if exec.wfStatus.Phase != v1alpha1.WorkflowStepPhaseFailed && exec.wfStatus.Phase != v1alpha1.WorkflowStepPhaseSuspending {
exec.wfStatus.Phase = v1alpha1.WorkflowStepPhaseRunning
exec.wfStatus.Reason = types.StatusReasonWait
if message != "" {
Expand Down

0 comments on commit fc01033

Please sign in to comment.