diff --git a/br/pkg/lightning/backend/local/BUILD.bazel b/br/pkg/lightning/backend/local/BUILD.bazel index 9524ab5febc2b..c17ad0b92ec1c 100644 --- a/br/pkg/lightning/backend/local/BUILD.bazel +++ b/br/pkg/lightning/backend/local/BUILD.bazel @@ -142,5 +142,6 @@ go_test( "@org_golang_google_grpc//codes", "@org_golang_google_grpc//status", "@org_uber_go_atomic//:atomic", + "@org_uber_go_zap//:zap", ], ) diff --git a/br/pkg/lightning/backend/local/local.go b/br/pkg/lightning/backend/local/local.go index b08474841a9e2..c21d363c90fec 100644 --- a/br/pkg/lightning/backend/local/local.go +++ b/br/pkg/lightning/backend/local/local.go @@ -1531,19 +1531,18 @@ loopWrite: continue } } + if err != nil { + log.FromContext(ctx).Warn("batch ingest fail after retry, will retry import full range", log.ShortError(err), + logutil.Region(region.Region), zap.Reflect("meta", ingestMetas)) + return errors.Trace(err) + } } - if err != nil { - log.FromContext(ctx).Warn("write and ingest region, will retry import full range", log.ShortError(err), - logutil.Region(region.Region), logutil.Key("start", start), - logutil.Key("end", end)) - } else { - engine.importedKVSize.Add(rangeStats.totalBytes) - engine.importedKVCount.Add(rangeStats.count) - engine.finishedRanges.add(finishedRange) - if local.metrics != nil { - local.metrics.BytesCounter.WithLabelValues(metric.BytesStateImported).Add(float64(rangeStats.totalBytes)) - } + engine.importedKVSize.Add(rangeStats.totalBytes) + engine.importedKVCount.Add(rangeStats.count) + engine.finishedRanges.add(finishedRange) + if local.metrics != nil { + local.metrics.BytesCounter.WithLabelValues(metric.BytesStateImported).Add(float64(rangeStats.totalBytes)) } return errors.Trace(err) } diff --git a/br/pkg/lightning/backend/local/local_test.go b/br/pkg/lightning/backend/local/local_test.go index f52227ebda8a5..b65ec0169dcd6 100644 --- a/br/pkg/lightning/backend/local/local_test.go +++ b/br/pkg/lightning/backend/local/local_test.go @@ -56,6 +56,7 @@ import ( "github.com/pingcap/tidb/util/hack" "github.com/stretchr/testify/require" pd "github.com/tikv/pd/client" + "go.uber.org/zap" "google.golang.org/grpc" "google.golang.org/grpc/codes" "google.golang.org/grpc/status" @@ -872,6 +873,20 @@ func newMockImportClient() *mockImportClient { } } +func (c *mockImportClient) Ingest(context.Context, *sst.IngestRequest, ...grpc.CallOption) (*sst.IngestResponse, error) { + defer func() { + c.cnt++ + }() + if c.apiInvokeRecorder != nil { + c.apiInvokeRecorder["Ingest"] = append(c.apiInvokeRecorder["Ingest"], c.store.GetId()) + } + if c.cnt < c.retry && (c.err != nil || c.resp != nil) { + return c.resp, c.err + } + + return &sst.IngestResponse{}, nil +} + func (c *mockImportClient) MultiIngest(context.Context, *sst.MultiIngestRequest, ...grpc.CallOption) (*sst.IngestResponse, error) { defer func() { c.cnt++ @@ -1422,3 +1437,67 @@ func TestNotLeaderErrorNeedUpdatePeers(t *testing.T) { require.Equal(t, []uint64{1, 2, 3, 11, 12, 13}, apiInvokeRecorder["Write"]) require.Equal(t, []uint64{1, 2, 3, 1, 11, 12, 13, 11}, apiInvokeRecorder["MultiIngest"]) } + +func TestIngestFailOnFirstBatch(t *testing.T) { + logger := log.Logger{Logger: zap.NewExample()} + ctx := log.NewContext(context.Background(), logger) + + apiInvokeRecorder := map[string][]uint64{} + importCli := newMockImportClient() + + local := &local{ + importClientFactory: &mockImportClientFactory{ + stores: []*metapb.Store{ + {Id: 1}, {Id: 2}, {Id: 3}, + {Id: 11}, {Id: 12}, {Id: 13}, + }, + createClientFn: func(store *metapb.Store) sst.ImportSSTClient { + importCli.store = store + importCli.apiInvokeRecorder = apiInvokeRecorder + if store.Id == 1 { + importCli.retry = 5 + importCli.err = status.Error(codes.Unknown, "Suspended { time_to_lease_expire: 1.204s }") + } + return importCli + }, + }, + logger: logger, + ingestConcurrency: worker.NewPool(ctx, 1, "ingest"), + writeLimiter: noopStoreWriteLimiter{}, + bufferPool: membuf.NewPool(), + supportMultiIngest: false, + shouldCheckWriteStall: false, + } + + db, tmpPath := makePebbleDB(t, nil) + _, engineUUID := backend.MakeUUID("ww", 0) + engineCtx, cancel := context.WithCancel(context.Background()) + f := &Engine{ + UUID: engineUUID, + sstDir: tmpPath, + ctx: engineCtx, + cancel: cancel, + sstMetasChan: make(chan metaOrFlush, 64), + keyAdapter: noopKeyAdapter{}, + logger: log.L(), + } + f.db.Store(db) + err := db.Set([]byte("a"), []byte("a"), nil) + require.NoError(t, err) + require.False(t, local.supportMultiIngest) + err = local.writeAndIngestPairs(ctx, f, &split.RegionInfo{ + Region: &metapb.Region{ + Id: 1, + Peers: []*metapb.Peer{ + {Id: 1, StoreId: 1}, + }, + }, + Leader: &metapb.Peer{ + Id: 1, + StoreId: 1, + }, + }, []byte{}, []byte("b"), 0, 0) + require.ErrorContains(t, err, "Suspended { time_to_lease_expire: 1.204s }") + require.Equal(t, []uint64{1}, apiInvokeRecorder["Write"]) + require.Equal(t, []uint64{1, 1, 1, 1, 1}, apiInvokeRecorder["Ingest"]) +}