From d82fff82da617ed181afc15fb517178680d5d691 Mon Sep 17 00:00:00 2001 From: D3Hunter Date: Thu, 4 Aug 2022 11:23:04 +0800 Subject: [PATCH] retry --- br/pkg/lightning/backend/local/local.go | 18 +++++++----- br/pkg/lightning/backend/local/local_test.go | 31 +++++++++++++++----- br/pkg/lightning/common/errors.go | 18 +++++++----- br/pkg/lightning/common/retry.go | 4 ++- br/pkg/lightning/common/retry_test.go | 6 ++++ errors.toml | 10 +++++++ 6 files changed, 64 insertions(+), 23 deletions(-) diff --git a/br/pkg/lightning/backend/local/local.go b/br/pkg/lightning/backend/local/local.go index c87f92f615c92..7fd3fd99b17a5 100644 --- a/br/pkg/lightning/backend/local/local.go +++ b/br/pkg/lightning/backend/local/local.go @@ -1278,7 +1278,7 @@ WriteAndIngest: err = local.writeAndIngestPairs(ctx, engine, region, pairStart, end, regionSplitSize, regionSplitKeys) local.ingestConcurrency.Recycle(w) if err != nil { - if !common.IsRetryableError(err) { + if !local.isRetryableImportTiKVError(err) { return err } _, regionStart, _ := codec.DecodeBytes(region.Region.StartKey, []byte{}) @@ -1308,7 +1308,7 @@ const ( retryIngest ) -func (local *local) isRetryableTiKVWriteError(err error) bool { +func (local *local) isRetryableImportTiKVError(err error) bool { err = errors.Cause(err) // io.EOF is not retryable in normal case // but on TiKV restart, if we're writing to TiKV(through GRPC) @@ -1338,7 +1338,7 @@ loopWrite: var rangeStats rangeStats metas, finishedRange, rangeStats, err = local.WriteToTiKV(ctx, engine, region, start, end, regionSplitSize, regionSplitKeys) if err != nil { - if !local.isRetryableTiKVWriteError(err) { + if !local.isRetryableImportTiKVError(err) { return err } @@ -1481,7 +1481,7 @@ func (local *local) writeAndIngestByRanges(ctx context.Context, engine *Engine, if err == nil || common.IsContextCanceledError(err) { return } - if !common.IsRetryableError(err) { + if !local.isRetryableImportTiKVError(err) { break } log.FromContext(ctx).Warn("write and ingest by 
range failed", @@ -1930,12 +1930,11 @@ func (local *local) isIngestRetryable( } return retryTy, newRegion, common.ErrKVEpochNotMatch.GenWithStack(errPb.GetMessage()) case strings.Contains(errPb.Message, "raft: proposal dropped"): - // TODO: we should change 'Raft raft: proposal dropped' to a error type like 'NotLeader' newRegion, err = getRegion() if err != nil { return retryNone, nil, errors.Trace(err) } - return retryWrite, newRegion, errors.New(errPb.GetMessage()) + return retryWrite, newRegion, common.ErrKVRaftProposalDropped.GenWithStack(errPb.GetMessage()) case errPb.ServerIsBusy != nil: return retryNone, nil, common.ErrKVServerIsBusy.GenWithStack(errPb.GetMessage()) case errPb.RegionNotFound != nil: @@ -1952,8 +1951,13 @@ func (local *local) isIngestRetryable( return retryNone, nil, errors.Trace(err) } return retryWrite, newRegion, common.ErrKVReadIndexNotReady.GenWithStack(errPb.GetMessage()) + case errPb.DiskFull != nil: + return retryNone, nil, errors.Errorf("non-retryable error: %s", resp.GetError().GetMessage()) } - return retryNone, nil, errors.Errorf("non-retryable error: %s", resp.GetError().GetMessage()) + // All other ingest errors, such as stale command, will be retried from writeAndIngestByRange. + // For now a single named error, ErrKVIngestFailed, represents them all; + // they can be split into separate errors later if needed. + return retryNone, nil, common.ErrKVIngestFailed.GenWithStack(resp.GetError().GetMessage()) } // return the smallest []byte that is bigger than current bytes. 
diff --git a/br/pkg/lightning/backend/local/local_test.go b/br/pkg/lightning/backend/local/local_test.go index 041be6d86b49e..6b6b78702c68c 100644 --- a/br/pkg/lightning/backend/local/local_test.go +++ b/br/pkg/lightning/backend/local/local_test.go @@ -39,6 +39,7 @@ import ( "github.com/pingcap/kvproto/pkg/errorpb" sst "github.com/pingcap/kvproto/pkg/import_sstpb" "github.com/pingcap/kvproto/pkg/metapb" + berrors "github.com/pingcap/tidb/br/pkg/errors" "github.com/pingcap/tidb/br/pkg/lightning/backend" "github.com/pingcap/tidb/br/pkg/lightning/backend/kv" "github.com/pingcap/tidb/br/pkg/lightning/common" @@ -509,11 +510,6 @@ func TestIsIngestRetryable(t *testing.T) { require.Equal(t, retryWrite, retryType) require.Error(t, err) - resp.Error = &errorpb.Error{Message: "unknown error"} - retryType, _, err = local.isIngestRetryable(ctx, resp, region, metas) - require.Equal(t, retryNone, retryType) - require.EqualError(t, err, "non-retryable error: unknown error") - resp.Error = &errorpb.Error{ ReadIndexNotReady: &errorpb.ReadIndexNotReady{ Reason: "test", @@ -522,6 +518,27 @@ func TestIsIngestRetryable(t *testing.T) { retryType, _, err = local.isIngestRetryable(ctx, resp, region, metas) require.Equal(t, retryWrite, retryType) require.Error(t, err) + + resp.Error = &errorpb.Error{ + Message: "raft: proposal dropped", + } + retryType, _, err = local.isIngestRetryable(ctx, resp, region, metas) + require.Equal(t, retryWrite, retryType) + require.True(t, berrors.Is(err, common.ErrKVRaftProposalDropped)) + + resp.Error = &errorpb.Error{ + DiskFull: &errorpb.DiskFull{}, + } + retryType, _, err = local.isIngestRetryable(ctx, resp, region, metas) + require.Equal(t, retryNone, retryType) + require.Contains(t, err.Error(), "non-retryable error") + + resp.Error = &errorpb.Error{ + StaleCommand: &errorpb.StaleCommand{}, + } + retryType, _, err = local.isIngestRetryable(ctx, resp, region, metas) + require.Equal(t, retryNone, retryType) + require.True(t, berrors.Is(err, 
common.ErrKVIngestFailed)) } type testIngester struct{} @@ -1253,6 +1270,6 @@ func TestGetRegionSplitSizeKeys(t *testing.T) { func TestLocalIsRetryableTiKVWriteError(t *testing.T) { l := local{} - require.True(t, l.isRetryableTiKVWriteError(io.EOF)) - require.True(t, l.isRetryableTiKVWriteError(errors.Trace(io.EOF))) + require.True(t, l.isRetryableImportTiKVError(io.EOF)) + require.True(t, l.isRetryableImportTiKVError(errors.Trace(io.EOF))) } diff --git a/br/pkg/lightning/common/errors.go b/br/pkg/lightning/common/errors.go index 78cc7a4fba844..ac9621d679c44 100644 --- a/br/pkg/lightning/common/errors.go +++ b/br/pkg/lightning/common/errors.go @@ -70,14 +70,16 @@ var ( ErrCreatePDClient = errors.Normalize("create pd client error", errors.RFCCodeText("Lightning:PD:ErrCreatePDClient")) ErrPauseGC = errors.Normalize("pause gc error", errors.RFCCodeText("Lightning:PD:ErrPauseGC")) - ErrCheckKVVersion = errors.Normalize("check tikv version error", errors.RFCCodeText("Lightning:KV:ErrCheckKVVersion")) - ErrCreateKVClient = errors.Normalize("create kv client error", errors.RFCCodeText("Lightning:KV:ErrCreateKVClient")) - ErrCheckMultiIngest = errors.Normalize("check multi-ingest support error", errors.RFCCodeText("Lightning:KV:ErrCheckMultiIngest")) - ErrKVEpochNotMatch = errors.Normalize("epoch not match", errors.RFCCodeText("Lightning:KV:EpochNotMatch")) - ErrKVNotLeader = errors.Normalize("not leader", errors.RFCCodeText("Lightning:KV:NotLeader")) - ErrKVServerIsBusy = errors.Normalize("server is busy", errors.RFCCodeText("Lightning:KV:ServerIsBusy")) - ErrKVRegionNotFound = errors.Normalize("region not found", errors.RFCCodeText("Lightning:KV:RegionNotFound")) - ErrKVReadIndexNotReady = errors.Normalize("read index not ready", errors.RFCCodeText("Lightning:KV:ReadIndexNotReady")) + ErrCheckKVVersion = errors.Normalize("check tikv version error", errors.RFCCodeText("Lightning:KV:ErrCheckKVVersion")) + ErrCreateKVClient = errors.Normalize("create kv client error", 
errors.RFCCodeText("Lightning:KV:ErrCreateKVClient")) + ErrCheckMultiIngest = errors.Normalize("check multi-ingest support error", errors.RFCCodeText("Lightning:KV:ErrCheckMultiIngest")) + ErrKVEpochNotMatch = errors.Normalize("epoch not match", errors.RFCCodeText("Lightning:KV:EpochNotMatch")) + ErrKVNotLeader = errors.Normalize("not leader", errors.RFCCodeText("Lightning:KV:NotLeader")) + ErrKVServerIsBusy = errors.Normalize("server is busy", errors.RFCCodeText("Lightning:KV:ServerIsBusy")) + ErrKVRegionNotFound = errors.Normalize("region not found", errors.RFCCodeText("Lightning:KV:RegionNotFound")) + ErrKVReadIndexNotReady = errors.Normalize("read index not ready", errors.RFCCodeText("Lightning:KV:ReadIndexNotReady")) + ErrKVIngestFailed = errors.Normalize("ingest tikv failed", errors.RFCCodeText("Lightning:KV:ErrKVIngestFailed")) + ErrKVRaftProposalDropped = errors.Normalize("raft proposal dropped", errors.RFCCodeText("Lightning:KV:ErrKVRaftProposalDropped")) ErrUnknownBackend = errors.Normalize("unknown backend %s", errors.RFCCodeText("Lightning:Restore:ErrUnknownBackend")) ErrCheckLocalFile = errors.Normalize("cannot find local file for table: %s engineDir: %s", errors.RFCCodeText("Lightning:Restore:ErrCheckLocalFile")) diff --git a/br/pkg/lightning/common/retry.go b/br/pkg/lightning/common/retry.go index 7cd71dd1f1625..a3dbd2dd539e2 100644 --- a/br/pkg/lightning/common/retry.go +++ b/br/pkg/lightning/common/retry.go @@ -103,7 +103,9 @@ func isSingleRetryableError(err error) bool { case *errors.Error: switch { case berrors.Is(nerr, ErrKVEpochNotMatch), berrors.Is(nerr, ErrKVNotLeader), - berrors.Is(nerr, ErrKVRegionNotFound), berrors.Is(nerr, ErrKVServerIsBusy): + berrors.Is(nerr, ErrKVRegionNotFound), berrors.Is(nerr, ErrKVServerIsBusy), + berrors.Is(nerr, ErrKVReadIndexNotReady), berrors.Is(nerr, ErrKVIngestFailed), + berrors.Is(nerr, ErrKVRaftProposalDropped): // common.ErrKVServerIsBusy is a little duplication with tmysql.ErrTiKVServerBusy // it's 
because the response of sst.ingest gives us a sst.IngestResponse which doesn't contain error code, // so we have to transform it into a defined code diff --git a/br/pkg/lightning/common/retry_test.go b/br/pkg/lightning/common/retry_test.go index 004cce85622de..78719f28c53a5 100644 --- a/br/pkg/lightning/common/retry_test.go +++ b/br/pkg/lightning/common/retry_test.go @@ -43,10 +43,16 @@ func TestIsRetryableError(t *testing.T) { require.True(t, IsRetryableError(ErrKVEpochNotMatch)) require.True(t, IsRetryableError(ErrKVServerIsBusy)) require.True(t, IsRetryableError(ErrKVRegionNotFound)) + require.True(t, IsRetryableError(ErrKVReadIndexNotReady)) + require.True(t, IsRetryableError(ErrKVIngestFailed)) + require.True(t, IsRetryableError(ErrKVRaftProposalDropped)) require.True(t, IsRetryableError(ErrKVNotLeader.GenWithStack("test"))) require.True(t, IsRetryableError(ErrKVEpochNotMatch.GenWithStack("test"))) require.True(t, IsRetryableError(ErrKVServerIsBusy.GenWithStack("test"))) require.True(t, IsRetryableError(ErrKVRegionNotFound.GenWithStack("test"))) + require.True(t, IsRetryableError(ErrKVReadIndexNotReady.GenWithStack("test"))) + require.True(t, IsRetryableError(ErrKVIngestFailed.GenWithStack("test"))) + require.True(t, IsRetryableError(ErrKVRaftProposalDropped.GenWithStack("test"))) // net: connection refused _, err := net.Dial("tcp", "localhost:65533") diff --git a/errors.toml b/errors.toml index 2671b75c2d384..381f1d2163494 100755 --- a/errors.toml +++ b/errors.toml @@ -381,6 +381,16 @@ error = ''' create kv client error ''' +["Lightning:KV:ErrKVIngestFailed"] +error = ''' +ingest tikv failed +''' + +["Lightning:KV:ErrKVRaftProposalDropped"] +error = ''' +raft proposal dropped +''' + ["Lightning:KV:NotLeader"] error = ''' not leader