working 'on error retry' implementation
garethgeorge committed Aug 21, 2024
1 parent b278dc2 commit a82d9aa
Showing 9 changed files with 189 additions and 72 deletions.
75 changes: 40 additions & 35 deletions gen/go/v1/config.pb.go

Some generated files are not rendered by default.

6 changes: 4 additions & 2 deletions internal/hook/errors.go
@@ -31,12 +31,14 @@ func (e HookErrorFatal) Unwrap() error {
return e.Err
}

type RetryBackoffPolicy = func(attempt int) time.Duration

// HookErrorRetry requests that the calling operation retry after a specified backoff duration
type HookErrorRetry struct {
Err error
Backoff time.Duration
Backoff RetryBackoffPolicy
}

func (e HookErrorRetry) Error() string {
return fmt.Sprintf("retry after %v: %v", e.Backoff, e.Err.Error())
return fmt.Sprintf("retry: %v", e.Err.Error())
}
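
With this change Backoff is no longer a fixed duration but a policy: the caller passes the attempt number and gets back the delay for that attempt. A minimal caller-side sketch, not part of this commit (the helper name and the 1-based attempt parameter are illustrative, and it assumes the hook package's existing "errors" and "time" imports):

// backoffForHookErr is a hypothetical helper: it reports whether err asks for
// a retry and, if so, what delay its policy assigns to the given attempt.
func backoffForHookErr(err error, attempt int) (time.Duration, bool) {
	var retryErr *HookErrorRetry
	if errors.As(err, &retryErr) {
		return retryErr.Backoff(attempt), true
	}
	return 0, false
}
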
32 changes: 22 additions & 10 deletions internal/hook/hook.go
@@ -4,6 +4,7 @@ import (
"context"
"errors"
"fmt"
"math"
"reflect"
"slices"
"time"
@@ -114,28 +115,39 @@ func firstMatchingCondition(hook *v1.Hook, events []v1.Hook_Condition) v1.Hook_Condition {
return v1.Hook_CONDITION_UNKNOWN
}

func curTimeMs() int64 {
return time.Now().UnixNano() / 1000000
}

type Hook v1.Hook

func applyHookErrorPolicy(onError v1.Hook_OnError, err error) error {
if err == nil || errors.As(err, &HookErrorFatal{}) || errors.As(err, &HookErrorRequestCancel{}) {
return err
}

if onError == v1.Hook_ON_ERROR_CANCEL {
switch onError {
case v1.Hook_ON_ERROR_CANCEL:
return &HookErrorRequestCancel{Err: err}
} else if onError == v1.Hook_ON_ERROR_FATAL {
case v1.Hook_ON_ERROR_FATAL:
return &HookErrorFatal{Err: err}
case v1.Hook_ON_ERROR_RETRY_1MINUTE:
return &HookErrorRetry{Err: err, Backoff: func(attempt int) time.Duration {
return 1 * time.Minute
}}
case v1.Hook_ON_ERROR_RETRY_10MINUTES:
return &HookErrorRetry{Err: err, Backoff: func(attempt int) time.Duration {
return 10 * time.Minute
}}
case v1.Hook_ON_ERROR_RETRY_EXPONENTIAL_BACKOFF:
return &HookErrorRetry{Err: err, Backoff: func(attempt int) time.Duration {
return time.Duration(math.Pow(2, float64(attempt-1))) * time.Minute
}}
case v1.Hook_ON_ERROR_IGNORE:
return err
default:
panic(fmt.Sprintf("unknown on_error policy %v", onError))
}
return err
}

// IsHaltingError returns true if the error is a fatal error, a request to cancel the operation, or a retry request
func IsHaltingError(err error) bool {
var fatalErr *HookErrorFatal
var cancelErr *HookErrorRequestCancel
return errors.As(err, &fatalErr) || errors.As(err, &cancelErr)
var retryErr *HookErrorRetry
return errors.As(err, &fatalErr) || errors.As(err, &cancelErr) || errors.As(err, &retryErr)
}
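
For the exponential policy above, attempts 1, 2, 3, 4 map to delays of 1m, 2m, 4m, 8m, and so on with no upper bound in this switch, while the proto comment later in this commit mentions a 1h cap. A capped variant could look like the following sketch (the helper name and the cap are assumptions, not part of this diff; it would sit alongside the switch and reuse the package's "math" and "time" imports):

// cappedExponentialBackoff is a hypothetical policy that doubles the delay on
// each attempt, starting at one minute, but never exceeds maxDelay.
func cappedExponentialBackoff(maxDelay time.Duration) RetryBackoffPolicy {
	return func(attempt int) time.Duration {
		d := time.Duration(math.Pow(2, float64(attempt-1))) * time.Minute // 1m, 2m, 4m, ...
		if d > maxDelay {
			return maxDelay
		}
		return d
	}
}
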
16 changes: 16 additions & 0 deletions internal/hook/hook_test.go
@@ -0,0 +1,16 @@
package hook

import (
"errors"
"testing"

v1 "github.com/garethgeorge/backrest/gen/go/v1"
)

// TestApplyHookErrorPolicy tests that applyHookErrorPolicy is defined for all values of Hook_OnError.
func TestApplyHookErrorPolicy(t *testing.T) {
values := v1.Hook_OnError(0).Descriptor().Values()
for i := 0; i < values.Len(); i++ {
applyHookErrorPolicy(v1.Hook_OnError(values.Get(i).Number()), errors.New("an error"))
}
}
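
A possible companion test, not included in this commit, would assert that each retry policy actually wraps the error in a *HookErrorRetry and yields a positive backoff for the first attempt (the test name is illustrative; it relies only on the enum values and types introduced above):

func TestRetryPoliciesReturnPositiveBackoff(t *testing.T) {
	retryPolicies := []v1.Hook_OnError{
		v1.Hook_ON_ERROR_RETRY_1MINUTE,
		v1.Hook_ON_ERROR_RETRY_10MINUTES,
		v1.Hook_ON_ERROR_RETRY_EXPONENTIAL_BACKOFF,
	}
	for _, onError := range retryPolicies {
		err := applyHookErrorPolicy(onError, errors.New("an error"))
		var retryErr *HookErrorRetry
		if !errors.As(err, &retryErr) {
			t.Errorf("%v: expected a *HookErrorRetry", onError)
			continue
		}
		if retryErr.Backoff(1) <= 0 {
			t.Errorf("%v: expected a positive backoff for the first attempt", onError)
		}
	}
}
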
42 changes: 34 additions & 8 deletions internal/orchestrator/orchestrator.go
@@ -13,6 +13,7 @@ import (
"github.com/garethgeorge/backrest/internal/config"
"github.com/garethgeorge/backrest/internal/ioutil"
"github.com/garethgeorge/backrest/internal/oplog"
"github.com/garethgeorge/backrest/internal/oplog/indexutil"
"github.com/garethgeorge/backrest/internal/orchestrator/logging"
"github.com/garethgeorge/backrest/internal/orchestrator/repo"
"github.com/garethgeorge/backrest/internal/orchestrator/tasks"
@@ -47,6 +48,7 @@ var _ tasks.TaskExecutor = &Orchestrator{}

type stContainer struct {
tasks.ScheduledTask
retryCount int // number of times this task has been retried.
configModno int32
callbacks []func(error)
}
@@ -297,18 +299,47 @@ func (o *Orchestrator) Run(ctx context.Context) {
}
}()

// Clone the operation in case we need to reset changes and reschedule the task for a retry
originalOp := proto.Clone(t.Op).(*v1.Operation)
if t.Op != nil {
// Delete any previous hook executions for this operation in case this is a retry.
prevHookExecutionIDs := []int64{}
if err := o.OpLog.ForEach(oplog.Query{FlowId: t.Op.FlowId}, indexutil.CollectAll(), func(op *v1.Operation) error {
if hookOp, ok := op.Op.(*v1.Operation_OperationRunHook); ok && hookOp.OperationRunHook.ParentOp == t.Op.Id {
prevHookExecutionIDs = append(prevHookExecutionIDs, op.Id)
}
return nil
}); err != nil {
zap.L().Error("failed to collect previous hook execution IDs", zap.Error(err))
}

if err := o.OpLog.Delete(prevHookExecutionIDs...); err != nil {
zap.L().Error("failed to delete previous hook execution IDs", zap.Error(err))
}
}

err := o.RunTask(taskCtx, t.ScheduledTask)
cancelTaskCtx()

o.mu.Lock()
curCfgModno := o.config.Modno
o.mu.Unlock()
if t.configModno == curCfgModno {
// Only reschedule tasks if the config hasn't changed since the task was scheduled.
if err := o.ScheduleTask(t.Task, tasks.TaskPriorityDefault); err != nil {
zap.L().Error("reschedule task", zap.String("task", t.Task.Name()), zap.Error(err))
var retryErr tasks.TaskRetryError
if errors.As(err, &retryErr) {
// If the task returned a retry error, schedule for a retry reusing the same task and operation data.
t.Op = originalOp
t.retryCount += 1
t.RunAt = time.Now().Add(retryErr.Backoff(t.retryCount))
o.taskQueue.Enqueue(t.RunAt, tasks.TaskPriorityDefault, t)
continue // skip executing the task's callbacks.
} else if e := o.ScheduleTask(t.Task, tasks.TaskPriorityDefault); e != nil {
// Schedule the next execution of the task
zap.L().Error("reschedule task", zap.String("task", t.Task.Name()), zap.Error(e))
}
}
cancelTaskCtx()

for _, cb := range t.callbacks {
go cb(err)
}
@@ -320,7 +351,6 @@ func (o *Orchestrator) RunTask(ctx context.Context, st tasks.ScheduledTask) error {
ctx = logging.ContextWithWriter(ctx, &ioutil.SynchronizedWriter{W: logs})

op := st.Op
originalOp := proto.Clone(op).(*v1.Operation)
runner := newTaskRunnerImpl(o, st.Task, st.Op)

zap.L().Info("running task", zap.String("task", st.Task.Name()), zap.String("runAt", st.RunAt.Format(time.RFC3339)))
@@ -367,11 +397,7 @@ func (o *Orchestrator) RunTask(ctx context.Context, st tasks.ScheduledTask) error {
if errors.As(err, &taskCancelledError) {
op.Status = v1.OperationStatus_STATUS_USER_CANCELLED
} else if errors.As(err, &taskRetryError) {
st.Op = originalOp
op = originalOp
st.RunAt = time.Now().Add(taskRetryError.Backoff)
op.Status = v1.OperationStatus_STATUS_PENDING
o.taskQueue.Enqueue(st.RunAt, tasks.TaskPriorityDefault, stContainer{})
}

// prepend the error to the display
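
The retry branch above reuses the same stContainer: the operation is reset to its pre-run clone, retryCount is incremented, and the backoff policy turns the new attempt number into the next RunAt. Distilled into a standalone sketch (the helper name and parameters are hypothetical, not part of this commit):

// nextRetryRunAt is a hypothetical helper mirroring the retry branch in Run:
// given the retry error and the number of retries already attempted, it
// returns when the task should be enqueued to run again.
func nextRetryRunAt(retryErr tasks.TaskRetryError, priorRetries int) time.Time {
	attempt := priorRetries + 1 // matches t.retryCount += 1 above
	return time.Now().Add(retryErr.Backoff(attempt))
}
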
53 changes: 53 additions & 0 deletions internal/orchestrator/orchestrator_test.go
@@ -2,6 +2,7 @@ package orchestrator

import (
"context"
"errors"
"sync"
"testing"
"time"
@@ -135,6 +136,58 @@ func TestTaskRescheduling(t *testing.T) {
}
}

func TestTaskRetry(t *testing.T) {
t.Parallel()

// Arrange
ctx, cancel := context.WithCancel(context.Background())
defer cancel()

orch, err := NewOrchestrator("", config.NewDefaultConfig(), nil, nil)
if err != nil {
t.Fatalf("failed to create orchestrator: %v", err)
}

var wg sync.WaitGroup
wg.Add(1)

go func() {
defer wg.Done()
orch.Run(ctx)
}()

// Act
count := 0
ranTimes := 0

orch.ScheduleTask(newTestTask(
func() error {
ranTimes += 1
if ranTimes == 10 {
cancel()
}
return tasks.TaskRetryError{
Err: errors.New("retry please"),
Backoff: func(attempt int) time.Duration { return 0 },
}
},
func(t time.Time) *time.Time {
count += 1
return &t
},
), tasks.TaskPriorityDefault)

wg.Wait()

if count != 1 {
t.Errorf("expected 1 Next calls because this test covers retries, got %d", count)
}

if ranTimes != 10 {
t.Errorf("expected 10 Run calls, got %d", ranTimes)
}
}

func TestGracefulShutdown(t *testing.T) {
t.Parallel()

14 changes: 4 additions & 10 deletions internal/orchestrator/tasks/errors.go
@@ -18,22 +18,16 @@ func (e TaskCancelledError) Is(err error) bool {
return ok
}

type RetryBackoffPolicy = func(attempt int) time.Duration

// TaskRetryError is returned when a task should be retried after a specified backoff duration.
type TaskRetryError struct {
Err error
Backoff time.Duration
Backoff RetryBackoffPolicy
}

func (e TaskRetryError) Error() string {
return fmt.Sprintf("retry after %v: %v", e.Backoff, e.Err.Error())
}

func (e TaskRetryError) Is(err error) bool {
other, ok := err.(TaskRetryError)
if !ok {
return false
}
return e.Backoff == other.Backoff && e.Err == other.Err
return fmt.Sprintf("retry: %v", e.Err.Error())
}

func (e TaskRetryError) Unwrap() error {
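
On the producer side, a task signals a retry by returning a TaskRetryError whose Backoff policy the orchestrator consults with the attempt number. A minimal sketch of that usage, not part of this commit (the wrapper function and the flat one-minute policy are illustrative):

// retryable is a hypothetical wrapper in the tasks package that converts any
// error from fn into a TaskRetryError with a flat one-minute backoff.
func retryable(fn func() error) error {
	if err := fn(); err != nil {
		return TaskRetryError{
			Err:     err,
			Backoff: func(attempt int) time.Duration { return time.Minute },
		}
	}
	return nil
}
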
5 changes: 3 additions & 2 deletions proto/v1/config.proto
@@ -151,9 +151,10 @@ message Hook {
enum OnError {
ON_ERROR_IGNORE = 0;
ON_ERROR_CANCEL = 1; // cancels the operation and skips subsequent hooks
ON_ERROR_FATAL = 2; // fails the operation and subsequent hooks
ON_ERROR_FATAL = 2; // fails the operation and subsequent hooks.
ON_ERROR_RETRY_1MINUTE = 100; // retry the operation every minute
ON_ERROR_RETRY_10MINUTES = 101; // retry the operation every 10 minutes
ON_ERROR_RETRY_1HOUR = 102; // retry the operation every hour
ON_ERROR_RETRY_EXPONENTIAL_BACKOFF = 103; // retry the operation with exponential backoff up to 1h max.
}

repeated Condition conditions = 1 [json_name="conditions"];