From 038bc87070361ff3b7d9a90c075787e9ff3948f7 Mon Sep 17 00:00:00 2001 From: Gareth Date: Mon, 26 Aug 2024 19:21:18 -0700 Subject: [PATCH] feat: implement 'on error retry' policy (#428) --- gen/go/v1/config.pb.go | 73 +++--- internal/api/backresthandler_test.go | 215 ++++++++++++++---- internal/hook/errors.go | 17 +- internal/hook/hook.go | 36 ++- internal/hook/hook_test.go | 16 ++ internal/oplog/bboltstore/bboltstore.go | 19 ++ internal/oplog/memstore/memstore.go | 169 ++++++++++++++ .../oplog/storetests/storecontract_test.go | 15 +- internal/orchestrator/orchestrator.go | 69 ++++-- internal/orchestrator/orchestrator_test.go | 53 +++++ internal/orchestrator/taskrunnerimpl.go | 8 +- internal/orchestrator/tasks/errors.go | 37 ++- proto/v1/config.proto | 5 +- webui/gen/ts/v1/config_pb.ts | 26 ++- webui/src/components/OperationRow.tsx | 5 +- webui/src/components/OperationTree.tsx | 2 +- 16 files changed, 654 insertions(+), 111 deletions(-) create mode 100644 internal/hook/hook_test.go create mode 100644 internal/oplog/memstore/memstore.go diff --git a/gen/go/v1/config.pb.go b/gen/go/v1/config.pb.go index 55b50d7c..38235a42 100644 --- a/gen/go/v1/config.pb.go +++ b/gen/go/v1/config.pb.go @@ -206,22 +206,31 @@ func (Hook_Condition) EnumDescriptor() ([]byte, []int) { type Hook_OnError int32 const ( - Hook_ON_ERROR_IGNORE Hook_OnError = 0 - Hook_ON_ERROR_CANCEL Hook_OnError = 1 // cancels the operation and skips subsequent hooks - Hook_ON_ERROR_FATAL Hook_OnError = 2 // fails the operation and subsequent hooks + Hook_ON_ERROR_IGNORE Hook_OnError = 0 + Hook_ON_ERROR_CANCEL Hook_OnError = 1 // cancels the operation and skips subsequent hooks + Hook_ON_ERROR_FATAL Hook_OnError = 2 // fails the operation and subsequent hooks. + Hook_ON_ERROR_RETRY_1MINUTE Hook_OnError = 100 // retry the operation every minute + Hook_ON_ERROR_RETRY_10MINUTES Hook_OnError = 101 // retry the operation every 10 minutes + Hook_ON_ERROR_RETRY_EXPONENTIAL_BACKOFF Hook_OnError = 103 // retry the operation with exponential backoff up to 1h max. ) // Enum value maps for Hook_OnError. 
var ( Hook_OnError_name = map[int32]string{ - 0: "ON_ERROR_IGNORE", - 1: "ON_ERROR_CANCEL", - 2: "ON_ERROR_FATAL", + 0: "ON_ERROR_IGNORE", + 1: "ON_ERROR_CANCEL", + 2: "ON_ERROR_FATAL", + 100: "ON_ERROR_RETRY_1MINUTE", + 101: "ON_ERROR_RETRY_10MINUTES", + 103: "ON_ERROR_RETRY_EXPONENTIAL_BACKOFF", } Hook_OnError_value = map[string]int32{ - "ON_ERROR_IGNORE": 0, - "ON_ERROR_CANCEL": 1, - "ON_ERROR_FATAL": 2, + "ON_ERROR_IGNORE": 0, + "ON_ERROR_CANCEL": 1, + "ON_ERROR_FATAL": 2, + "ON_ERROR_RETRY_1MINUTE": 100, + "ON_ERROR_RETRY_10MINUTES": 101, + "ON_ERROR_RETRY_EXPONENTIAL_BACKOFF": 103, } ) @@ -2114,7 +2123,7 @@ var file_v1_config_proto_rawDesc = []byte{ 0x72, 0x65, 0x71, 0x75, 0x65, 0x6e, 0x63, 0x79, 0x48, 0x6f, 0x75, 0x72, 0x73, 0x18, 0x04, 0x20, 0x01, 0x28, 0x05, 0x48, 0x00, 0x52, 0x11, 0x6d, 0x61, 0x78, 0x46, 0x72, 0x65, 0x71, 0x75, 0x65, 0x6e, 0x63, 0x79, 0x48, 0x6f, 0x75, 0x72, 0x73, 0x42, 0x0a, 0x0a, 0x08, 0x73, 0x63, 0x68, 0x65, - 0x64, 0x75, 0x6c, 0x65, 0x22, 0xb5, 0x0b, 0x0a, 0x04, 0x48, 0x6f, 0x6f, 0x6b, 0x12, 0x32, 0x0a, + 0x64, 0x75, 0x6c, 0x65, 0x22, 0x98, 0x0c, 0x0a, 0x04, 0x48, 0x6f, 0x6f, 0x6b, 0x12, 0x32, 0x0a, 0x0a, 0x63, 0x6f, 0x6e, 0x64, 0x69, 0x74, 0x69, 0x6f, 0x6e, 0x73, 0x18, 0x01, 0x20, 0x03, 0x28, 0x0e, 0x32, 0x12, 0x2e, 0x76, 0x31, 0x2e, 0x48, 0x6f, 0x6f, 0x6b, 0x2e, 0x43, 0x6f, 0x6e, 0x64, 0x69, 0x74, 0x69, 0x6f, 0x6e, 0x52, 0x0a, 0x63, 0x6f, 0x6e, 0x64, 0x69, 0x74, 0x69, 0x6f, 0x6e, @@ -2200,25 +2209,31 @@ var file_v1_config_proto_rawDesc = []byte{ 0x1a, 0x0a, 0x15, 0x43, 0x4f, 0x4e, 0x44, 0x49, 0x54, 0x49, 0x4f, 0x4e, 0x5f, 0x43, 0x48, 0x45, 0x43, 0x4b, 0x5f, 0x45, 0x52, 0x52, 0x4f, 0x52, 0x10, 0xc9, 0x01, 0x12, 0x1c, 0x0a, 0x17, 0x43, 0x4f, 0x4e, 0x44, 0x49, 0x54, 0x49, 0x4f, 0x4e, 0x5f, 0x43, 0x48, 0x45, 0x43, 0x4b, 0x5f, 0x53, - 0x55, 0x43, 0x43, 0x45, 0x53, 0x53, 0x10, 0xca, 0x01, 0x22, 0x47, 0x0a, 0x07, 0x4f, 0x6e, 0x45, - 0x72, 0x72, 0x6f, 0x72, 0x12, 0x13, 0x0a, 0x0f, 0x4f, 0x4e, 0x5f, 0x45, 0x52, 0x52, 0x4f, 0x52, - 0x5f, 0x49, 0x47, 0x4e, 0x4f, 0x52, 0x45, 0x10, 0x00, 0x12, 0x13, 0x0a, 0x0f, 0x4f, 0x4e, 0x5f, - 0x45, 0x52, 0x52, 0x4f, 0x52, 0x5f, 0x43, 0x41, 0x4e, 0x43, 0x45, 0x4c, 0x10, 0x01, 0x12, 0x12, - 0x0a, 0x0e, 0x4f, 0x4e, 0x5f, 0x45, 0x52, 0x52, 0x4f, 0x52, 0x5f, 0x46, 0x41, 0x54, 0x41, 0x4c, - 0x10, 0x02, 0x42, 0x08, 0x0a, 0x06, 0x61, 0x63, 0x74, 0x69, 0x6f, 0x6e, 0x22, 0x42, 0x0a, 0x04, - 0x41, 0x75, 0x74, 0x68, 0x12, 0x1a, 0x0a, 0x08, 0x64, 0x69, 0x73, 0x61, 0x62, 0x6c, 0x65, 0x64, - 0x18, 0x01, 0x20, 0x01, 0x28, 0x08, 0x52, 0x08, 0x64, 0x69, 0x73, 0x61, 0x62, 0x6c, 0x65, 0x64, - 0x12, 0x1e, 0x0a, 0x05, 0x75, 0x73, 0x65, 0x72, 0x73, 0x18, 0x02, 0x20, 0x03, 0x28, 0x0b, 0x32, - 0x08, 0x2e, 0x76, 0x31, 0x2e, 0x55, 0x73, 0x65, 0x72, 0x52, 0x05, 0x75, 0x73, 0x65, 0x72, 0x73, - 0x22, 0x51, 0x0a, 0x04, 0x55, 0x73, 0x65, 0x72, 0x12, 0x12, 0x0a, 0x04, 0x6e, 0x61, 0x6d, 0x65, - 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, 0x52, 0x04, 0x6e, 0x61, 0x6d, 0x65, 0x12, 0x29, 0x0a, 0x0f, - 0x70, 0x61, 0x73, 0x73, 0x77, 0x6f, 0x72, 0x64, 0x5f, 0x62, 0x63, 0x72, 0x79, 0x70, 0x74, 0x18, - 0x02, 0x20, 0x01, 0x28, 0x09, 0x48, 0x00, 0x52, 0x0e, 0x70, 0x61, 0x73, 0x73, 0x77, 0x6f, 0x72, - 0x64, 0x42, 0x63, 0x72, 0x79, 0x70, 0x74, 0x42, 0x0a, 0x0a, 0x08, 0x70, 0x61, 0x73, 0x73, 0x77, - 0x6f, 0x72, 0x64, 0x42, 0x2c, 0x5a, 0x2a, 0x67, 0x69, 0x74, 0x68, 0x75, 0x62, 0x2e, 0x63, 0x6f, - 0x6d, 0x2f, 0x67, 0x61, 0x72, 0x65, 0x74, 0x68, 0x67, 0x65, 0x6f, 0x72, 0x67, 0x65, 0x2f, 0x62, - 0x61, 0x63, 0x6b, 0x72, 0x65, 0x73, 0x74, 0x2f, 0x67, 0x65, 0x6e, 0x2f, 0x67, 0x6f, 0x2f, 0x76, 
- 0x31, 0x62, 0x06, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x33, + 0x55, 0x43, 0x43, 0x45, 0x53, 0x53, 0x10, 0xca, 0x01, 0x22, 0xa9, 0x01, 0x0a, 0x07, 0x4f, 0x6e, + 0x45, 0x72, 0x72, 0x6f, 0x72, 0x12, 0x13, 0x0a, 0x0f, 0x4f, 0x4e, 0x5f, 0x45, 0x52, 0x52, 0x4f, + 0x52, 0x5f, 0x49, 0x47, 0x4e, 0x4f, 0x52, 0x45, 0x10, 0x00, 0x12, 0x13, 0x0a, 0x0f, 0x4f, 0x4e, + 0x5f, 0x45, 0x52, 0x52, 0x4f, 0x52, 0x5f, 0x43, 0x41, 0x4e, 0x43, 0x45, 0x4c, 0x10, 0x01, 0x12, + 0x12, 0x0a, 0x0e, 0x4f, 0x4e, 0x5f, 0x45, 0x52, 0x52, 0x4f, 0x52, 0x5f, 0x46, 0x41, 0x54, 0x41, + 0x4c, 0x10, 0x02, 0x12, 0x1a, 0x0a, 0x16, 0x4f, 0x4e, 0x5f, 0x45, 0x52, 0x52, 0x4f, 0x52, 0x5f, + 0x52, 0x45, 0x54, 0x52, 0x59, 0x5f, 0x31, 0x4d, 0x49, 0x4e, 0x55, 0x54, 0x45, 0x10, 0x64, 0x12, + 0x1c, 0x0a, 0x18, 0x4f, 0x4e, 0x5f, 0x45, 0x52, 0x52, 0x4f, 0x52, 0x5f, 0x52, 0x45, 0x54, 0x52, + 0x59, 0x5f, 0x31, 0x30, 0x4d, 0x49, 0x4e, 0x55, 0x54, 0x45, 0x53, 0x10, 0x65, 0x12, 0x26, 0x0a, + 0x22, 0x4f, 0x4e, 0x5f, 0x45, 0x52, 0x52, 0x4f, 0x52, 0x5f, 0x52, 0x45, 0x54, 0x52, 0x59, 0x5f, + 0x45, 0x58, 0x50, 0x4f, 0x4e, 0x45, 0x4e, 0x54, 0x49, 0x41, 0x4c, 0x5f, 0x42, 0x41, 0x43, 0x4b, + 0x4f, 0x46, 0x46, 0x10, 0x67, 0x42, 0x08, 0x0a, 0x06, 0x61, 0x63, 0x74, 0x69, 0x6f, 0x6e, 0x22, + 0x42, 0x0a, 0x04, 0x41, 0x75, 0x74, 0x68, 0x12, 0x1a, 0x0a, 0x08, 0x64, 0x69, 0x73, 0x61, 0x62, + 0x6c, 0x65, 0x64, 0x18, 0x01, 0x20, 0x01, 0x28, 0x08, 0x52, 0x08, 0x64, 0x69, 0x73, 0x61, 0x62, + 0x6c, 0x65, 0x64, 0x12, 0x1e, 0x0a, 0x05, 0x75, 0x73, 0x65, 0x72, 0x73, 0x18, 0x02, 0x20, 0x03, + 0x28, 0x0b, 0x32, 0x08, 0x2e, 0x76, 0x31, 0x2e, 0x55, 0x73, 0x65, 0x72, 0x52, 0x05, 0x75, 0x73, + 0x65, 0x72, 0x73, 0x22, 0x51, 0x0a, 0x04, 0x55, 0x73, 0x65, 0x72, 0x12, 0x12, 0x0a, 0x04, 0x6e, + 0x61, 0x6d, 0x65, 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, 0x52, 0x04, 0x6e, 0x61, 0x6d, 0x65, 0x12, + 0x29, 0x0a, 0x0f, 0x70, 0x61, 0x73, 0x73, 0x77, 0x6f, 0x72, 0x64, 0x5f, 0x62, 0x63, 0x72, 0x79, + 0x70, 0x74, 0x18, 0x02, 0x20, 0x01, 0x28, 0x09, 0x48, 0x00, 0x52, 0x0e, 0x70, 0x61, 0x73, 0x73, + 0x77, 0x6f, 0x72, 0x64, 0x42, 0x63, 0x72, 0x79, 0x70, 0x74, 0x42, 0x0a, 0x0a, 0x08, 0x70, 0x61, + 0x73, 0x73, 0x77, 0x6f, 0x72, 0x64, 0x42, 0x2c, 0x5a, 0x2a, 0x67, 0x69, 0x74, 0x68, 0x75, 0x62, + 0x2e, 0x63, 0x6f, 0x6d, 0x2f, 0x67, 0x61, 0x72, 0x65, 0x74, 0x68, 0x67, 0x65, 0x6f, 0x72, 0x67, + 0x65, 0x2f, 0x62, 0x61, 0x63, 0x6b, 0x72, 0x65, 0x73, 0x74, 0x2f, 0x67, 0x65, 0x6e, 0x2f, 0x67, + 0x6f, 0x2f, 0x76, 0x31, 0x62, 0x06, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x33, } var ( diff --git a/internal/api/backresthandler_test.go b/internal/api/backresthandler_test.go index e36d06a6..96c9080d 100644 --- a/internal/api/backresthandler_test.go +++ b/internal/api/backresthandler_test.go @@ -21,7 +21,6 @@ import ( "github.com/garethgeorge/backrest/internal/oplog" "github.com/garethgeorge/backrest/internal/oplog/bboltstore" "github.com/garethgeorge/backrest/internal/orchestrator" - "github.com/garethgeorge/backrest/internal/orchestrator/tasks" "github.com/garethgeorge/backrest/internal/resticinstaller" "github.com/garethgeorge/backrest/internal/rotatinglog" "golang.org/x/sync/errgroup" @@ -102,6 +101,7 @@ func TestBackup(t *testing.T) { Id: "local", Uri: t.TempDir(), Password: "test", + Flags: []string{"--no-cache"}, }, }, Plans: []*v1.Plan{ @@ -143,7 +143,7 @@ func TestBackup(t *testing.T) { // Wait for the index snapshot operation to appear in the oplog. 
var snapshotOp *v1.Operation - if err := retry(t, 10, 2*time.Second, func() error { + if err := retry(t, 10, 1*time.Second, func() error { operations := getOperations(t, sut.oplog) if index := slices.IndexFunc(operations, func(op *v1.Operation) bool { _, ok := op.GetOp().(*v1.Operation_OperationIndexSnapshot) @@ -162,7 +162,7 @@ func TestBackup(t *testing.T) { } // Wait for a forget operation to appear in the oplog. - if err := retry(t, 10, 2*time.Second, func() error { + if err := retry(t, 10, 1*time.Second, func() error { operations := getOperations(t, sut.oplog) if index := slices.IndexFunc(operations, func(op *v1.Operation) bool { _, ok := op.GetOp().(*v1.Operation_OperationForget) @@ -181,6 +181,8 @@ func TestBackup(t *testing.T) { } func TestMultipleBackup(t *testing.T) { + t.Parallel() + sut := createSystemUnderTest(t, &config.MemoryStore{ Config: &v1.Config{ Modno: 1234, @@ -190,6 +192,7 @@ func TestMultipleBackup(t *testing.T) { Id: "local", Uri: t.TempDir(), Password: "test", + Flags: []string{"--no-cache"}, }, }, Plans: []*v1.Plan{ @@ -226,7 +229,7 @@ func TestMultipleBackup(t *testing.T) { } // Wait for a forget that removed 1 snapshot to appear in the oplog - if err := retry(t, 10, 2*time.Second, func() error { + if err := retry(t, 10, 1*time.Second, func() error { operations := getOperations(t, sut.oplog) if index := slices.IndexFunc(operations, func(op *v1.Operation) bool { forget, ok := op.GetOp().(*v1.Operation_OperationForget) @@ -261,6 +264,7 @@ func TestHookExecution(t *testing.T) { Id: "local", Uri: t.TempDir(), Password: "test", + Flags: []string{"--no-cache"}, }, }, Plans: []*v1.Plan{ @@ -312,7 +316,7 @@ func TestHookExecution(t *testing.T) { } // Wait for two hook operations to appear in the oplog - if err := retry(t, 10, 2*time.Second, func() error { + if err := retry(t, 10, 1*time.Second, func() error { hookOps := slices.DeleteFunc(getOperations(t, sut.oplog), func(op *v1.Operation) bool { _, ok := op.GetOp().(*v1.Operation_OperationRunHook) return !ok @@ -336,7 +340,7 @@ func TestHookExecution(t *testing.T) { } } -func TestHookCancellation(t *testing.T) { +func TestHookOnErrorHandling(t *testing.T) { t.Parallel() if runtime.GOOS == "windows" { @@ -352,11 +356,12 @@ func TestHookCancellation(t *testing.T) { Id: "local", Uri: t.TempDir(), Password: "test", + Flags: []string{"--no-cache"}, }, }, Plans: []*v1.Plan{ { - Id: "test", + Id: "test-cancel", Repo: "local", Paths: []string{ t.TempDir(), @@ -378,6 +383,75 @@ func TestHookCancellation(t *testing.T) { }, }, }, + { + Id: "test-error", + Repo: "local", + Paths: []string{ + t.TempDir(), + }, + Schedule: &v1.Schedule{ + Schedule: &v1.Schedule_Disabled{Disabled: true}, + }, + Hooks: []*v1.Hook{ + { + Conditions: []v1.Hook_Condition{ + v1.Hook_CONDITION_SNAPSHOT_START, + }, + Action: &v1.Hook_ActionCommand{ + ActionCommand: &v1.Hook_Command{ + Command: "exit 123", + }, + }, + OnError: v1.Hook_ON_ERROR_FATAL, + }, + }, + }, + { + Id: "test-ignore", + Repo: "local", + Paths: []string{ + t.TempDir(), + }, + Schedule: &v1.Schedule{ + Schedule: &v1.Schedule_Disabled{Disabled: true}, + }, + Hooks: []*v1.Hook{ + { + Conditions: []v1.Hook_Condition{ + v1.Hook_CONDITION_SNAPSHOT_START, + }, + Action: &v1.Hook_ActionCommand{ + ActionCommand: &v1.Hook_Command{ + Command: "exit 123", + }, + }, + OnError: v1.Hook_ON_ERROR_IGNORE, + }, + }, + }, + { + Id: "test-retry", + Repo: "local", + Paths: []string{ + t.TempDir(), + }, + Schedule: &v1.Schedule{ + Schedule: &v1.Schedule_Disabled{Disabled: true}, + }, + Hooks: []*v1.Hook{ + { + 
Conditions: []v1.Hook_Condition{ + v1.Hook_CONDITION_SNAPSHOT_START, + }, + Action: &v1.Hook_ActionCommand{ + ActionCommand: &v1.Hook_Command{ + Command: "exit 123", + }, + }, + OnError: v1.Hook_ON_ERROR_RETRY_10MINUTES, + }, + }, + }, }, }, }) @@ -388,43 +462,93 @@ func TestHookCancellation(t *testing.T) { sut.orch.Run(ctx) }() - _, err := sut.handler.Backup(context.Background(), connect.NewRequest(&types.StringValue{Value: "test"})) - if !errors.Is(err, tasks.ErrTaskCancelled) { - t.Fatalf("Backup() error = %v, want errors.Is(err, tasks.ErrTaskCancelled)", err) + tests := []struct { + name string + plan string + wantHookStatus v1.OperationStatus + wantBackupStatus v1.OperationStatus + wantBackupError bool + noWaitForBackup bool + }{ + { + name: "cancel", + plan: "test-cancel", + wantHookStatus: v1.OperationStatus_STATUS_ERROR, + wantBackupStatus: v1.OperationStatus_STATUS_USER_CANCELLED, + wantBackupError: true, + }, + { + name: "error", + plan: "test-error", + wantHookStatus: v1.OperationStatus_STATUS_ERROR, + wantBackupStatus: v1.OperationStatus_STATUS_ERROR, + wantBackupError: true, + }, + { + name: "ignore", + plan: "test-ignore", + wantHookStatus: v1.OperationStatus_STATUS_ERROR, + wantBackupStatus: v1.OperationStatus_STATUS_SUCCESS, + wantBackupError: false, + }, + { + name: "retry", + plan: "test-retry", + wantHookStatus: v1.OperationStatus_STATUS_ERROR, + wantBackupStatus: v1.OperationStatus_STATUS_PENDING, + wantBackupError: false, + noWaitForBackup: true, + }, } - // Wait for a hook operation to appear in the oplog - if err := retry(t, 10, 2*time.Second, func() error { - hookOps := slices.DeleteFunc(getOperations(t, sut.oplog), func(op *v1.Operation) bool { - _, ok := op.GetOp().(*v1.Operation_OperationRunHook) - return !ok - }) - if len(hookOps) != 1 { - return fmt.Errorf("expected 1 hook operations, got %d", len(hookOps)) - } - if hookOps[0].Status != v1.OperationStatus_STATUS_ERROR { - return fmt.Errorf("expected hook operation error status, got %v", hookOps[0].Status) - } - return nil - }); err != nil { - t.Fatalf("Couldn't find hooks in oplog: %v", err) - } + for _, tc := range tests { + t.Run(tc.name, func(t *testing.T) { + sut.opstore.ResetForTest(t) - // assert that the backup operation is in the log and is cancelled - if err := retry(t, 10, 2*time.Second, func() error { - backupOps := slices.DeleteFunc(getOperations(t, sut.oplog), func(op *v1.Operation) bool { - _, ok := op.GetOp().(*v1.Operation_OperationBackup) - return !ok + var errgroup errgroup.Group + + errgroup.Go(func() error { + _, err := sut.handler.Backup(context.Background(), connect.NewRequest(&types.StringValue{Value: tc.plan})) + if (err != nil) != tc.wantBackupError { + return fmt.Errorf("Backup() error = %v, wantErr %v", err, tc.wantBackupError) + } + return nil + }) + + if !tc.noWaitForBackup { + if err := errgroup.Wait(); err != nil { + t.Fatalf(err.Error()) + } + } + + // Wait for hook operation to be attempted in the oplog + if err := retry(t, 10, 1*time.Second, func() error { + hookOps := slices.DeleteFunc(getOperations(t, sut.oplog), func(op *v1.Operation) bool { + _, ok := op.GetOp().(*v1.Operation_OperationRunHook) + return !ok + }) + if len(hookOps) != 1 { + return fmt.Errorf("expected 1 hook operations, got %d", len(hookOps)) + } + if hookOps[0].Status != tc.wantHookStatus { + return fmt.Errorf("expected hook operation error status, got %v", hookOps[0].Status) + } + return nil + }); err != nil { + t.Fatalf("Couldn't find hook operation in oplog: %v", err) + } + + backupOps := 
slices.DeleteFunc(getOperations(t, sut.oplog), func(op *v1.Operation) bool { + _, ok := op.GetOp().(*v1.Operation_OperationBackup) + return !ok + }) + if len(backupOps) != 1 { + t.Errorf("expected 1 backup operation, got %d", len(backupOps)) + } + if backupOps[0].Status != tc.wantBackupStatus { + t.Errorf("expected backup operation cancelled status, got %v", backupOps[0].Status) + } }) - if len(backupOps) != 1 { - return fmt.Errorf("expected 1 backup operation, got %d", len(backupOps)) - } - if backupOps[0].Status != v1.OperationStatus_STATUS_USER_CANCELLED { - return fmt.Errorf("expected backup operation cancelled status, got %v", backupOps[0].Status) - } - return nil - }); err != nil { - t.Fatalf("Couldn't find hooks in oplog: %v", err) } } @@ -440,6 +564,7 @@ func TestCancelBackup(t *testing.T) { Id: "local", Uri: t.TempDir(), Password: "test", + Flags: []string{"--no-cache"}, }, }, Plans: []*v1.Plan{ @@ -478,7 +603,7 @@ func TestCancelBackup(t *testing.T) { // Find the backup operation ID in the oplog var backupOpId int64 - if err := retry(t, 100, 50*time.Millisecond, func() error { + if err := retry(t, 100, 10*time.Millisecond, func() error { operations := getOperations(t, sut.oplog) for _, op := range operations { _, ok := op.GetOp().(*v1.Operation_OperationBackup) @@ -498,6 +623,7 @@ func TestCancelBackup(t *testing.T) { } return nil }) + if err := errgroup.Wait(); err != nil { t.Fatalf(err.Error()) } @@ -505,9 +631,9 @@ func TestCancelBackup(t *testing.T) { // Assert that the backup operation was cancelled if slices.IndexFunc(getOperations(t, sut.oplog), func(op *v1.Operation) bool { _, ok := op.GetOp().(*v1.Operation_OperationBackup) - return op.Status == v1.OperationStatus_STATUS_USER_CANCELLED && ok + return op.Status == v1.OperationStatus_STATUS_ERROR && ok }) == -1 { - t.Fatalf("Expected a cancelled backup operation in the log") + t.Fatalf("Expected a failed backup operation in the log") } } @@ -528,6 +654,7 @@ func TestRestore(t *testing.T) { Id: "local", Uri: t.TempDir(), Password: "test", + Flags: []string{"--no-cache"}, }, }, Plans: []*v1.Plan{ @@ -637,6 +764,7 @@ func TestRestore(t *testing.T) { type systemUnderTest struct { handler *BackrestHandler oplog *oplog.OpLog + opstore *bboltstore.BboltStore orch *orchestrator.Orchestrator logStore *rotatinglog.RotatingLog config *v1.Config @@ -684,6 +812,7 @@ func createSystemUnderTest(t *testing.T, config config.ConfigStore) systemUnderT return systemUnderTest{ handler: h, oplog: oplog, + opstore: opstore, orch: orch, logStore: logStore, config: cfg, diff --git a/internal/hook/errors.go b/internal/hook/errors.go index ca04d80c..7baa0ac6 100644 --- a/internal/hook/errors.go +++ b/internal/hook/errors.go @@ -1,6 +1,9 @@ package hook -import "fmt" +import ( + "fmt" + "time" +) // HookErrorCancel requests that the calling operation cancel itself. It must be handled explicitly caller. Subsequent hooks will be skipped. 
type HookErrorRequestCancel struct { @@ -27,3 +30,15 @@ func (e HookErrorFatal) Error() string { func (e HookErrorFatal) Unwrap() error { return e.Err } + +type RetryBackoffPolicy = func(attempt int) time.Duration + +// HookErrorRetry requests that the calling operation retry after a specified backoff duration +type HookErrorRetry struct { + Err error + Backoff RetryBackoffPolicy +} + +func (e HookErrorRetry) Error() string { + return fmt.Sprintf("retry: %v", e.Err.Error()) +} diff --git a/internal/hook/hook.go b/internal/hook/hook.go index 9fc8db0f..0a8ca179 100644 --- a/internal/hook/hook.go +++ b/internal/hook/hook.go @@ -4,6 +4,7 @@ import ( "context" "errors" "fmt" + "math" "reflect" "slices" "time" @@ -114,28 +115,43 @@ func firstMatchingCondition(hook *v1.Hook, events []v1.Hook_Condition) v1.Hook_C return v1.Hook_CONDITION_UNKNOWN } -func curTimeMs() int64 { - return time.Now().UnixNano() / 1000000 -} - -type Hook v1.Hook - func applyHookErrorPolicy(onError v1.Hook_OnError, err error) error { if err == nil || errors.As(err, &HookErrorFatal{}) || errors.As(err, &HookErrorRequestCancel{}) { return err } - if onError == v1.Hook_ON_ERROR_CANCEL { + switch onError { + case v1.Hook_ON_ERROR_CANCEL: return &HookErrorRequestCancel{Err: err} - } else if onError == v1.Hook_ON_ERROR_FATAL { + case v1.Hook_ON_ERROR_FATAL: return &HookErrorFatal{Err: err} + case v1.Hook_ON_ERROR_RETRY_1MINUTE: + return &HookErrorRetry{Err: err, Backoff: func(attempt int) time.Duration { + return 1 * time.Minute + }} + case v1.Hook_ON_ERROR_RETRY_10MINUTES: + return &HookErrorRetry{Err: err, Backoff: func(attempt int) time.Duration { + return 10 * time.Minute + }} + case v1.Hook_ON_ERROR_RETRY_EXPONENTIAL_BACKOFF: + return &HookErrorRetry{Err: err, Backoff: func(attempt int) time.Duration { + d := time.Duration(math.Pow(2, float64(attempt-1))) * 10 * time.Second + if d > 1*time.Hour { + return 1 * time.Hour + } + return d + }} + case v1.Hook_ON_ERROR_IGNORE: + return err + default: + panic(fmt.Sprintf("unknown on_error policy %v", onError)) } - return err } // IsHaltingError returns true if the error is a fatal error or a request to cancel the operation func IsHaltingError(err error) bool { var fatalErr *HookErrorFatal var cancelErr *HookErrorRequestCancel - return errors.As(err, &fatalErr) || errors.As(err, &cancelErr) + var retryErr *HookErrorRetry + return errors.As(err, &fatalErr) || errors.As(err, &cancelErr) || errors.As(err, &retryErr) } diff --git a/internal/hook/hook_test.go b/internal/hook/hook_test.go new file mode 100644 index 00000000..b0a4ece7 --- /dev/null +++ b/internal/hook/hook_test.go @@ -0,0 +1,16 @@ +package hook + +import ( + "errors" + "testing" + + v1 "github.com/garethgeorge/backrest/gen/go/v1" +) + +// TestApplyHookErrorPolicy tests that applyHookErrorPolicy is defined for all values of Hook_OnError. 
+func TestApplyHookErrorPolicy(t *testing.T) { + values := v1.Hook_OnError(0).Descriptor().Values() + for i := 0; i < values.Len(); i++ { + applyHookErrorPolicy(v1.Hook_OnError(values.Get(i).Number()), errors.New("an error")) + } +} diff --git a/internal/oplog/bboltstore/bboltstore.go b/internal/oplog/bboltstore/bboltstore.go index 1b101e0a..a9fe6384 100644 --- a/internal/oplog/bboltstore/bboltstore.go +++ b/internal/oplog/bboltstore/bboltstore.go @@ -6,6 +6,7 @@ import ( "os" "path" "slices" + "testing" "time" v1 "github.com/garethgeorge/backrest/gen/go/v1" @@ -432,3 +433,21 @@ func (o *BboltStore) forAll(tx *bolt.Tx, do func(*v1.Operation) error) error { } return nil } + +func (o *BboltStore) ResetForTest(t *testing.T) { + if err := o.db.Update(func(tx *bolt.Tx) error { + for _, bucket := range [][]byte{ + SystemBucket, OpLogBucket, RepoIndexBucket, PlanIndexBucket, SnapshotIndexBucket, FlowIdIndexBucket, InstanceIndexBucket, + } { + if err := tx.DeleteBucket(bucket); err != nil { + return fmt.Errorf("deleting bucket %s: %w", string(bucket), err) + } + if _, err := tx.CreateBucketIfNotExists(bucket); err != nil { + return fmt.Errorf("creating bucket %s: %w", string(bucket), err) + } + } + return nil + }); err != nil { + t.Fatalf("error resetting database: %s", err) + } +} diff --git a/internal/oplog/memstore/memstore.go b/internal/oplog/memstore/memstore.go new file mode 100644 index 00000000..668bf061 --- /dev/null +++ b/internal/oplog/memstore/memstore.go @@ -0,0 +1,169 @@ +package memstore + +import ( + "errors" + "slices" + "sync" + + v1 "github.com/garethgeorge/backrest/gen/go/v1" + "github.com/garethgeorge/backrest/internal/oplog" + "github.com/garethgeorge/backrest/internal/protoutil" + "google.golang.org/protobuf/proto" +) + +type MemStore struct { + mu sync.Mutex + operations map[int64]*v1.Operation + nextID int64 +} + +var _ oplog.OpStore = &MemStore{} + +func NewMemStore() *MemStore { + return &MemStore{ + operations: make(map[int64]*v1.Operation), + } +} + +func (m *MemStore) Version() (int64, error) { + return 0, nil +} + +func (m *MemStore) SetVersion(version int64) error { + return nil +} + +func (m *MemStore) idsForQuery(q oplog.Query) []int64 { + ids := make([]int64, 0, len(m.operations)) + for id := range m.operations { + ids = append(ids, id) + } + slices.SortFunc(ids, func(i, j int64) int { return int(i - j) }) + ids = slices.DeleteFunc(ids, func(id int64) bool { + op := m.operations[id] + return !q.Match(op) + }) + + if q.Offset > 0 { + if int(q.Offset) >= len(ids) { + ids = nil + } else { + ids = ids[q.Offset:] + } + } + + if q.Limit > 0 && len(ids) > q.Limit { + ids = ids[:q.Limit] + } + + if q.Reversed { + slices.Reverse(ids) + } + + return ids +} + +func (m *MemStore) Query(q oplog.Query, f func(*v1.Operation) error) error { + m.mu.Lock() + defer m.mu.Unlock() + + ids := m.idsForQuery(q) + + for _, id := range ids { + if err := f(proto.Clone(m.operations[id]).(*v1.Operation)); err != nil { + if err == oplog.ErrStopIteration { + break + } + return err + } + } + + return nil +} + +func (m *MemStore) Transform(q oplog.Query, f func(*v1.Operation) (*v1.Operation, error)) error { + m.mu.Lock() + defer m.mu.Unlock() + + ids := m.idsForQuery(q) + + changes := make(map[int64]*v1.Operation) + + for _, id := range ids { + if op, err := f(proto.Clone(m.operations[id]).(*v1.Operation)); err != nil { + if err == oplog.ErrStopIteration { + break + } + return err + } else if op != nil { + changes[id] = op + } + } + + // Apply changes after the loop to avoid modifying the map 
until the transaction is complete. + for id, op := range changes { + m.operations[id] = op + } + + return nil +} + +func (m *MemStore) Add(op ...*v1.Operation) error { + m.mu.Lock() + defer m.mu.Unlock() + + for _, o := range op { + if o.Id != 0 { + return errors.New("operation already has an ID, OpLog.Add is expected to set the ID") + } + m.nextID++ + o.Id = m.nextID + if o.FlowId == 0 { + o.FlowId = o.Id + } + if err := protoutil.ValidateOperation(o); err != nil { + return err + } + } + + for _, o := range op { + m.operations[o.Id] = o + } + return nil +} + +func (m *MemStore) Get(opID int64) (*v1.Operation, error) { + m.mu.Lock() + defer m.mu.Unlock() + op, ok := m.operations[opID] + if !ok { + return nil, oplog.ErrNotExist + } + return op, nil +} + +func (m *MemStore) Delete(opID ...int64) ([]*v1.Operation, error) { + m.mu.Lock() + defer m.mu.Unlock() + ops := make([]*v1.Operation, 0, len(opID)) + for idx, id := range opID { + ops[idx] = m.operations[id] + delete(m.operations, id) + } + return ops, nil +} + +func (m *MemStore) Update(op ...*v1.Operation) error { + m.mu.Lock() + defer m.mu.Unlock() + for _, o := range op { + if err := protoutil.ValidateOperation(o); err != nil { + return err + } + if _, ok := m.operations[o.Id]; !ok { + return oplog.ErrNotExist + } + m.operations[o.Id] = o + } + return nil +} diff --git a/internal/oplog/storetests/storecontract_test.go b/internal/oplog/storetests/storecontract_test.go index 06508373..032664ef 100644 --- a/internal/oplog/storetests/storecontract_test.go +++ b/internal/oplog/storetests/storecontract_test.go @@ -8,6 +8,8 @@ import ( v1 "github.com/garethgeorge/backrest/gen/go/v1" "github.com/garethgeorge/backrest/internal/oplog" "github.com/garethgeorge/backrest/internal/oplog/bboltstore" + "github.com/garethgeorge/backrest/internal/oplog/memstore" + "google.golang.org/protobuf/proto" ) const ( @@ -24,7 +26,8 @@ func StoresForTest(t testing.TB) map[string]oplog.OpStore { t.Cleanup(func() { bboltstore.Close() }) return map[string]oplog.OpStore{ - "bbolt": bboltstore, + "bbolt": bboltstore, + "memory": memstore.NewMemStore(), } } @@ -123,11 +126,12 @@ func TestAddOperation(t *testing.T) { for _, tc := range tests { t.Run(tc.name, func(t *testing.T) { log := oplog.NewOpLog(store) - if err := log.Add(tc.op); (err != nil) != tc.wantErr { + op := proto.Clone(tc.op).(*v1.Operation) + if err := log.Add(op); (err != nil) != tc.wantErr { t.Errorf("Add() error = %v, wantErr %v", err, tc.wantErr) } if !tc.wantErr { - if tc.op.Id == 0 { + if op.Id == 0 { t.Errorf("Add() did not set op ID") } } @@ -222,7 +226,7 @@ func TestListOperation(t *testing.T) { t.Run(name, func(t *testing.T) { log := oplog.NewOpLog(store) for _, op := range ops { - if err := log.Add(op); err != nil { + if err := log.Add(proto.Clone(op).(*v1.Operation)); err != nil { t.Fatalf("error adding operation: %s", err) } } @@ -290,6 +294,7 @@ func TestIndexSnapshot(t *testing.T) { for name, store := range StoresForTest(t) { t.Run(name, func(t *testing.T) { log := oplog.NewOpLog(store) + op := proto.Clone(op).(*v1.Operation) if err := log.Add(op); err != nil { t.Fatalf("error adding operation: %s", err) @@ -327,6 +332,7 @@ func TestUpdateOperation(t *testing.T) { for name, store := range StoresForTest(t) { t.Run(name, func(t *testing.T) { log := oplog.NewOpLog(store) + op := proto.Clone(op).(*v1.Operation) if err := log.Add(op); err != nil { t.Fatalf("error adding operation: %s", err) @@ -462,5 +468,4 @@ func BenchmarkList(b *testing.B) { } }) } - } diff --git 
a/internal/orchestrator/orchestrator.go b/internal/orchestrator/orchestrator.go index 854c4c9f..26755248 100644 --- a/internal/orchestrator/orchestrator.go +++ b/internal/orchestrator/orchestrator.go @@ -28,13 +28,12 @@ var ErrPlanNotFound = errors.New("plan not found") // Orchestrator is responsible for managing repos and backups. type Orchestrator struct { - mu sync.Mutex - config *v1.Config - OpLog *oplog.OpLog - repoPool *resticRepoPool - taskQueue *queue.TimePriorityQueue[stContainer] - readyTaskQueues map[string]chan tasks.Task - logStore *rotatinglog.RotatingLog + mu sync.Mutex + config *v1.Config + OpLog *oplog.OpLog + repoPool *resticRepoPool + taskQueue *queue.TimePriorityQueue[stContainer] + logStore *rotatinglog.RotatingLog // cancelNotify is a list of channels that are notified when a task should be cancelled. cancelNotify []chan int64 @@ -47,6 +46,7 @@ var _ tasks.TaskExecutor = &Orchestrator{} type stContainer struct { tasks.ScheduledTask + retryCount int // number of times this task has been retried. configModno int32 callbacks []func(error) } @@ -325,6 +325,25 @@ func (o *Orchestrator) Run(ctx context.Context) { } }() + // Clone the operation incase we need to reset changes and reschedule the task for a retry + originalOp := proto.Clone(t.Op).(*v1.Operation) + if t.Op != nil { + // Delete any previous hook executions for this operation incase this is a retry. + prevHookExecutionIDs := []int64{} + if err := o.OpLog.Query(oplog.Query{FlowID: t.Op.FlowId}, func(op *v1.Operation) error { + if hookOp, ok := op.Op.(*v1.Operation_OperationRunHook); ok && hookOp.OperationRunHook.GetParentOp() == t.Op.Id { + prevHookExecutionIDs = append(prevHookExecutionIDs, op.Id) + } + return nil + }); err != nil { + zap.L().Error("failed to collect previous hook execution IDs", zap.Error(err)) + } + zap.S().Debugf("deleting previous hook execution IDs: %v", prevHookExecutionIDs) + if err := o.OpLog.Delete(prevHookExecutionIDs...); err != nil { + zap.L().Error("failed to delete previous hook execution IDs", zap.Error(err)) + } + } + err := o.RunTask(taskCtx, t.ScheduledTask) o.mu.Lock() @@ -332,8 +351,29 @@ func (o *Orchestrator) Run(ctx context.Context) { o.mu.Unlock() if t.configModno == curCfgModno { // Only reschedule tasks if the config hasn't changed since the task was scheduled. - if err := o.ScheduleTask(t.Task, tasks.TaskPriorityDefault); err != nil { - zap.L().Error("reschedule task", zap.String("task", t.Task.Name()), zap.Error(err)) + var retryErr *tasks.TaskRetryError + if errors.As(err, &retryErr) { + // If the task returned a retry error, schedule for a retry reusing the same task and operation data. + t.retryCount += 1 + delay := retryErr.Backoff(t.retryCount) + if t.Op != nil { + t.Op = originalOp + t.Op.DisplayMessage = fmt.Sprintf("waiting for retry, current backoff delay: %v", delay) + t.Op.UnixTimeStartMs = t.RunAt.UnixMilli() + if err := o.OpLog.Update(t.Op); err != nil { + zap.S().Errorf("failed to update operation in oplog: %v", err) + } + } + t.RunAt = time.Now().Add(delay) + o.taskQueue.Enqueue(t.RunAt, tasks.TaskPriorityDefault, t) + zap.L().Info("retrying task", + zap.String("task", t.Task.Name()), + zap.String("runAt", t.RunAt.Format(time.RFC3339)), + zap.Duration("delay", delay)) + continue // skip executing the task's callbacks. 
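The retry branch above consumes the backoff closure originally attached by applyHookErrorPolicy (internal/hook/hook.go, earlier in this patch) and carried by tasks.TaskRetryError: the operation is restored from the clone taken before RunTask, its display message is set to the computed delay, and the same stContainer is re-enqueued at now + delay. For reference, the following is a minimal standalone sketch of the delay schedule the exponential policy produces; the formula is copied from the hook package rather than imported, and the main function exists only to print the schedule, so treat it as illustrative.

    package main

    import (
        "fmt"
        "math"
        "time"
    )

    // Same formula as the ON_ERROR_RETRY_EXPONENTIAL_BACKOFF closure in
    // internal/hook/hook.go: 10s doubled per attempt, capped at one hour.
    func exponentialBackoff(attempt int) time.Duration {
        d := time.Duration(math.Pow(2, float64(attempt-1))) * 10 * time.Second
        if d > 1*time.Hour {
            return 1 * time.Hour
        }
        return d
    }

    func main() {
        // Prints 10s, 20s, 40s, ..., 42m40s, then the 1h cap from attempt 10 onward.
        for attempt := 1; attempt <= 12; attempt++ {
            fmt.Printf("attempt %2d: delay %v\n", attempt, exponentialBackoff(attempt))
        }
    }

The fixed policies are simpler: ON_ERROR_RETRY_1MINUTE and ON_ERROR_RETRY_10MINUTES return a constant 1m or 10m regardless of the attempt count.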
+ } else if e := o.ScheduleTask(t.Task, tasks.TaskPriorityDefault); e != nil { + // Schedule the next execution of the task + zap.L().Error("reschedule task", zap.String("task", t.Task.Name()), zap.Error(e)) } } cancelTaskCtx() @@ -348,11 +388,11 @@ func (o *Orchestrator) RunTask(ctx context.Context, st tasks.ScheduledTask) erro logs := bytes.NewBuffer(nil) ctx = logging.ContextWithWriter(ctx, &ioutil.SynchronizedWriter{W: logs}) + op := st.Op runner := newTaskRunnerImpl(o, st.Task, st.Op) zap.L().Info("running task", zap.String("task", st.Task.Name()), zap.String("runAt", st.RunAt.Format(time.RFC3339))) - op := st.Op if op != nil { op.UnixTimeStartMs = time.Now().UnixMilli() if op.Status == v1.OperationStatus_STATUS_PENDING || op.Status == v1.OperationStatus_STATUS_UNKNOWN { @@ -388,10 +428,13 @@ func (o *Orchestrator) RunTask(ctx context.Context, st tasks.ScheduledTask) erro } } if err != nil { - if ctx.Err() != nil || errors.Is(err, tasks.ErrTaskCancelled) { - // task was cancelled + var taskCancelledError *tasks.TaskCancelledError + var taskRetryError *tasks.TaskRetryError + if errors.As(err, &taskCancelledError) { op.Status = v1.OperationStatus_STATUS_USER_CANCELLED - } else if err != nil { + } else if errors.As(err, &taskRetryError) { + op.Status = v1.OperationStatus_STATUS_PENDING + } else { op.Status = v1.OperationStatus_STATUS_ERROR } diff --git a/internal/orchestrator/orchestrator_test.go b/internal/orchestrator/orchestrator_test.go index 4453b60c..4e6b0aaa 100644 --- a/internal/orchestrator/orchestrator_test.go +++ b/internal/orchestrator/orchestrator_test.go @@ -2,6 +2,7 @@ package orchestrator import ( "context" + "errors" "sync" "testing" "time" @@ -135,6 +136,58 @@ func TestTaskRescheduling(t *testing.T) { } } +func TestTaskRetry(t *testing.T) { + t.Parallel() + + // Arrange + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + orch, err := NewOrchestrator("", config.NewDefaultConfig(), nil, nil) + if err != nil { + t.Fatalf("failed to create orchestrator: %v", err) + } + + var wg sync.WaitGroup + wg.Add(1) + + go func() { + defer wg.Done() + orch.Run(ctx) + }() + + // Act + count := 0 + ranTimes := 0 + + orch.ScheduleTask(newTestTask( + func() error { + ranTimes += 1 + if ranTimes == 10 { + cancel() + } + return &tasks.TaskRetryError{ + Err: errors.New("retry please"), + Backoff: func(attempt int) time.Duration { return 0 }, + } + }, + func(t time.Time) *time.Time { + count += 1 + return &t + }, + ), tasks.TaskPriorityDefault) + + wg.Wait() + + if count != 1 { + t.Errorf("expected 1 Next calls because this test covers retries, got %d", count) + } + + if ranTimes != 10 { + t.Errorf("expected 10 Run calls, got %d", ranTimes) + } +} + func TestGracefulShutdown(t *testing.T) { t.Parallel() diff --git a/internal/orchestrator/taskrunnerimpl.go b/internal/orchestrator/taskrunnerimpl.go index 0a8e3b3f..171e6175 100644 --- a/internal/orchestrator/taskrunnerimpl.go +++ b/internal/orchestrator/taskrunnerimpl.go @@ -110,8 +110,14 @@ func (t *taskRunnerImpl) ExecuteHooks(ctx context.Context, events []v1.Hook_Cond } if err := t.orchestrator.RunTask(ctx, st); hook.IsHaltingError(err) { var cancelErr *hook.HookErrorRequestCancel + var retryErr *hook.HookErrorRetry if errors.As(err, &cancelErr) { - return fmt.Errorf("%w: %w", tasks.ErrTaskCancelled, err) + return fmt.Errorf("%v: %w: %w", task.Name(), &tasks.TaskCancelledError{}, cancelErr.Err) + } else if errors.As(err, &retryErr) { + return fmt.Errorf("%v: %w", task.Name(), &tasks.TaskRetryError{ + Err: 
retryErr.Err, + Backoff: retryErr.Backoff, + }) } return fmt.Errorf("%v: %w", task.Name(), err) } diff --git a/internal/orchestrator/tasks/errors.go b/internal/orchestrator/tasks/errors.go index ca02b9f4..5da3ef49 100644 --- a/internal/orchestrator/tasks/errors.go +++ b/internal/orchestrator/tasks/errors.go @@ -1,8 +1,35 @@ package tasks -import "errors" +import ( + "fmt" + "time" +) -// ErrTaskCancelled signals that the task is beign cancelled gracefully. -// This error is handled by marking the task as user cancelled. -// By default a task returning an error will be marked as failed otherwise. -var ErrTaskCancelled = errors.New("cancel task") +// TaskCancelledError is returned when a task is cancelled. +type TaskCancelledError struct { +} + +func (e TaskCancelledError) Error() string { + return "task cancelled" +} + +func (e TaskCancelledError) Is(err error) bool { + _, ok := err.(TaskCancelledError) + return ok +} + +type RetryBackoffPolicy = func(attempt int) time.Duration + +// TaskRetryError is returned when a task should be retried after a specified backoff duration. +type TaskRetryError struct { + Err error + Backoff RetryBackoffPolicy +} + +func (e TaskRetryError) Error() string { + return fmt.Sprintf("retry: %v", e.Err.Error()) +} + +func (e TaskRetryError) Unwrap() error { + return e.Err +} diff --git a/proto/v1/config.proto b/proto/v1/config.proto index 9d3579df..42bb5fe9 100644 --- a/proto/v1/config.proto +++ b/proto/v1/config.proto @@ -151,7 +151,10 @@ message Hook { enum OnError { ON_ERROR_IGNORE = 0; ON_ERROR_CANCEL = 1; // cancels the operation and skips subsequent hooks - ON_ERROR_FATAL = 2; // fails the operation and subsequent hooks + ON_ERROR_FATAL = 2; // fails the operation and subsequent hooks. + ON_ERROR_RETRY_1MINUTE = 100; // retry the operation every minute + ON_ERROR_RETRY_10MINUTES = 101; // retry the operation every 10 minutes + ON_ERROR_RETRY_EXPONENTIAL_BACKOFF = 103; // retry the operation with exponential backoff up to 1h max. } repeated Condition conditions = 1 [json_name="conditions"]; diff --git a/webui/gen/ts/v1/config_pb.ts b/webui/gen/ts/v1/config_pb.ts index 58d814b8..290295d1 100644 --- a/webui/gen/ts/v1/config_pb.ts +++ b/webui/gen/ts/v1/config_pb.ts @@ -1102,17 +1102,41 @@ export enum Hook_OnError { CANCEL = 1, /** - * fails the operation and subsequent hooks + * fails the operation and subsequent hooks. * * @generated from enum value: ON_ERROR_FATAL = 2; */ FATAL = 2, + + /** + * retry the operation every minute + * + * @generated from enum value: ON_ERROR_RETRY_1MINUTE = 100; + */ + RETRY_1MINUTE = 100, + + /** + * retry the operation every 10 minutes + * + * @generated from enum value: ON_ERROR_RETRY_10MINUTES = 101; + */ + RETRY_10MINUTES = 101, + + /** + * retry the operation with exponential backoff up to 1h max. 
+ * + * @generated from enum value: ON_ERROR_RETRY_EXPONENTIAL_BACKOFF = 103; + */ + RETRY_EXPONENTIAL_BACKOFF = 103, } // Retrieve enum metadata with: proto3.getEnumType(Hook_OnError) proto3.util.setEnumType(Hook_OnError, "v1.Hook.OnError", [ { no: 0, name: "ON_ERROR_IGNORE" }, { no: 1, name: "ON_ERROR_CANCEL" }, { no: 2, name: "ON_ERROR_FATAL" }, + { no: 100, name: "ON_ERROR_RETRY_1MINUTE" }, + { no: 101, name: "ON_ERROR_RETRY_10MINUTES" }, + { no: 103, name: "ON_ERROR_RETRY_EXPONENTIAL_BACKOFF" }, ]); /** diff --git a/webui/src/components/OperationRow.tsx b/webui/src/components/OperationRow.tsx index dfe1d936..e1185c5d 100644 --- a/webui/src/components/OperationRow.tsx +++ b/webui/src/components/OperationRow.tsx @@ -119,7 +119,10 @@ export const OperationRow = ({ ); - if (operation.status == OperationStatus.STATUS_INPROGRESS) { + if ( + operation.status === OperationStatus.STATUS_INPROGRESS || + operation.status === OperationStatus.STATUS_PENDING + ) { title = ( <> {title} diff --git a/webui/src/components/OperationTree.tsx b/webui/src/components/OperationTree.tsx index 9e22dc9d..ce5e455a 100644 --- a/webui/src/components/OperationTree.tsx +++ b/webui/src/components/OperationTree.tsx @@ -494,7 +494,7 @@ const BackupView = ({ backup }: { backup?: FlowDisplayInfo }) => {

Backup on {formatTime(backup.displayTime)}

{backup.status !== OperationStatus.STATUS_PENDING && - backup.status != OperationStatus.STATUS_INPROGRESS + backup.status !== OperationStatus.STATUS_INPROGRESS ? deleteButton : null}
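End to end, the pieces added in this patch chain together as follows: a failing hook command is wrapped by applyHookErrorPolicy into a HookErrorRetry, taskrunnerimpl.ExecuteHooks converts that into a tasks.TaskRetryError wrapped with %w, and orchestrator.Run detects it with errors.As, resets the operation to STATUS_PENDING, and re-enqueues the task with the policy's backoff. That reset is also why the webui changes above now render STATUS_PENDING operations like in-progress ones. Below is a small sketch of the errors.As hand-off; it uses a trimmed copy of the error type so it is self-contained, and the task-name prefix is invented for the example.

    package main

    import (
        "errors"
        "fmt"
        "time"
    )

    // Trimmed copy of tasks.TaskRetryError from
    // internal/orchestrator/tasks/errors.go in this patch.
    type RetryBackoffPolicy = func(attempt int) time.Duration

    type TaskRetryError struct {
        Err     error
        Backoff RetryBackoffPolicy
    }

    func (e TaskRetryError) Error() string { return fmt.Sprintf("retry: %v", e.Err) }
    func (e TaskRetryError) Unwrap() error { return e.Err }

    func main() {
        // ExecuteHooks wraps the retry request with %w, so the orchestrator can
        // still recover it through the task-name prefix ("example task" is made up).
        hookErr := errors.New("hook exited with status 123")
        wrapped := fmt.Errorf("%v: %w", "example task", &TaskRetryError{
            Err:     hookErr,
            Backoff: func(attempt int) time.Duration { return 10 * time.Minute },
        })

        var retryErr *TaskRetryError
        if errors.As(wrapped, &retryErr) {
            fmt.Printf("re-enqueue with delay %v after: %v\n", retryErr.Backoff(1), retryErr.Err)
        }
    }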