diff --git a/gen/go/v1/config.pb.go b/gen/go/v1/config.pb.go index 638a176d..38235a42 100644 --- a/gen/go/v1/config.pb.go +++ b/gen/go/v1/config.pb.go @@ -206,11 +206,12 @@ func (Hook_Condition) EnumDescriptor() ([]byte, []int) { type Hook_OnError int32 const ( - Hook_ON_ERROR_IGNORE Hook_OnError = 0 - Hook_ON_ERROR_CANCEL Hook_OnError = 1 // cancels the operation and skips subsequent hooks - Hook_ON_ERROR_FATAL Hook_OnError = 2 // fails the operation and subsequent hooks - Hook_ON_ERROR_RETRY_10MINUTES Hook_OnError = 101 // retry the operation every 10 minutes - Hook_ON_ERROR_RETRY_1HOUR Hook_OnError = 102 // retry the operation every hour + Hook_ON_ERROR_IGNORE Hook_OnError = 0 + Hook_ON_ERROR_CANCEL Hook_OnError = 1 // cancels the operation and skips subsequent hooks + Hook_ON_ERROR_FATAL Hook_OnError = 2 // fails the operation and subsequent hooks. + Hook_ON_ERROR_RETRY_1MINUTE Hook_OnError = 100 // retry the operation every minute + Hook_ON_ERROR_RETRY_10MINUTES Hook_OnError = 101 // retry the operation every 10 minutes + Hook_ON_ERROR_RETRY_EXPONENTIAL_BACKOFF Hook_OnError = 103 // retry the operation with exponential backoff up to 1h max. ) // Enum value maps for Hook_OnError. @@ -219,15 +220,17 @@ var ( 0: "ON_ERROR_IGNORE", 1: "ON_ERROR_CANCEL", 2: "ON_ERROR_FATAL", + 100: "ON_ERROR_RETRY_1MINUTE", 101: "ON_ERROR_RETRY_10MINUTES", - 102: "ON_ERROR_RETRY_1HOUR", + 103: "ON_ERROR_RETRY_EXPONENTIAL_BACKOFF", } Hook_OnError_value = map[string]int32{ - "ON_ERROR_IGNORE": 0, - "ON_ERROR_CANCEL": 1, - "ON_ERROR_FATAL": 2, - "ON_ERROR_RETRY_10MINUTES": 101, - "ON_ERROR_RETRY_1HOUR": 102, + "ON_ERROR_IGNORE": 0, + "ON_ERROR_CANCEL": 1, + "ON_ERROR_FATAL": 2, + "ON_ERROR_RETRY_1MINUTE": 100, + "ON_ERROR_RETRY_10MINUTES": 101, + "ON_ERROR_RETRY_EXPONENTIAL_BACKOFF": 103, } ) @@ -2120,7 +2123,7 @@ var file_v1_config_proto_rawDesc = []byte{ 0x72, 0x65, 0x71, 0x75, 0x65, 0x6e, 0x63, 0x79, 0x48, 0x6f, 0x75, 0x72, 0x73, 0x18, 0x04, 0x20, 0x01, 0x28, 0x05, 0x48, 0x00, 0x52, 0x11, 0x6d, 0x61, 0x78, 0x46, 0x72, 0x65, 0x71, 0x75, 0x65, 0x6e, 0x63, 0x79, 0x48, 0x6f, 0x75, 0x72, 0x73, 0x42, 0x0a, 0x0a, 0x08, 0x73, 0x63, 0x68, 0x65, - 0x64, 0x75, 0x6c, 0x65, 0x22, 0xed, 0x0b, 0x0a, 0x04, 0x48, 0x6f, 0x6f, 0x6b, 0x12, 0x32, 0x0a, + 0x64, 0x75, 0x6c, 0x65, 0x22, 0x98, 0x0c, 0x0a, 0x04, 0x48, 0x6f, 0x6f, 0x6b, 0x12, 0x32, 0x0a, 0x0a, 0x63, 0x6f, 0x6e, 0x64, 0x69, 0x74, 0x69, 0x6f, 0x6e, 0x73, 0x18, 0x01, 0x20, 0x03, 0x28, 0x0e, 0x32, 0x12, 0x2e, 0x76, 0x31, 0x2e, 0x48, 0x6f, 0x6f, 0x6b, 0x2e, 0x43, 0x6f, 0x6e, 0x64, 0x69, 0x74, 0x69, 0x6f, 0x6e, 0x52, 0x0a, 0x63, 0x6f, 0x6e, 0x64, 0x69, 0x74, 0x69, 0x6f, 0x6e, @@ -2206,29 +2209,31 @@ var file_v1_config_proto_rawDesc = []byte{ 0x1a, 0x0a, 0x15, 0x43, 0x4f, 0x4e, 0x44, 0x49, 0x54, 0x49, 0x4f, 0x4e, 0x5f, 0x43, 0x48, 0x45, 0x43, 0x4b, 0x5f, 0x45, 0x52, 0x52, 0x4f, 0x52, 0x10, 0xc9, 0x01, 0x12, 0x1c, 0x0a, 0x17, 0x43, 0x4f, 0x4e, 0x44, 0x49, 0x54, 0x49, 0x4f, 0x4e, 0x5f, 0x43, 0x48, 0x45, 0x43, 0x4b, 0x5f, 0x53, - 0x55, 0x43, 0x43, 0x45, 0x53, 0x53, 0x10, 0xca, 0x01, 0x22, 0x7f, 0x0a, 0x07, 0x4f, 0x6e, 0x45, - 0x72, 0x72, 0x6f, 0x72, 0x12, 0x13, 0x0a, 0x0f, 0x4f, 0x4e, 0x5f, 0x45, 0x52, 0x52, 0x4f, 0x52, - 0x5f, 0x49, 0x47, 0x4e, 0x4f, 0x52, 0x45, 0x10, 0x00, 0x12, 0x13, 0x0a, 0x0f, 0x4f, 0x4e, 0x5f, - 0x45, 0x52, 0x52, 0x4f, 0x52, 0x5f, 0x43, 0x41, 0x4e, 0x43, 0x45, 0x4c, 0x10, 0x01, 0x12, 0x12, - 0x0a, 0x0e, 0x4f, 0x4e, 0x5f, 0x45, 0x52, 0x52, 0x4f, 0x52, 0x5f, 0x46, 0x41, 0x54, 0x41, 0x4c, - 0x10, 0x02, 0x12, 0x1c, 0x0a, 0x18, 0x4f, 0x4e, 0x5f, 0x45, 0x52, 0x52, 0x4f, 0x52, 0x5f, 0x52, - 0x45, 0x54, 0x52, 0x59, 0x5f, 0x31, 0x30, 0x4d, 0x49, 0x4e, 0x55, 0x54, 0x45, 0x53, 0x10, 0x65, - 0x12, 0x18, 0x0a, 0x14, 0x4f, 0x4e, 0x5f, 0x45, 0x52, 0x52, 0x4f, 0x52, 0x5f, 0x52, 0x45, 0x54, - 0x52, 0x59, 0x5f, 0x31, 0x48, 0x4f, 0x55, 0x52, 0x10, 0x66, 0x42, 0x08, 0x0a, 0x06, 0x61, 0x63, - 0x74, 0x69, 0x6f, 0x6e, 0x22, 0x42, 0x0a, 0x04, 0x41, 0x75, 0x74, 0x68, 0x12, 0x1a, 0x0a, 0x08, - 0x64, 0x69, 0x73, 0x61, 0x62, 0x6c, 0x65, 0x64, 0x18, 0x01, 0x20, 0x01, 0x28, 0x08, 0x52, 0x08, - 0x64, 0x69, 0x73, 0x61, 0x62, 0x6c, 0x65, 0x64, 0x12, 0x1e, 0x0a, 0x05, 0x75, 0x73, 0x65, 0x72, - 0x73, 0x18, 0x02, 0x20, 0x03, 0x28, 0x0b, 0x32, 0x08, 0x2e, 0x76, 0x31, 0x2e, 0x55, 0x73, 0x65, - 0x72, 0x52, 0x05, 0x75, 0x73, 0x65, 0x72, 0x73, 0x22, 0x51, 0x0a, 0x04, 0x55, 0x73, 0x65, 0x72, - 0x12, 0x12, 0x0a, 0x04, 0x6e, 0x61, 0x6d, 0x65, 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, 0x52, 0x04, - 0x6e, 0x61, 0x6d, 0x65, 0x12, 0x29, 0x0a, 0x0f, 0x70, 0x61, 0x73, 0x73, 0x77, 0x6f, 0x72, 0x64, - 0x5f, 0x62, 0x63, 0x72, 0x79, 0x70, 0x74, 0x18, 0x02, 0x20, 0x01, 0x28, 0x09, 0x48, 0x00, 0x52, - 0x0e, 0x70, 0x61, 0x73, 0x73, 0x77, 0x6f, 0x72, 0x64, 0x42, 0x63, 0x72, 0x79, 0x70, 0x74, 0x42, - 0x0a, 0x0a, 0x08, 0x70, 0x61, 0x73, 0x73, 0x77, 0x6f, 0x72, 0x64, 0x42, 0x2c, 0x5a, 0x2a, 0x67, - 0x69, 0x74, 0x68, 0x75, 0x62, 0x2e, 0x63, 0x6f, 0x6d, 0x2f, 0x67, 0x61, 0x72, 0x65, 0x74, 0x68, - 0x67, 0x65, 0x6f, 0x72, 0x67, 0x65, 0x2f, 0x62, 0x61, 0x63, 0x6b, 0x72, 0x65, 0x73, 0x74, 0x2f, - 0x67, 0x65, 0x6e, 0x2f, 0x67, 0x6f, 0x2f, 0x76, 0x31, 0x62, 0x06, 0x70, 0x72, 0x6f, 0x74, 0x6f, - 0x33, + 0x55, 0x43, 0x43, 0x45, 0x53, 0x53, 0x10, 0xca, 0x01, 0x22, 0xa9, 0x01, 0x0a, 0x07, 0x4f, 0x6e, + 0x45, 0x72, 0x72, 0x6f, 0x72, 0x12, 0x13, 0x0a, 0x0f, 0x4f, 0x4e, 0x5f, 0x45, 0x52, 0x52, 0x4f, + 0x52, 0x5f, 0x49, 0x47, 0x4e, 0x4f, 0x52, 0x45, 0x10, 0x00, 0x12, 0x13, 0x0a, 0x0f, 0x4f, 0x4e, + 0x5f, 0x45, 0x52, 0x52, 0x4f, 0x52, 0x5f, 0x43, 0x41, 0x4e, 0x43, 0x45, 0x4c, 0x10, 0x01, 0x12, + 0x12, 0x0a, 0x0e, 0x4f, 0x4e, 0x5f, 0x45, 0x52, 0x52, 0x4f, 0x52, 0x5f, 0x46, 0x41, 0x54, 0x41, + 0x4c, 0x10, 0x02, 0x12, 0x1a, 0x0a, 0x16, 0x4f, 0x4e, 0x5f, 0x45, 0x52, 0x52, 0x4f, 0x52, 0x5f, + 0x52, 0x45, 0x54, 0x52, 0x59, 0x5f, 0x31, 0x4d, 0x49, 0x4e, 0x55, 0x54, 0x45, 0x10, 0x64, 0x12, + 0x1c, 0x0a, 0x18, 0x4f, 0x4e, 0x5f, 0x45, 0x52, 0x52, 0x4f, 0x52, 0x5f, 0x52, 0x45, 0x54, 0x52, + 0x59, 0x5f, 0x31, 0x30, 0x4d, 0x49, 0x4e, 0x55, 0x54, 0x45, 0x53, 0x10, 0x65, 0x12, 0x26, 0x0a, + 0x22, 0x4f, 0x4e, 0x5f, 0x45, 0x52, 0x52, 0x4f, 0x52, 0x5f, 0x52, 0x45, 0x54, 0x52, 0x59, 0x5f, + 0x45, 0x58, 0x50, 0x4f, 0x4e, 0x45, 0x4e, 0x54, 0x49, 0x41, 0x4c, 0x5f, 0x42, 0x41, 0x43, 0x4b, + 0x4f, 0x46, 0x46, 0x10, 0x67, 0x42, 0x08, 0x0a, 0x06, 0x61, 0x63, 0x74, 0x69, 0x6f, 0x6e, 0x22, + 0x42, 0x0a, 0x04, 0x41, 0x75, 0x74, 0x68, 0x12, 0x1a, 0x0a, 0x08, 0x64, 0x69, 0x73, 0x61, 0x62, + 0x6c, 0x65, 0x64, 0x18, 0x01, 0x20, 0x01, 0x28, 0x08, 0x52, 0x08, 0x64, 0x69, 0x73, 0x61, 0x62, + 0x6c, 0x65, 0x64, 0x12, 0x1e, 0x0a, 0x05, 0x75, 0x73, 0x65, 0x72, 0x73, 0x18, 0x02, 0x20, 0x03, + 0x28, 0x0b, 0x32, 0x08, 0x2e, 0x76, 0x31, 0x2e, 0x55, 0x73, 0x65, 0x72, 0x52, 0x05, 0x75, 0x73, + 0x65, 0x72, 0x73, 0x22, 0x51, 0x0a, 0x04, 0x55, 0x73, 0x65, 0x72, 0x12, 0x12, 0x0a, 0x04, 0x6e, + 0x61, 0x6d, 0x65, 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, 0x52, 0x04, 0x6e, 0x61, 0x6d, 0x65, 0x12, + 0x29, 0x0a, 0x0f, 0x70, 0x61, 0x73, 0x73, 0x77, 0x6f, 0x72, 0x64, 0x5f, 0x62, 0x63, 0x72, 0x79, + 0x70, 0x74, 0x18, 0x02, 0x20, 0x01, 0x28, 0x09, 0x48, 0x00, 0x52, 0x0e, 0x70, 0x61, 0x73, 0x73, + 0x77, 0x6f, 0x72, 0x64, 0x42, 0x63, 0x72, 0x79, 0x70, 0x74, 0x42, 0x0a, 0x0a, 0x08, 0x70, 0x61, + 0x73, 0x73, 0x77, 0x6f, 0x72, 0x64, 0x42, 0x2c, 0x5a, 0x2a, 0x67, 0x69, 0x74, 0x68, 0x75, 0x62, + 0x2e, 0x63, 0x6f, 0x6d, 0x2f, 0x67, 0x61, 0x72, 0x65, 0x74, 0x68, 0x67, 0x65, 0x6f, 0x72, 0x67, + 0x65, 0x2f, 0x62, 0x61, 0x63, 0x6b, 0x72, 0x65, 0x73, 0x74, 0x2f, 0x67, 0x65, 0x6e, 0x2f, 0x67, + 0x6f, 0x2f, 0x76, 0x31, 0x62, 0x06, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x33, } var ( diff --git a/internal/hook/errors.go b/internal/hook/errors.go index 7d6f82d8..7baa0ac6 100644 --- a/internal/hook/errors.go +++ b/internal/hook/errors.go @@ -31,12 +31,14 @@ func (e HookErrorFatal) Unwrap() error { return e.Err } +type RetryBackoffPolicy = func(attempt int) time.Duration + // HookErrorRetry requests that the calling operation retry after a specified backoff duration type HookErrorRetry struct { Err error - Backoff time.Duration + Backoff RetryBackoffPolicy } func (e HookErrorRetry) Error() string { - return fmt.Sprintf("retry after %v: %v", e.Backoff, e.Err.Error()) + return fmt.Sprintf("retry: %v", e.Err.Error()) } diff --git a/internal/hook/hook.go b/internal/hook/hook.go index 9fc8db0f..080a3f12 100644 --- a/internal/hook/hook.go +++ b/internal/hook/hook.go @@ -4,6 +4,7 @@ import ( "context" "errors" "fmt" + "math" "reflect" "slices" "time" @@ -114,28 +115,39 @@ func firstMatchingCondition(hook *v1.Hook, events []v1.Hook_Condition) v1.Hook_C return v1.Hook_CONDITION_UNKNOWN } -func curTimeMs() int64 { - return time.Now().UnixNano() / 1000000 -} - -type Hook v1.Hook - func applyHookErrorPolicy(onError v1.Hook_OnError, err error) error { if err == nil || errors.As(err, &HookErrorFatal{}) || errors.As(err, &HookErrorRequestCancel{}) { return err } - if onError == v1.Hook_ON_ERROR_CANCEL { + switch onError { + case v1.Hook_ON_ERROR_CANCEL: return &HookErrorRequestCancel{Err: err} - } else if onError == v1.Hook_ON_ERROR_FATAL { + case v1.Hook_ON_ERROR_FATAL: return &HookErrorFatal{Err: err} + case v1.Hook_ON_ERROR_RETRY_1MINUTE: + return &HookErrorRetry{Err: err, Backoff: func(attempt int) time.Duration { + return 1 * time.Minute + }} + case v1.Hook_ON_ERROR_RETRY_10MINUTES: + return &HookErrorRetry{Err: err, Backoff: func(attempt int) time.Duration { + return 10 * time.Minute + }} + case v1.Hook_ON_ERROR_RETRY_EXPONENTIAL_BACKOFF: + return &HookErrorRetry{Err: err, Backoff: func(attempt int) time.Duration { + return time.Duration(math.Pow(2, float64(attempt-1))) * time.Minute + }} + case v1.Hook_ON_ERROR_IGNORE: + return err + default: + panic(fmt.Sprintf("unknown on_error policy %v", onError)) } - return err } // IsHaltingError returns true if the error is a fatal error or a request to cancel the operation func IsHaltingError(err error) bool { var fatalErr *HookErrorFatal var cancelErr *HookErrorRequestCancel - return errors.As(err, &fatalErr) || errors.As(err, &cancelErr) + var retryErr *HookErrorRetry + return errors.As(err, &fatalErr) || errors.As(err, &cancelErr) || errors.As(err, &retryErr) } diff --git a/internal/hook/hook_test.go b/internal/hook/hook_test.go new file mode 100644 index 00000000..b0a4ece7 --- /dev/null +++ b/internal/hook/hook_test.go @@ -0,0 +1,16 @@ +package hook + +import ( + "errors" + "testing" + + v1 "github.com/garethgeorge/backrest/gen/go/v1" +) + +// TestApplyHookErrorPolicy tests that applyHookErrorPolicy is defined for all values of Hook_OnError. +func TestApplyHookErrorPolicy(t *testing.T) { + values := v1.Hook_OnError(0).Descriptor().Values() + for i := 0; i < values.Len(); i++ { + applyHookErrorPolicy(v1.Hook_OnError(values.Get(i).Number()), errors.New("an error")) + } +} diff --git a/internal/orchestrator/orchestrator.go b/internal/orchestrator/orchestrator.go index 527ae7ed..78437089 100644 --- a/internal/orchestrator/orchestrator.go +++ b/internal/orchestrator/orchestrator.go @@ -13,6 +13,7 @@ import ( "github.com/garethgeorge/backrest/internal/config" "github.com/garethgeorge/backrest/internal/ioutil" "github.com/garethgeorge/backrest/internal/oplog" + "github.com/garethgeorge/backrest/internal/oplog/indexutil" "github.com/garethgeorge/backrest/internal/orchestrator/logging" "github.com/garethgeorge/backrest/internal/orchestrator/repo" "github.com/garethgeorge/backrest/internal/orchestrator/tasks" @@ -47,6 +48,7 @@ var _ tasks.TaskExecutor = &Orchestrator{} type stContainer struct { tasks.ScheduledTask + retryCount int // number of times this task has been retried. configModno int32 callbacks []func(error) } @@ -297,18 +299,47 @@ func (o *Orchestrator) Run(ctx context.Context) { } }() + // Clone the operation incase we need to reset changes and reschedule the task for a retry + originalOp := proto.Clone(t.Op).(*v1.Operation) + if t.Op != nil { + // Delete any previous hook executions for this operation incase this is a retry. + prevHookExecutionIDs := []int64{} + if err := o.OpLog.ForEach(oplog.Query{FlowId: t.Op.FlowId}, indexutil.CollectAll(), func(op *v1.Operation) error { + if hookOp := op.Op.(*v1.Operation_OperationRunHook); hookOp != nil && hookOp.OperationRunHook.ParentOp == t.Op.Id { + prevHookExecutionIDs = append(prevHookExecutionIDs, op.Id) + } + return nil + }); err != nil { + zap.L().Error("failed to collect previous hook execution IDs", zap.Error(err)) + } + + if err := o.OpLog.Delete(prevHookExecutionIDs...); err != nil { + zap.L().Error("failed to delete previous hook execution IDs", zap.Error(err)) + } + } + err := o.RunTask(taskCtx, t.ScheduledTask) + cancelTaskCtx() o.mu.Lock() curCfgModno := o.config.Modno o.mu.Unlock() if t.configModno == curCfgModno { // Only reschedule tasks if the config hasn't changed since the task was scheduled. - if err := o.ScheduleTask(t.Task, tasks.TaskPriorityDefault); err != nil { - zap.L().Error("reschedule task", zap.String("task", t.Task.Name()), zap.Error(err)) + var retryErr tasks.TaskRetryError + if errors.As(err, &retryErr) { + // If the task returned a retry error, schedule for a retry reusing the same task and operation data. + t.Op = originalOp + t.retryCount += 1 + t.RunAt = time.Now().Add(retryErr.Backoff(t.retryCount)) + o.taskQueue.Enqueue(t.RunAt, tasks.TaskPriorityDefault, t) + continue // skip executing the task's callbacks. + } else if e := o.ScheduleTask(t.Task, tasks.TaskPriorityDefault); e != nil { + // Schedule the next execution of the task + zap.L().Error("reschedule task", zap.String("task", t.Task.Name()), zap.Error(e)) } } - cancelTaskCtx() + for _, cb := range t.callbacks { go cb(err) } @@ -320,7 +351,6 @@ func (o *Orchestrator) RunTask(ctx context.Context, st tasks.ScheduledTask) erro ctx = logging.ContextWithWriter(ctx, &ioutil.SynchronizedWriter{W: logs}) op := st.Op - originalOp := proto.Clone(op).(*v1.Operation) runner := newTaskRunnerImpl(o, st.Task, st.Op) zap.L().Info("running task", zap.String("task", st.Task.Name()), zap.String("runAt", st.RunAt.Format(time.RFC3339))) @@ -367,11 +397,7 @@ func (o *Orchestrator) RunTask(ctx context.Context, st tasks.ScheduledTask) erro if errors.As(err, &taskCancelledError) { op.Status = v1.OperationStatus_STATUS_USER_CANCELLED } else if errors.As(err, &taskRetryError) { - st.Op = originalOp - op = originalOp - st.RunAt = time.Now().Add(taskRetryError.Backoff) op.Status = v1.OperationStatus_STATUS_PENDING - o.taskQueue.Enqueue(st.RunAt, tasks.TaskPriorityDefault, stContainer{}) } // prepend the error to the display diff --git a/internal/orchestrator/orchestrator_test.go b/internal/orchestrator/orchestrator_test.go index 4453b60c..01962ea9 100644 --- a/internal/orchestrator/orchestrator_test.go +++ b/internal/orchestrator/orchestrator_test.go @@ -2,6 +2,7 @@ package orchestrator import ( "context" + "errors" "sync" "testing" "time" @@ -135,6 +136,58 @@ func TestTaskRescheduling(t *testing.T) { } } +func TestTaskRetry(t *testing.T) { + t.Parallel() + + // Arrange + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + orch, err := NewOrchestrator("", config.NewDefaultConfig(), nil, nil) + if err != nil { + t.Fatalf("failed to create orchestrator: %v", err) + } + + var wg sync.WaitGroup + wg.Add(1) + + go func() { + defer wg.Done() + orch.Run(ctx) + }() + + // Act + count := 0 + ranTimes := 0 + + orch.ScheduleTask(newTestTask( + func() error { + ranTimes += 1 + if ranTimes == 10 { + cancel() + } + return tasks.TaskRetryError{ + Err: errors.New("retry please"), + Backoff: func(attempt int) time.Duration { return 0 }, + } + }, + func(t time.Time) *time.Time { + count += 1 + return &t + }, + ), tasks.TaskPriorityDefault) + + wg.Wait() + + if count != 1 { + t.Errorf("expected 1 Next calls because this test covers retries, got %d", count) + } + + if ranTimes != 10 { + t.Errorf("expected 10 Run calls, got %d", ranTimes) + } +} + func TestGracefulShutdown(t *testing.T) { t.Parallel() diff --git a/internal/orchestrator/tasks/errors.go b/internal/orchestrator/tasks/errors.go index 1acaa019..5da3ef49 100644 --- a/internal/orchestrator/tasks/errors.go +++ b/internal/orchestrator/tasks/errors.go @@ -18,22 +18,16 @@ func (e TaskCancelledError) Is(err error) bool { return ok } +type RetryBackoffPolicy = func(attempt int) time.Duration + // TaskRetryError is returned when a task should be retried after a specified backoff duration. type TaskRetryError struct { Err error - Backoff time.Duration + Backoff RetryBackoffPolicy } func (e TaskRetryError) Error() string { - return fmt.Sprintf("retry after %v: %v", e.Backoff, e.Err.Error()) -} - -func (e TaskRetryError) Is(err error) bool { - other, ok := err.(TaskRetryError) - if !ok { - return false - } - return e.Backoff == other.Backoff && e.Err == other.Err + return fmt.Sprintf("retry: %v", e.Err.Error()) } func (e TaskRetryError) Unwrap() error { diff --git a/proto/v1/config.proto b/proto/v1/config.proto index ebd30e26..42bb5fe9 100644 --- a/proto/v1/config.proto +++ b/proto/v1/config.proto @@ -151,9 +151,10 @@ message Hook { enum OnError { ON_ERROR_IGNORE = 0; ON_ERROR_CANCEL = 1; // cancels the operation and skips subsequent hooks - ON_ERROR_FATAL = 2; // fails the operation and subsequent hooks + ON_ERROR_FATAL = 2; // fails the operation and subsequent hooks. + ON_ERROR_RETRY_1MINUTE = 100; // retry the operation every minute ON_ERROR_RETRY_10MINUTES = 101; // retry the operation every 10 minutes - ON_ERROR_RETRY_1HOUR = 102; // retry the operation every hour + ON_ERROR_RETRY_EXPONENTIAL_BACKOFF = 103; // retry the operation with exponential backoff up to 1h max. } repeated Condition conditions = 1 [json_name="conditions"]; diff --git a/webui/gen/ts/v1/config_pb.ts b/webui/gen/ts/v1/config_pb.ts index e617b700..290295d1 100644 --- a/webui/gen/ts/v1/config_pb.ts +++ b/webui/gen/ts/v1/config_pb.ts @@ -1102,12 +1102,19 @@ export enum Hook_OnError { CANCEL = 1, /** - * fails the operation and subsequent hooks + * fails the operation and subsequent hooks. * * @generated from enum value: ON_ERROR_FATAL = 2; */ FATAL = 2, + /** + * retry the operation every minute + * + * @generated from enum value: ON_ERROR_RETRY_1MINUTE = 100; + */ + RETRY_1MINUTE = 100, + /** * retry the operation every 10 minutes * @@ -1116,19 +1123,20 @@ export enum Hook_OnError { RETRY_10MINUTES = 101, /** - * retry the operation every hour + * retry the operation with exponential backoff up to 1h max. * - * @generated from enum value: ON_ERROR_RETRY_1HOUR = 102; + * @generated from enum value: ON_ERROR_RETRY_EXPONENTIAL_BACKOFF = 103; */ - RETRY_1HOUR = 102, + RETRY_EXPONENTIAL_BACKOFF = 103, } // Retrieve enum metadata with: proto3.getEnumType(Hook_OnError) proto3.util.setEnumType(Hook_OnError, "v1.Hook.OnError", [ { no: 0, name: "ON_ERROR_IGNORE" }, { no: 1, name: "ON_ERROR_CANCEL" }, { no: 2, name: "ON_ERROR_FATAL" }, + { no: 100, name: "ON_ERROR_RETRY_1MINUTE" }, { no: 101, name: "ON_ERROR_RETRY_10MINUTES" }, - { no: 102, name: "ON_ERROR_RETRY_1HOUR" }, + { no: 103, name: "ON_ERROR_RETRY_EXPONENTIAL_BACKOFF" }, ]); /**