Skip to content

Commit

Permalink
local backend: fix worker err overriden by job generation err (#48185) (
Browse files Browse the repository at this point in the history
#48198)

close #47992
  • Loading branch information
ti-chi-bot authored Nov 2, 2023
1 parent a2e8ab3 commit 9a8182f
Show file tree
Hide file tree
Showing 4 changed files with 71 additions and 20 deletions.
3 changes: 3 additions & 0 deletions br/pkg/lightning/backend/local/engine.go
Original file line number Diff line number Diff line change
Expand Up @@ -952,6 +952,9 @@ func (e *Engine) getFirstAndLastKey(lowerBound, upperBound []byte) ([]byte, []by
LowerBound: lowerBound,
UpperBound: upperBound,
}
failpoint.Inject("mockGetFirstAndLastKey", func() {
failpoint.Return(lowerBound, upperBound, nil)
})

iter := e.newKVIter(context.Background(), opt)
//nolint: errcheck
Expand Down
47 changes: 27 additions & 20 deletions br/pkg/lightning/backend/local/local.go
Original file line number Diff line number Diff line change
Expand Up @@ -1147,6 +1147,12 @@ func (local *Backend) generateAndSendJob(
}

failpoint.Inject("beforeGenerateJob", nil)
failpoint.Inject("sendDummyJob", func(_ failpoint.Value) {
// this is used to trigger worker failure, used together
// with WriteToTiKVNotEnoughDiskSpace
jobToWorkerCh <- &regionJob{}
time.Sleep(5 * time.Second)
})
jobs, err := local.generateJobForRange(egCtx, engine, r, regionSplitSize, regionSplitKeys)
if err != nil {
if common.IsContextCanceledError(err) {
Expand Down Expand Up @@ -1548,29 +1554,30 @@ func (local *Backend) doImport(ctx context.Context, engine *Engine, regionRanges
})
}

err := local.prepareAndSendJob(
workerCtx,
engine,
regionRanges,
regionSplitSize,
regionSplitKeys,
jobToWorkerCh,
&jobWg,
)
if err != nil {
firstErr.Set(err)
workGroup.Go(func() error {
err := local.prepareAndSendJob(
workerCtx,
engine,
regionRanges,
regionSplitSize,
regionSplitKeys,
jobToWorkerCh,
&jobWg,
)
if err != nil {
return err
}

jobWg.Wait()
workerCancel()
err2 := workGroup.Wait()
if !common.IsContextCanceledError(err2) {
log.FromContext(ctx).Error("worker meets error", zap.Error(err2))
return nil
})
if err := workGroup.Wait(); err != nil {
if !common.IsContextCanceledError(err) {
log.FromContext(ctx).Error("do import meets error", zap.Error(err))
}
return firstErr.Get()
firstErr.Set(err)
}

jobWg.Wait()
workerCancel()
firstErr.Set(workGroup.Wait())
firstErr.Set(ctx.Err())
return firstErr.Get()
}

Expand Down
37 changes: 37 additions & 0 deletions br/pkg/lightning/backend/local/local_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2085,3 +2085,40 @@ func TestCtxCancelIsIgnored(t *testing.T) {
err := l.doImport(ctx, e, initRanges, int64(config.SplitRegionSize), int64(config.SplitRegionKeys))
require.ErrorContains(t, err, "the remaining storage capacity of TiKV")
}

func TestWorkerFailedWhenGeneratingJobs(t *testing.T) {
backup := maxRetryBackoffSecond
maxRetryBackoffSecond = 1
t.Cleanup(func() {
maxRetryBackoffSecond = backup
})

_ = failpoint.Enable("github.com/pingcap/tidb/br/pkg/lightning/backend/local/skipSplitAndScatter", "return()")
_ = failpoint.Enable("github.com/pingcap/tidb/br/pkg/lightning/backend/local/sendDummyJob", "return()")
_ = failpoint.Enable("github.com/pingcap/tidb/br/pkg/lightning/backend/local/mockGetFirstAndLastKey", "return()")
_ = failpoint.Enable("github.com/pingcap/tidb/br/pkg/lightning/backend/local/WriteToTiKVNotEnoughDiskSpace", "return()")
t.Cleanup(func() {
_ = failpoint.Disable("github.com/pingcap/tidb/br/pkg/lightning/backend/local/skipSplitAndScatter")
_ = failpoint.Disable("github.com/pingcap/tidb/br/pkg/lightning/backend/local/sendDummyJob")
_ = failpoint.Disable("github.com/pingcap/tidb/br/pkg/lightning/backend/local/mockGetFirstAndLastKey")
_ = failpoint.Disable("github.com/pingcap/tidb/br/pkg/lightning/backend/local/WriteToTiKVNotEnoughDiskSpace")
})

initRanges := []Range{
{start: []byte{'c'}, end: []byte{'d'}},
}

ctx := context.Background()
l := &Backend{
BackendConfig: BackendConfig{
WorkerConcurrency: 1,
},
splitCli: initTestSplitClient(
[][]byte{{1}, {11}},
panicSplitRegionClient{},
),
}
e := &Engine{}
err := l.doImport(ctx, e, initRanges, int64(config.SplitRegionSize), int64(config.SplitRegionKeys))
require.ErrorContains(t, err, "the remaining storage capacity of TiKV")
}
4 changes: 4 additions & 0 deletions br/pkg/lightning/backend/local/localhelper_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -249,6 +249,10 @@ func (c *testSplitClient) GetOperator(ctx context.Context, regionID uint64) (*pd
}

func (c *testSplitClient) ScanRegions(ctx context.Context, key, endKey []byte, limit int) ([]*split.RegionInfo, error) {
if err := ctx.Err(); err != nil {
return nil, err
}

if c.hook != nil {
key, endKey, limit = c.hook.BeforeScanRegions(ctx, key, endKey, limit)
}
Expand Down

0 comments on commit 9a8182f

Please sign in to comment.