diff --git a/internal/api/backresthandler_test.go b/internal/api/backresthandler_test.go index daebf270..aa838934 100644 --- a/internal/api/backresthandler_test.go +++ b/internal/api/backresthandler_test.go @@ -478,7 +478,7 @@ func TestCancelBackup(t *testing.T) { // Find the backup operation ID in the oplog var backupOpId int64 - if err := retry(t, 100, 50*time.Millisecond, func() error { + if err := retry(t, 100, 10*time.Millisecond, func() error { operations := getOperations(t, sut.oplog) for _, op := range operations { _, ok := op.GetOp().(*v1.Operation_OperationBackup) @@ -498,6 +498,7 @@ func TestCancelBackup(t *testing.T) { } return nil }) + if err := errgroup.Wait(); err != nil { t.Fatalf(err.Error()) } @@ -505,9 +506,9 @@ func TestCancelBackup(t *testing.T) { // Assert that the backup operation was cancelled if slices.IndexFunc(getOperations(t, sut.oplog), func(op *v1.Operation) bool { _, ok := op.GetOp().(*v1.Operation_OperationBackup) - return op.Status == v1.OperationStatus_STATUS_USER_CANCELLED && ok + return op.Status == v1.OperationStatus_STATUS_ERROR && ok }) == -1 { - t.Fatalf("Expected a cancelled backup operation in the log") + t.Fatalf("Expected a failed backup operation in the log") } } diff --git a/internal/hook/hook.go b/internal/hook/hook.go index 4f0017cc..0a8ca179 100644 --- a/internal/hook/hook.go +++ b/internal/hook/hook.go @@ -135,7 +135,11 @@ func applyHookErrorPolicy(onError v1.Hook_OnError, err error) error { }} case v1.Hook_ON_ERROR_RETRY_EXPONENTIAL_BACKOFF: return &HookErrorRetry{Err: err, Backoff: func(attempt int) time.Duration { - return time.Duration(math.Pow(2, float64(attempt-1))) * 10 * time.Second + d := time.Duration(math.Pow(2, float64(attempt-1))) * 10 * time.Second + if d > 1*time.Hour { + return 1 * time.Hour + } + return d }} case v1.Hook_ON_ERROR_IGNORE: return err diff --git a/internal/orchestrator/orchestrator.go b/internal/orchestrator/orchestrator.go index 3e29d732..26755248 100644 --- a/internal/orchestrator/orchestrator.go +++ b/internal/orchestrator/orchestrator.go @@ -13,7 +13,6 @@ import ( "github.com/garethgeorge/backrest/internal/config" "github.com/garethgeorge/backrest/internal/ioutil" "github.com/garethgeorge/backrest/internal/oplog" - "github.com/garethgeorge/backrest/internal/oplog/indexutil" "github.com/garethgeorge/backrest/internal/orchestrator/logging" "github.com/garethgeorge/backrest/internal/orchestrator/repo" "github.com/garethgeorge/backrest/internal/orchestrator/tasks" @@ -29,13 +28,12 @@ var ErrPlanNotFound = errors.New("plan not found") // Orchestrator is responsible for managing repos and backups. type Orchestrator struct { - mu sync.Mutex - config *v1.Config - OpLog *oplog.OpLog - repoPool *resticRepoPool - taskQueue *queue.TimePriorityQueue[stContainer] - readyTaskQueues map[string]chan tasks.Task - logStore *rotatinglog.RotatingLog + mu sync.Mutex + config *v1.Config + OpLog *oplog.OpLog + repoPool *resticRepoPool + taskQueue *queue.TimePriorityQueue[stContainer] + logStore *rotatinglog.RotatingLog // cancelNotify is a list of channels that are notified when a task should be cancelled. cancelNotify []chan int64 @@ -332,7 +330,7 @@ func (o *Orchestrator) Run(ctx context.Context) { if t.Op != nil { // Delete any previous hook executions for this operation incase this is a retry. prevHookExecutionIDs := []int64{} - if err := o.OpLog.ForEach(oplog.Query{FlowId: t.Op.FlowId}, indexutil.CollectAll(), func(op *v1.Operation) error { + if err := o.OpLog.Query(oplog.Query{FlowID: t.Op.FlowId}, func(op *v1.Operation) error { if hookOp, ok := op.Op.(*v1.Operation_OperationRunHook); ok && hookOp.OperationRunHook.GetParentOp() == t.Op.Id { prevHookExecutionIDs = append(prevHookExecutionIDs, op.Id) } @@ -347,14 +345,13 @@ func (o *Orchestrator) Run(ctx context.Context) { } err := o.RunTask(taskCtx, t.ScheduledTask) - cancelTaskCtx() o.mu.Lock() curCfgModno := o.config.Modno o.mu.Unlock() if t.configModno == curCfgModno { // Only reschedule tasks if the config hasn't changed since the task was scheduled. - var retryErr tasks.TaskRetryError + var retryErr *tasks.TaskRetryError if errors.As(err, &retryErr) { // If the task returned a retry error, schedule for a retry reusing the same task and operation data. t.retryCount += 1 @@ -379,6 +376,7 @@ func (o *Orchestrator) Run(ctx context.Context) { zap.L().Error("reschedule task", zap.String("task", t.Task.Name()), zap.Error(e)) } } + cancelTaskCtx() for _, cb := range t.callbacks { go cb(err) @@ -430,12 +428,14 @@ func (o *Orchestrator) RunTask(ctx context.Context, st tasks.ScheduledTask) erro } } if err != nil { - var taskCancelledError tasks.TaskCancelledError - var taskRetryError tasks.TaskRetryError + var taskCancelledError *tasks.TaskCancelledError + var taskRetryError *tasks.TaskRetryError if errors.As(err, &taskCancelledError) { op.Status = v1.OperationStatus_STATUS_USER_CANCELLED } else if errors.As(err, &taskRetryError) { op.Status = v1.OperationStatus_STATUS_PENDING + } else { + op.Status = v1.OperationStatus_STATUS_ERROR } // prepend the error to the display diff --git a/internal/orchestrator/orchestrator_test.go b/internal/orchestrator/orchestrator_test.go index 01962ea9..4e6b0aaa 100644 --- a/internal/orchestrator/orchestrator_test.go +++ b/internal/orchestrator/orchestrator_test.go @@ -166,7 +166,7 @@ func TestTaskRetry(t *testing.T) { if ranTimes == 10 { cancel() } - return tasks.TaskRetryError{ + return &tasks.TaskRetryError{ Err: errors.New("retry please"), Backoff: func(attempt int) time.Duration { return 0 }, } diff --git a/internal/orchestrator/taskrunnerimpl.go b/internal/orchestrator/taskrunnerimpl.go index f138ae4e..171e6175 100644 --- a/internal/orchestrator/taskrunnerimpl.go +++ b/internal/orchestrator/taskrunnerimpl.go @@ -112,9 +112,9 @@ func (t *taskRunnerImpl) ExecuteHooks(ctx context.Context, events []v1.Hook_Cond var cancelErr *hook.HookErrorRequestCancel var retryErr *hook.HookErrorRetry if errors.As(err, &cancelErr) { - return fmt.Errorf("%v: %w: %w", task.Name(), tasks.TaskCancelledError{}, cancelErr.Err) + return fmt.Errorf("%v: %w: %w", task.Name(), &tasks.TaskCancelledError{}, cancelErr.Err) } else if errors.As(err, &retryErr) { - return fmt.Errorf("%v: %w", task.Name(), tasks.TaskRetryError{ + return fmt.Errorf("%v: %w", task.Name(), &tasks.TaskRetryError{ Err: retryErr.Err, Backoff: retryErr.Backoff, })