Skip to content

Commit

Permalink
fix tests
Browse files Browse the repository at this point in the history
  • Loading branch information
garethgeorge committed Aug 26, 2024
1 parent 3785fae commit 35d435b
Show file tree
Hide file tree
Showing 5 changed files with 25 additions and 20 deletions.
7 changes: 4 additions & 3 deletions internal/api/backresthandler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -478,7 +478,7 @@ func TestCancelBackup(t *testing.T) {

// Find the backup operation ID in the oplog
var backupOpId int64
if err := retry(t, 100, 50*time.Millisecond, func() error {
if err := retry(t, 100, 10*time.Millisecond, func() error {
operations := getOperations(t, sut.oplog)
for _, op := range operations {
_, ok := op.GetOp().(*v1.Operation_OperationBackup)
Expand All @@ -498,16 +498,17 @@ func TestCancelBackup(t *testing.T) {
}
return nil
})

if err := errgroup.Wait(); err != nil {
t.Fatalf(err.Error())
}

// Assert that the backup operation was cancelled
if slices.IndexFunc(getOperations(t, sut.oplog), func(op *v1.Operation) bool {
_, ok := op.GetOp().(*v1.Operation_OperationBackup)
return op.Status == v1.OperationStatus_STATUS_USER_CANCELLED && ok
return op.Status == v1.OperationStatus_STATUS_ERROR && ok
}) == -1 {
t.Fatalf("Expected a cancelled backup operation in the log")
t.Fatalf("Expected a failed backup operation in the log")
}
}

Expand Down
6 changes: 5 additions & 1 deletion internal/hook/hook.go
Original file line number Diff line number Diff line change
Expand Up @@ -135,7 +135,11 @@ func applyHookErrorPolicy(onError v1.Hook_OnError, err error) error {
}}
case v1.Hook_ON_ERROR_RETRY_EXPONENTIAL_BACKOFF:
return &HookErrorRetry{Err: err, Backoff: func(attempt int) time.Duration {
return time.Duration(math.Pow(2, float64(attempt-1))) * 10 * time.Second
d := time.Duration(math.Pow(2, float64(attempt-1))) * 10 * time.Second
if d > 1*time.Hour {
return 1 * time.Hour
}
return d
}}
case v1.Hook_ON_ERROR_IGNORE:
return err
Expand Down
26 changes: 13 additions & 13 deletions internal/orchestrator/orchestrator.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,6 @@ import (
"github.com/garethgeorge/backrest/internal/config"
"github.com/garethgeorge/backrest/internal/ioutil"
"github.com/garethgeorge/backrest/internal/oplog"
"github.com/garethgeorge/backrest/internal/oplog/indexutil"
"github.com/garethgeorge/backrest/internal/orchestrator/logging"
"github.com/garethgeorge/backrest/internal/orchestrator/repo"
"github.com/garethgeorge/backrest/internal/orchestrator/tasks"
Expand All @@ -29,13 +28,12 @@ var ErrPlanNotFound = errors.New("plan not found")

// Orchestrator is responsible for managing repos and backups.
type Orchestrator struct {
mu sync.Mutex
config *v1.Config
OpLog *oplog.OpLog
repoPool *resticRepoPool
taskQueue *queue.TimePriorityQueue[stContainer]
readyTaskQueues map[string]chan tasks.Task
logStore *rotatinglog.RotatingLog
mu sync.Mutex
config *v1.Config
OpLog *oplog.OpLog
repoPool *resticRepoPool
taskQueue *queue.TimePriorityQueue[stContainer]
logStore *rotatinglog.RotatingLog

// cancelNotify is a list of channels that are notified when a task should be cancelled.
cancelNotify []chan int64
Expand Down Expand Up @@ -332,7 +330,7 @@ func (o *Orchestrator) Run(ctx context.Context) {
if t.Op != nil {
// Delete any previous hook executions for this operation incase this is a retry.
prevHookExecutionIDs := []int64{}
if err := o.OpLog.ForEach(oplog.Query{FlowId: t.Op.FlowId}, indexutil.CollectAll(), func(op *v1.Operation) error {
if err := o.OpLog.Query(oplog.Query{FlowID: t.Op.FlowId}, func(op *v1.Operation) error {
if hookOp, ok := op.Op.(*v1.Operation_OperationRunHook); ok && hookOp.OperationRunHook.GetParentOp() == t.Op.Id {
prevHookExecutionIDs = append(prevHookExecutionIDs, op.Id)
}
Expand All @@ -347,14 +345,13 @@ func (o *Orchestrator) Run(ctx context.Context) {
}

err := o.RunTask(taskCtx, t.ScheduledTask)
cancelTaskCtx()

o.mu.Lock()
curCfgModno := o.config.Modno
o.mu.Unlock()
if t.configModno == curCfgModno {
// Only reschedule tasks if the config hasn't changed since the task was scheduled.
var retryErr tasks.TaskRetryError
var retryErr *tasks.TaskRetryError
if errors.As(err, &retryErr) {
// If the task returned a retry error, schedule for a retry reusing the same task and operation data.
t.retryCount += 1
Expand All @@ -379,6 +376,7 @@ func (o *Orchestrator) Run(ctx context.Context) {
zap.L().Error("reschedule task", zap.String("task", t.Task.Name()), zap.Error(e))
}
}
cancelTaskCtx()

for _, cb := range t.callbacks {
go cb(err)
Expand Down Expand Up @@ -430,12 +428,14 @@ func (o *Orchestrator) RunTask(ctx context.Context, st tasks.ScheduledTask) erro
}
}
if err != nil {
var taskCancelledError tasks.TaskCancelledError
var taskRetryError tasks.TaskRetryError
var taskCancelledError *tasks.TaskCancelledError
var taskRetryError *tasks.TaskRetryError
if errors.As(err, &taskCancelledError) {
op.Status = v1.OperationStatus_STATUS_USER_CANCELLED
} else if errors.As(err, &taskRetryError) {
op.Status = v1.OperationStatus_STATUS_PENDING
} else {
op.Status = v1.OperationStatus_STATUS_ERROR
}

// prepend the error to the display
Expand Down
2 changes: 1 addition & 1 deletion internal/orchestrator/orchestrator_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -166,7 +166,7 @@ func TestTaskRetry(t *testing.T) {
if ranTimes == 10 {
cancel()
}
return tasks.TaskRetryError{
return &tasks.TaskRetryError{
Err: errors.New("retry please"),
Backoff: func(attempt int) time.Duration { return 0 },
}
Expand Down
4 changes: 2 additions & 2 deletions internal/orchestrator/taskrunnerimpl.go
Original file line number Diff line number Diff line change
Expand Up @@ -112,9 +112,9 @@ func (t *taskRunnerImpl) ExecuteHooks(ctx context.Context, events []v1.Hook_Cond
var cancelErr *hook.HookErrorRequestCancel
var retryErr *hook.HookErrorRetry
if errors.As(err, &cancelErr) {
return fmt.Errorf("%v: %w: %w", task.Name(), tasks.TaskCancelledError{}, cancelErr.Err)
return fmt.Errorf("%v: %w: %w", task.Name(), &tasks.TaskCancelledError{}, cancelErr.Err)
} else if errors.As(err, &retryErr) {
return fmt.Errorf("%v: %w", task.Name(), tasks.TaskRetryError{
return fmt.Errorf("%v: %w", task.Name(), &tasks.TaskRetryError{
Err: retryErr.Err,
Backoff: retryErr.Backoff,
})
Expand Down

0 comments on commit 35d435b

Please sign in to comment.