Skip to content

Commit

Permalink
importinto: add retry when update job table (pingcap#46790)
Browse files Browse the repository at this point in the history
  • Loading branch information
D3Hunter authored Sep 11, 2023
1 parent 02ceded commit ae442ad
Show file tree
Hide file tree
Showing 12 changed files with 452 additions and 48 deletions.
20 changes: 12 additions & 8 deletions disttask/framework/dispatcher/dispatcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,8 +44,12 @@ const (
var (
checkTaskFinishedInterval = 500 * time.Millisecond
nonRetrySQLTime = 1
retrySQLTimes = 30
retrySQLInterval = 3 * time.Second
// RetrySQLTimes is the max retry times when executing SQL.
RetrySQLTimes = 30
// RetrySQLInterval is the initial interval between two SQL retries.
RetrySQLInterval = 3 * time.Second
// RetrySQLMaxInterval is the max interval between two SQL retries.
RetrySQLMaxInterval = 30 * time.Second
)

// TaskHandle provides the interface for operations needed by Dispatcher.
Expand Down Expand Up @@ -191,7 +195,7 @@ func (d *BaseDispatcher) onReverting() error {
if prevStageFinished {
// Finish the rollback step.
logutil.Logger(d.logCtx).Info("update the task to reverted state")
return d.updateTask(proto.TaskStateReverted, nil, retrySQLTimes)
return d.updateTask(proto.TaskStateReverted, nil, RetrySQLTimes)
}
// Wait all subtasks in this stage finished.
d.OnTick(d.ctx, d.task)
Expand Down Expand Up @@ -321,7 +325,7 @@ func (d *BaseDispatcher) updateTask(taskState string, newSubTasks []*proto.Subta
logutil.Logger(d.logCtx).Warn("updateTask first failed", zap.String("from", prevState), zap.String("to", d.task.State),
zap.Int("retry times", retryTimes), zap.Error(err))
}
time.Sleep(retrySQLInterval)
time.Sleep(RetrySQLInterval)
}
if err != nil && retryTimes != nonRetrySQLTime {
logutil.Logger(d.logCtx).Warn("updateTask failed",
Expand Down Expand Up @@ -354,7 +358,7 @@ func (d *BaseDispatcher) dispatchSubTask4Revert(task *proto.Task, meta []byte) e
for _, id := range instanceIDs {
subTasks = append(subTasks, proto.NewSubtask(task.ID, task.Type, id, meta))
}
return d.updateTask(proto.TaskStateReverting, subTasks, retrySQLTimes)
return d.updateTask(proto.TaskStateReverting, subTasks, RetrySQLTimes)
}

func (d *BaseDispatcher) onNextStage() error {
Expand All @@ -377,7 +381,7 @@ func (d *BaseDispatcher) dispatchSubTask(task *proto.Task, metas [][]byte) error
task.Concurrency = MaxSubtaskConcurrency
}

retryTimes := retrySQLTimes
retryTimes := RetrySQLTimes
// 2. Special handling for the new tasks.
if task.State == proto.TaskStatePending {
// TODO: Consider using TS.
Expand Down Expand Up @@ -428,7 +432,7 @@ func (d *BaseDispatcher) dispatchSubTask(task *proto.Task, metas [][]byte) error
logutil.Logger(d.logCtx).Debug("create subtasks", zap.String("instanceID", instanceID))
subTasks = append(subTasks, proto.NewSubtask(task.ID, task.Type, instanceID, meta))
}
return d.updateTask(proto.TaskStateRunning, subTasks, retrySQLTimes)
return d.updateTask(proto.TaskStateRunning, subTasks, RetrySQLTimes)
}

func (d *BaseDispatcher) handlePlanErr(err error) error {
Expand All @@ -438,7 +442,7 @@ func (d *BaseDispatcher) handlePlanErr(err error) error {
}
d.task.Error = err
// state transform: pending -> failed.
return d.updateTask(proto.TaskStateFailed, nil, retrySQLTimes)
return d.updateTask(proto.TaskStateFailed, nil, RetrySQLTimes)
}

// GenerateSchedulerNodes generate a eligible TiDB nodes.
Expand Down
2 changes: 1 addition & 1 deletion disttask/framework/dispatcher/main_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ func TestMain(m *testing.M) {
// Make test more fast.
checkTaskRunningInterval = checkTaskRunningInterval / 10
checkTaskFinishedInterval = checkTaskFinishedInterval / 10
retrySQLInterval = retrySQLInterval / 20
RetrySQLInterval = RetrySQLInterval / 20

opts := []goleak.Option{
goleak.IgnoreTopFunction("github.com/golang/glog.(*fileSink).flushDaemon"),
Expand Down
4 changes: 4 additions & 0 deletions disttask/framework/handle/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ go_library(
deps = [
"//disttask/framework/proto",
"//disttask/framework/storage",
"//util/backoff",
"//util/logutil",
"@com_github_pingcap_errors//:errors",
"@org_uber_go_zap//:zap",
Expand All @@ -24,7 +25,10 @@ go_test(
"//disttask/framework/proto",
"//disttask/framework/storage",
"//testkit",
"//util/backoff",
"@com_github_ngaut_pools//:pools",
"@com_github_pingcap_errors//:errors",
"@com_github_pingcap_log//:log",
"@com_github_stretchr_testify//require",
"@com_github_tikv_client_go_v2//util",
],
Expand Down
31 changes: 31 additions & 0 deletions disttask/framework/handle/handle.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import (
"github.com/pingcap/errors"
"github.com/pingcap/tidb/disttask/framework/proto"
"github.com/pingcap/tidb/disttask/framework/storage"
"github.com/pingcap/tidb/util/backoff"
"github.com/pingcap/tidb/util/logutil"
"go.uber.org/zap"
)
Expand Down Expand Up @@ -118,3 +119,33 @@ func CancelGlobalTask(taskKey string) error {
}
return globalTaskManager.CancelGlobalTask(globalTask.ID)
}

// RunWithRetry runs a function with retry, when retry exceed max retry time, it
// returns the last error met.
// if the function fails with err, it should return a bool to indicate whether
// the error is retryable.
// if context done, it will stop early and return ctx.Err().
func RunWithRetry(
ctx context.Context,
maxRetry int,
backoffer backoff.Backoffer,
logger *zap.Logger,
f func(context.Context) (bool, error),
) error {
var lastErr error
for i := 0; i < maxRetry; i++ {
retryable, err := f(ctx)
if err == nil || !retryable {
return err
}
lastErr = err
logger.Warn("met retryable error", zap.Int("retry-count", i),
zap.Int("max-retry", maxRetry), zap.Error(err))
select {
case <-ctx.Done():
return ctx.Err()
case <-time.After(backoffer.Backoff(i)):
}
}
return lastErr
}
66 changes: 66 additions & 0 deletions disttask/framework/handle/handle_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,14 +16,19 @@ package handle_test

import (
"context"
"math"
"sync/atomic"
"testing"
"time"

"github.com/ngaut/pools"
"github.com/pingcap/errors"
"github.com/pingcap/log"
"github.com/pingcap/tidb/disttask/framework/handle"
"github.com/pingcap/tidb/disttask/framework/proto"
"github.com/pingcap/tidb/disttask/framework/storage"
"github.com/pingcap/tidb/testkit"
"github.com/pingcap/tidb/util/backoff"
"github.com/stretchr/testify/require"
"github.com/tikv/client-go/v2/util"
)
Expand Down Expand Up @@ -56,3 +61,64 @@ func TestHandle(t *testing.T) {

require.NoError(t, handle.CancelGlobalTask("1"))
}

func TestRunWithRetry(t *testing.T) {
ctx := context.Background()

// retry count exceed
backoffer := backoff.NewExponential(100*time.Millisecond, 1, time.Second)
err := handle.RunWithRetry(ctx, 3, backoffer, log.L(),
func(ctx context.Context) (bool, error) {
return true, errors.New("mock error")
},
)
require.ErrorContains(t, err, "mock error")

// non-retryable error
var end atomic.Bool
go func() {
defer end.Store(true)
backoffer = backoff.NewExponential(100*time.Millisecond, 1, time.Second)
err = handle.RunWithRetry(ctx, math.MaxInt, backoffer, log.L(),
func(ctx context.Context) (bool, error) {
return false, errors.New("mock error")
},
)
require.Error(t, err)
}()
require.Eventually(t, func() bool {
return end.Load()
}, 5*time.Second, 100*time.Millisecond)

// fail with retryable error once, then success
end.Store(false)
go func() {
defer end.Store(true)
backoffer = backoff.NewExponential(100*time.Millisecond, 1, time.Second)
var i int
err = handle.RunWithRetry(ctx, math.MaxInt, backoffer, log.L(),
func(ctx context.Context) (bool, error) {
if i == 0 {
i++
return true, errors.New("mock error")
}
return false, nil
},
)
require.NoError(t, err)
}()
require.Eventually(t, func() bool {
return end.Load()
}, 5*time.Second, 100*time.Millisecond)

// context done
subctx, cancel := context.WithCancel(ctx)
cancel()
backoffer = backoff.NewExponential(100*time.Millisecond, 1, time.Second)
err = handle.RunWithRetry(subctx, math.MaxInt, backoffer, log.L(),
func(ctx context.Context) (bool, error) {
return true, errors.New("mock error")
},
)
require.ErrorIs(t, err, context.Canceled)
}
6 changes: 5 additions & 1 deletion disttask/importinto/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@ go_library(
"//sessionctx/variable",
"//table/tables",
"//util",
"//util/backoff",
"//util/dbterror/exeerrors",
"//util/etcd",
"//util/logutil",
Expand All @@ -68,6 +69,7 @@ go_test(
timeout = "short",
srcs = [
"dispatcher_test.go",
"dispatcher_testkit_test.go",
"encode_and_sort_operator_test.go",
"planner_test.go",
"subtask_executor_test.go",
Expand All @@ -76,11 +78,12 @@ go_test(
embed = [":importinto"],
flaky = True,
race = "on",
shard_count = 6,
shard_count = 7,
deps = [
"//br/pkg/lightning/checkpoints",
"//br/pkg/lightning/mydump",
"//br/pkg/lightning/verification",
"//disttask/framework/dispatcher",
"//disttask/framework/mock/execute",
"//disttask/framework/planner",
"//disttask/framework/proto",
Expand All @@ -94,6 +97,7 @@ go_test(
"//parser/mysql",
"//testkit",
"//util/logutil",
"//util/sqlexec",
"@com_github_ngaut_pools//:pools",
"@com_github_pingcap_errors//:errors",
"@com_github_pingcap_failpoint//:failpoint",
Expand Down
Loading

0 comments on commit ae442ad

Please sign in to comment.