Skip to content

Commit

Permalink
working 'on error retry' implementation
Browse files Browse the repository at this point in the history
  • Loading branch information
garethgeorge committed Aug 26, 2024
1 parent 09c6c0e commit 3785fae
Show file tree
Hide file tree
Showing 13 changed files with 211 additions and 79 deletions.
75 changes: 40 additions & 35 deletions gen/go/v1/config.pb.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion internal/api/backresthandler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -389,7 +389,7 @@ func TestHookCancellation(t *testing.T) {
}()

_, err := sut.handler.Backup(context.Background(), connect.NewRequest(&types.StringValue{Value: "test"}))
if !errors.Is(err, tasks.ErrTaskCancelled) {
if !errors.Is(err, tasks.TaskCancelledError{}) {
t.Fatalf("Backup() error = %v, want errors.Is(err, tasks.ErrTaskCancelled)", err)
}

Expand Down
6 changes: 4 additions & 2 deletions internal/hook/errors.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,12 +31,14 @@ func (e HookErrorFatal) Unwrap() error {
return e.Err
}

// RetryBackoffPolicy computes how long to wait before a retry. The argument is
// the retry attempt number; callers in this change pass 1 for the first retry.
type RetryBackoffPolicy = func(attempt int) time.Duration

// HookErrorRetry requests that the calling operation be retried, waiting
// between attempts for the delay computed by Backoff.
type HookErrorRetry struct {
	// Err is the underlying error that caused the retry request.
	Err error
	// Backoff returns the delay to wait before the given retry attempt.
	Backoff RetryBackoffPolicy
}

// Error implements the error interface. The message is "retry: " followed by
// the wrapped error's text; the backoff is a function and is deliberately not
// formatted into the message.
func (e HookErrorRetry) Error() string {
	// Passing the error itself (instead of calling e.Err.Error()) yields the
	// same text for a non-nil error and avoids a nil dereference when Err is unset.
	return fmt.Sprintf("retry: %v", e.Err)
}
32 changes: 22 additions & 10 deletions internal/hook/hook.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"context"
"errors"
"fmt"
"math"
"reflect"
"slices"
"time"
Expand Down Expand Up @@ -114,28 +115,39 @@ func firstMatchingCondition(hook *v1.Hook, events []v1.Hook_Condition) v1.Hook_C
return v1.Hook_CONDITION_UNKNOWN
}

// curTimeMs returns the current wall-clock time as milliseconds since the Unix epoch.
func curTimeMs() int64 {
	return time.Now().UnixMilli()
}

// Hook is a distinct named type whose underlying type is the generated v1.Hook
// (presumably so this package can declare methods on it — confirm against the rest of the file).
type Hook v1.Hook

func applyHookErrorPolicy(onError v1.Hook_OnError, err error) error {
if err == nil || errors.As(err, &HookErrorFatal{}) || errors.As(err, &HookErrorRequestCancel{}) {
return err
}

if onError == v1.Hook_ON_ERROR_CANCEL {
switch onError {
case v1.Hook_ON_ERROR_CANCEL:
return &HookErrorRequestCancel{Err: err}
} else if onError == v1.Hook_ON_ERROR_FATAL {
case v1.Hook_ON_ERROR_FATAL:
return &HookErrorFatal{Err: err}
case v1.Hook_ON_ERROR_RETRY_1MINUTE:
return &HookErrorRetry{Err: err, Backoff: func(attempt int) time.Duration {
return 1 * time.Minute
}}
case v1.Hook_ON_ERROR_RETRY_10MINUTES:
return &HookErrorRetry{Err: err, Backoff: func(attempt int) time.Duration {
return 10 * time.Minute
}}
case v1.Hook_ON_ERROR_RETRY_EXPONENTIAL_BACKOFF:
return &HookErrorRetry{Err: err, Backoff: func(attempt int) time.Duration {
return time.Duration(math.Pow(2, float64(attempt-1))) * 10 * time.Second
}}
case v1.Hook_ON_ERROR_IGNORE:
return err
default:
panic(fmt.Sprintf("unknown on_error policy %v", onError))
}
return err
}

// IsHaltingError reports whether err (or any error it wraps) is a fatal error,
// a request to cancel the operation, or a request to retry the operation.
func IsHaltingError(err error) bool {
	var (
		fatalErr  *HookErrorFatal
		cancelErr *HookErrorRequestCancel
		retryErr  *HookErrorRetry
	)
	return errors.As(err, &fatalErr) || errors.As(err, &cancelErr) || errors.As(err, &retryErr)
}
16 changes: 16 additions & 0 deletions internal/hook/hook_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
package hook

import (
"errors"
"testing"

v1 "github.com/garethgeorge/backrest/gen/go/v1"
)

// TestApplyHookErrorPolicy walks every value declared on the Hook_OnError enum
// and feeds it to applyHookErrorPolicy, so that a value without a matching
// case surfaces as a panic in this test.
func TestApplyHookErrorPolicy(t *testing.T) {
	policies := v1.Hook_OnError(0).Descriptor().Values()
	for idx, total := 0, policies.Len(); idx < total; idx++ {
		policy := v1.Hook_OnError(policies.Get(idx).Number())
		_ = applyHookErrorPolicy(policy, errors.New("an error"))
	}
}
54 changes: 46 additions & 8 deletions internal/orchestrator/orchestrator.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ import (
"github.com/garethgeorge/backrest/internal/config"
"github.com/garethgeorge/backrest/internal/ioutil"
"github.com/garethgeorge/backrest/internal/oplog"
"github.com/garethgeorge/backrest/internal/oplog/indexutil"
"github.com/garethgeorge/backrest/internal/orchestrator/logging"
"github.com/garethgeorge/backrest/internal/orchestrator/repo"
"github.com/garethgeorge/backrest/internal/orchestrator/tasks"
Expand Down Expand Up @@ -47,6 +48,7 @@ var _ tasks.TaskExecutor = &Orchestrator{}

// stContainer is the element the orchestrator keeps in its task queue: the
// scheduled task plus the bookkeeping the run loop needs around it.
type stContainer struct {
tasks.ScheduledTask
retryCount int // number of times this task has been retried.
// configModno is compared with the orchestrator's current config Modno after
// the task runs; the task is only rescheduled when they still match
// (presumably it is captured when the task is scheduled — confirm at the enqueue site).
configModno int32
// callbacks are each invoked in their own goroutine with the task's error once
// the run loop is done with the task; they are skipped when the task is re-queued for a retry.
callbacks []func(error)
}
Expand Down Expand Up @@ -325,18 +327,59 @@ func (o *Orchestrator) Run(ctx context.Context) {
}
}()

// Clone the operation in case we need to reset changes and reschedule the task for a retry.
originalOp := proto.Clone(t.Op).(*v1.Operation)
if t.Op != nil {
// Delete any previous hook executions for this operation in case this is a retry.
prevHookExecutionIDs := []int64{}
if err := o.OpLog.ForEach(oplog.Query{FlowId: t.Op.FlowId}, indexutil.CollectAll(), func(op *v1.Operation) error {
if hookOp, ok := op.Op.(*v1.Operation_OperationRunHook); ok && hookOp.OperationRunHook.GetParentOp() == t.Op.Id {
prevHookExecutionIDs = append(prevHookExecutionIDs, op.Id)
}
return nil
}); err != nil {
zap.L().Error("failed to collect previous hook execution IDs", zap.Error(err))
}
zap.S().Debugf("deleting previous hook execution IDs: %v", prevHookExecutionIDs)
if err := o.OpLog.Delete(prevHookExecutionIDs...); err != nil {
zap.L().Error("failed to delete previous hook execution IDs", zap.Error(err))
}
}

err := o.RunTask(taskCtx, t.ScheduledTask)
cancelTaskCtx()

o.mu.Lock()
curCfgModno := o.config.Modno
o.mu.Unlock()
if t.configModno == curCfgModno {
// Only reschedule tasks if the config hasn't changed since the task was scheduled.
if err := o.ScheduleTask(t.Task, tasks.TaskPriorityDefault); err != nil {
zap.L().Error("reschedule task", zap.String("task", t.Task.Name()), zap.Error(err))
var retryErr tasks.TaskRetryError
if errors.As(err, &retryErr) {
// If the task returned a retry error, schedule for a retry reusing the same task and operation data.
t.retryCount += 1
delay := retryErr.Backoff(t.retryCount)
if t.Op != nil {
t.Op = originalOp
t.Op.DisplayMessage = fmt.Sprintf("waiting for retry, current backoff delay: %v", delay)
t.Op.UnixTimeStartMs = t.RunAt.UnixMilli()
if err := o.OpLog.Update(t.Op); err != nil {
zap.S().Errorf("failed to update operation in oplog: %v", err)
}
}
t.RunAt = time.Now().Add(delay)
o.taskQueue.Enqueue(t.RunAt, tasks.TaskPriorityDefault, t)
zap.L().Info("retrying task",
zap.String("task", t.Task.Name()),
zap.String("runAt", t.RunAt.Format(time.RFC3339)),
zap.Duration("delay", delay))
continue // skip executing the task's callbacks.
} else if e := o.ScheduleTask(t.Task, tasks.TaskPriorityDefault); e != nil {
// Schedule the next execution of the task
zap.L().Error("reschedule task", zap.String("task", t.Task.Name()), zap.Error(e))
}
}
cancelTaskCtx()

for _, cb := range t.callbacks {
go cb(err)
}
Expand All @@ -348,7 +391,6 @@ func (o *Orchestrator) RunTask(ctx context.Context, st tasks.ScheduledTask) erro
ctx = logging.ContextWithWriter(ctx, &ioutil.SynchronizedWriter{W: logs})

op := st.Op
originalOp := proto.Clone(op).(*v1.Operation)
runner := newTaskRunnerImpl(o, st.Task, st.Op)

zap.L().Info("running task", zap.String("task", st.Task.Name()), zap.String("runAt", st.RunAt.Format(time.RFC3339)))
Expand Down Expand Up @@ -393,11 +435,7 @@ func (o *Orchestrator) RunTask(ctx context.Context, st tasks.ScheduledTask) erro
if errors.As(err, &taskCancelledError) {
op.Status = v1.OperationStatus_STATUS_USER_CANCELLED
} else if errors.As(err, &taskRetryError) {
st.Op = originalOp
op = originalOp
st.RunAt = time.Now().Add(taskRetryError.Backoff)
op.Status = v1.OperationStatus_STATUS_PENDING
o.taskQueue.Enqueue(st.RunAt, tasks.TaskPriorityDefault, stContainer{})
}

// prepend the error to the display
Expand Down
53 changes: 53 additions & 0 deletions internal/orchestrator/orchestrator_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package orchestrator

import (
"context"
"errors"
"sync"
"testing"
"time"
Expand Down Expand Up @@ -135,6 +136,58 @@ func TestTaskRescheduling(t *testing.T) {
}
}

// TestTaskRetry verifies that a task which keeps returning a TaskRetryError is
// run again by the orchestrator, and that those retries do not ask the task for
// a new schedule (the scheduling callback is expected to be hit exactly once).
func TestTaskRetry(t *testing.T) {
	t.Parallel()

	// Arrange
	// Bound the test: if the retry path regresses and the task is never re-run,
	// the deadline stops the orchestrator and the assertions below report the
	// shortfall instead of the test hanging forever.
	ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
	defer cancel()

	orch, err := NewOrchestrator("", config.NewDefaultConfig(), nil, nil)
	if err != nil {
		t.Fatalf("failed to create orchestrator: %v", err)
	}

	var wg sync.WaitGroup
	wg.Add(1)

	go func() {
		defer wg.Done()
		orch.Run(ctx)
	}()

	// Act
	// Both counters are written from the orchestrator's goroutine and only read
	// after wg.Wait() has observed that goroutine exiting.
	count := 0
	ranTimes := 0

	task := newTestTask(
		func() error {
			ranTimes += 1
			if ranTimes == 10 {
				// Stop the orchestrator after the tenth run.
				cancel()
			}
			return tasks.TaskRetryError{
				Err:     errors.New("retry please"),
				Backoff: func(attempt int) time.Duration { return 0 },
			}
		},
		func(t time.Time) *time.Time {
			count += 1
			return &t
		},
	)
	if err := orch.ScheduleTask(task, tasks.TaskPriorityDefault); err != nil {
		t.Fatalf("failed to schedule task: %v", err)
	}

	wg.Wait()

	// Assert
	if count != 1 {
		t.Errorf("expected 1 Next calls because this test covers retries, got %d", count)
	}

	if ranTimes != 10 {
		t.Errorf("expected 10 Run calls, got %d", ranTimes)
	}
}

func TestGracefulShutdown(t *testing.T) {
t.Parallel()

Expand Down
8 changes: 4 additions & 4 deletions internal/orchestrator/taskrunnerimpl.go
Original file line number Diff line number Diff line change
Expand Up @@ -112,12 +112,12 @@ func (t *taskRunnerImpl) ExecuteHooks(ctx context.Context, events []v1.Hook_Cond
var cancelErr *hook.HookErrorRequestCancel
var retryErr *hook.HookErrorRetry
if errors.As(err, &cancelErr) {
return fmt.Errorf("%v: %w: %w", task.Name(), tasks.TaskCancelledError{}, errors.Unwrap(err))
return fmt.Errorf("%v: %w: %w", task.Name(), tasks.TaskCancelledError{}, cancelErr.Err)
} else if errors.As(err, &retryErr) {
return fmt.Errorf("%v: %w: %w", task.Name(), tasks.TaskRetryError{
Err: errors.Unwrap(err),
return fmt.Errorf("%v: %w", task.Name(), tasks.TaskRetryError{
Err: retryErr.Err,
Backoff: retryErr.Backoff,
}, err)
})
}
return fmt.Errorf("%v: %w", task.Name(), err)
}
Expand Down
Loading

0 comments on commit 3785fae

Please sign in to comment.