limit task retries when a system error occurs
chengjoey committed Dec 13, 2021
1 parent dd78a2f commit 4d61ffd
Showing 4 changed files with 37 additions and 2 deletions.
15 changes: 14 additions & 1 deletion apistructs/pipeline_task.go
@@ -23,7 +23,9 @@ import (

const (
// TerminusDefineTag add this tag env to container for collecting logs
TerminusDefineTag = "TERMINUS_DEFINE_TAG"
PipelineTaskMaxRetryLimit = 100
PipelineTaskMaxRetryDuration = 24 * time.Hour
)

type PipelineTaskDTO struct {
@@ -267,6 +269,17 @@ func (t *PipelineTaskInspect) ConvertErrors() {
}
}

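// IsErrorsExceed reports whether any recorded error has exhausted the retry
// budget: its first occurrence is older than PipelineTaskMaxRetryDuration,
// or its count has reached PipelineTaskMaxRetryLimit.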
func (t *PipelineTaskInspect) IsErrorsExceed() bool {
now := time.Now()
for _, g := range t.Errors {
if (!g.Ctx.StartTime.IsZero() && g.Ctx.StartTime.Add(PipelineTaskMaxRetryDuration).Before(now)) ||
g.Ctx.Count >= PipelineTaskMaxRetryLimit {
return true
}
}
return false
}

func (l *PipelineTaskLoop) Duplicate() *PipelineTaskLoop {
if l == nil {
return nil
18 changes: 18 additions & 0 deletions apistructs/pipeline_task_test.go
@@ -17,6 +17,9 @@ package apistructs
import (
"fmt"
"testing"
"time"

"github.com/stretchr/testify/assert"
)

func TestPipelineTaskLoop_Duplicate(t *testing.T) {
@@ -95,3 +98,18 @@ func TestPipelineTaskLoop_IsEmpty(t *testing.T) {
})
}
}

func TestIsErrorsExceed(t *testing.T) {
	now := time.Now()

	// an error first seen 25 hours ago is past the 24-hour retry window
	timeExceedInspect := &PipelineTaskInspect{}
	timeExceedInspect.Errors = timeExceedInspect.AppendError(&PipelineTaskErrResponse{Msg: "xxx", Ctx: PipelineTaskErrCtx{StartTime: now.Add(-25 * time.Hour)}})
	assert.Equal(t, true, timeExceedInspect.IsErrorsExceed())

	// repeated identical messages accumulate a count: 99 occurrences stay
	// under the 100-retry limit, and the 100th trips it
	countExceedInspect := &PipelineTaskInspect{}
	for i := 0; i < 99; i++ {
		countExceedInspect.Errors = countExceedInspect.AppendError(&PipelineTaskErrResponse{Msg: "xxx"})
		assert.Equal(t, false, countExceedInspect.IsErrorsExceed())
	}
	countExceedInspect.Errors = countExceedInspect.AppendError(&PipelineTaskErrResponse{Msg: "xxx"})
	assert.Equal(t, true, countExceedInspect.IsErrorsExceed())
}
4 changes: 4 additions & 0 deletions modules/pipeline/pipengine/reconciler/task.go
@@ -96,6 +96,10 @@ func reconcileTask(tr *taskrun.TaskRun) error {
rlog.TErrorf(tr.P.ID, tr.Task.ID, "failed to handle taskOp: %s, user abnormalErr: %v, don't need retry", taskOp.Op(), abnormalErr)
return abnormalErr
}
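// stop retrying once the task's accumulated errors exceed the retry count or duration limit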
if tr.Task.Inspect.IsErrorsExceed() {
rlog.TErrorf(tr.P.ID, tr.Task.ID, "failed to handle taskOp: %s, errors exceed limit, stop retry", taskOp.Op())
return abnormalErr
}
// an error that contains no user error is a platform error, so keep retrying
rlog.TErrorf(tr.P.ID, tr.Task.ID, "failed to handle taskOp: %s, abnormalErr: %v, continue retry, retry times: %d", taskOp.Op(), abnormalErr, platformErrRetryTimes)
resetTaskForAbnormalRetry(tr, platformErrRetryTimes)
2 changes: 1 addition & 1 deletion modules/pipeline/pipengine/reconciler/taskrun/framework.go
@@ -129,7 +129,7 @@ func (tr *TaskRun) waitOp(itr TaskOp, o *Elem) (result error) {
}

// if the result contains only a platform error, the task will retry, so don't mark the status as changed
if result != nil && !errorsx.IsContainUserError(result) && !tr.Task.Inspect.IsErrorsExceed() {
tr.Task.Status = oldStatus
}


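To make the retry gate easy to see end to end, here is a minimal, self-contained sketch of the pattern this commit introduces: a platform error is retried until its first occurrence is older than 24 hours or it has been seen 100 times. runTask, firstFailure, and the surrounding loop are hypothetical stand-ins for the reconciler, not Erda code; the two constants mirror PipelineTaskMaxRetryLimit and PipelineTaskMaxRetryDuration added above.

package main

import (
	"errors"
	"fmt"
	"time"
)

const (
	maxRetryLimit    = 100            // mirrors PipelineTaskMaxRetryLimit
	maxRetryDuration = 24 * time.Hour // mirrors PipelineTaskMaxRetryDuration
)

// errorsExceed mirrors the shape of PipelineTaskInspect.IsErrorsExceed for a
// single error: stop when the first failure is older than the retry window,
// or the failure count has reached the cap.
func errorsExceed(firstFailure time.Time, count int) bool {
	if !firstFailure.IsZero() && firstFailure.Add(maxRetryDuration).Before(time.Now()) {
		return true
	}
	return count >= maxRetryLimit
}

// runTask is a hypothetical stand-in for executing a pipeline task.
func runTask() error { return errors.New("platform error") }

func main() {
	var firstFailure time.Time
	count := 0
	for {
		err := runTask()
		if err == nil {
			return // success: no retry needed
		}
		if firstFailure.IsZero() {
			firstFailure = time.Now()
		}
		count++
		if errorsExceed(firstFailure, count) {
			fmt.Printf("errors exceed limit, stop retry: %v (retries: %d)\n", err, count)
			return
		}
		// platform error still under both limits: retry
	}
}

Capping on both a count and a wall-clock window covers both failure shapes: a tight loop of fast failures hits the count cap, while a slow, long-running failure hits the 24-hour window.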