From 82ca7993dcac9e9b1b3c803c39540b10ca46b73f Mon Sep 17 00:00:00 2001 From: lance6716 Date: Fri, 16 Jun 2023 16:43:14 +0800 Subject: [PATCH 1/2] lightning: fix "context cancel" overwrites the real error Signed-off-by: lance6716 --- br/pkg/lightning/backend/local/local.go | 11 ++-- br/pkg/lightning/backend/local/local_test.go | 57 ++++++++++++++++++++ 2 files changed, 65 insertions(+), 3 deletions(-) diff --git a/br/pkg/lightning/backend/local/local.go b/br/pkg/lightning/backend/local/local.go index 1362f3543b972..e4d1d67083cc7 100644 --- a/br/pkg/lightning/backend/local/local.go +++ b/br/pkg/lightning/backend/local/local.go @@ -1145,14 +1145,16 @@ func (local *Backend) generateAndSendJob( for _, jobRange := range jobRanges { r := jobRange eg.Go(func() error { - select { - case <-egCtx.Done(): + if egCtx.Err() != nil { return nil - default: } + failpoint.Inject("beforeGenerateJob", nil) jobs, err := local.generateJobForRange(egCtx, engine, r, regionSplitSize, regionSplitKeys) if err != nil { + if common.IsContextCanceledError(err) { + return nil + } return err } for _, job := range jobs { @@ -1189,6 +1191,9 @@ func (local *Backend) generateJobForRange( regionSplitSize, regionSplitKeys int64, ) ([]*regionJob, error) { failpoint.Inject("fakeRegionJobs", func() { + if ctx.Err() != nil { + failpoint.Return(nil, ctx.Err()) + } key := [2]string{string(keyRange.start), string(keyRange.end)} injected := fakeRegionJobs[key] // overwrite the stage to regionScanned, because some time same keyRange diff --git a/br/pkg/lightning/backend/local/local_test.go b/br/pkg/lightning/backend/local/local_test.go index ddd1c17ff2d34..8d427e6b6e398 100644 --- a/br/pkg/lightning/backend/local/local_test.go +++ b/br/pkg/lightning/backend/local/local_test.go @@ -2028,3 +2028,60 @@ func TestRegionJobResetRetryCounter(t *testing.T) { } } } + +func TestCtxCancelIsIgnored(t *testing.T) { + backup := maxRetryBackoffSecond + maxRetryBackoffSecond = 1 + t.Cleanup(func() { + maxRetryBackoffSecond = backup + }) + + _ = failpoint.Enable("github.com/pingcap/tidb/br/pkg/lightning/backend/local/skipSplitAndScatter", "return()") + _ = failpoint.Enable("github.com/pingcap/tidb/br/pkg/lightning/backend/local/fakeRegionJobs", "return()") + _ = failpoint.Enable("github.com/pingcap/tidb/br/pkg/lightning/backend/local/beforeGenerateJob", "sleep(1000)") + _ = failpoint.Enable("github.com/pingcap/tidb/br/pkg/lightning/backend/local/WriteToTiKVNotEnoughDiskSpace", "return()") + t.Cleanup(func() { + _ = failpoint.Disable("github.com/pingcap/tidb/br/pkg/lightning/backend/local/skipSplitAndScatter") + _ = failpoint.Disable("github.com/pingcap/tidb/br/pkg/lightning/backend/local/fakeRegionJobs") + _ = failpoint.Disable("github.com/pingcap/tidb/br/pkg/lightning/backend/local/beforeGenerateJob") + _ = failpoint.Disable("github.com/pingcap/tidb/br/pkg/lightning/backend/local/WriteToTiKVNotEnoughDiskSpace") + }) + + initRanges := []Range{ + {start: []byte{'c'}, end: []byte{'d'}}, + {start: []byte{'d'}, end: []byte{'e'}}, + } + fakeRegionJobs = map[[2]string]struct { + jobs []*regionJob + err error + }{ + {"c", "d"}: { + jobs: []*regionJob{ + { + keyRange: Range{start: []byte{'c'}, end: []byte{'d'}}, + engine: &Engine{}, + injected: getSuccessInjectedBehaviour(), + }, + }, + }, + {"d", "e"}: { + jobs: []*regionJob{ + { + keyRange: Range{start: []byte{'d'}, end: []byte{'e'}}, + engine: &Engine{}, + injected: getSuccessInjectedBehaviour(), + }, + }, + }, + } + + ctx := context.Background() + l := &Backend{ + BackendConfig: BackendConfig{ + WorkerConcurrency: 1, + }, + } + e := &Engine{} + err := l.doImport(ctx, e, initRanges, int64(config.SplitRegionSize), int64(config.SplitRegionKeys)) + require.ErrorContains(t, err, "the remaining storage capacity of TiKV") +} From 2d3ca6b1585ef9ca2de4770e9fddb3e290f39df4 Mon Sep 17 00:00:00 2001 From: lance6716 Date: Mon, 19 Jun 2023 17:09:16 +0800 Subject: [PATCH 2/2] address comment Signed-off-by: lance6716 --- br/pkg/lightning/backend/local/local.go | 1 + 1 file changed, 1 insertion(+) diff --git a/br/pkg/lightning/backend/local/local.go b/br/pkg/lightning/backend/local/local.go index e4d1d67083cc7..a36dd82d428ab 100644 --- a/br/pkg/lightning/backend/local/local.go +++ b/br/pkg/lightning/backend/local/local.go @@ -1570,6 +1570,7 @@ func (local *Backend) doImport(ctx context.Context, engine *Engine, regionRanges jobWg.Wait() workerCancel() firstErr.Set(workGroup.Wait()) + firstErr.Set(ctx.Err()) return firstErr.Get() }