Skip to content

Commit

Permalink
lightning: retry on error on tikv ingest like 'stale command' (pingca…
Browse files Browse the repository at this point in the history
  • Loading branch information
ti-srebot authored Aug 5, 2022
1 parent a9e2b76 commit a6b5449
Show file tree
Hide file tree
Showing 6 changed files with 64 additions and 23 deletions.
18 changes: 11 additions & 7 deletions br/pkg/lightning/backend/local/local.go
Original file line number Diff line number Diff line change
Expand Up @@ -1104,7 +1104,7 @@ WriteAndIngest:
err = local.writeAndIngestPairs(ctx, engine, region, pairStart, end, regionSplitSize, regionSplitKeys)
local.ingestConcurrency.Recycle(w)
if err != nil {
if !common.IsRetryableError(err) {
if !local.isRetryableImportTiKVError(err) {
return err
}
_, regionStart, _ := codec.DecodeBytes(region.Region.StartKey, []byte{})
Expand Down Expand Up @@ -1134,7 +1134,7 @@ const (
retryIngest
)

func (local *local) isRetryableTiKVWriteError(err error) bool {
func (local *local) isRetryableImportTiKVError(err error) bool {
err = errors.Cause(err)
// io.EOF is not retryable in normal case
// but on TiKV restart, if we're writing to TiKV(through GRPC)
Expand Down Expand Up @@ -1164,7 +1164,7 @@ loopWrite:
var rangeStats rangeStats
metas, finishedRange, rangeStats, err = local.WriteToTiKV(ctx, engine, region, start, end, regionSplitSize, regionSplitKeys)
if err != nil {
if !local.isRetryableTiKVWriteError(err) {
if !local.isRetryableImportTiKVError(err) {
return err
}

Expand Down Expand Up @@ -1305,7 +1305,7 @@ func (local *local) writeAndIngestByRanges(ctx context.Context, engine *Engine,
if err == nil || common.IsContextCanceledError(err) {
return
}
if !common.IsRetryableError(err) {
if !local.isRetryableImportTiKVError(err) {
break
}
log.L().Warn("write and ingest by range failed",
Expand Down Expand Up @@ -1812,12 +1812,11 @@ func (local *local) isIngestRetryable(
}
return retryTy, newRegion, common.ErrKVEpochNotMatch.GenWithStack(errPb.GetMessage())
case strings.Contains(errPb.Message, "raft: proposal dropped"):
// TODO: we should change 'Raft raft: proposal dropped' to a error type like 'NotLeader'
newRegion, err = getRegion()
if err != nil {
return retryNone, nil, errors.Trace(err)
}
return retryWrite, newRegion, errors.New(errPb.GetMessage())
return retryWrite, newRegion, common.ErrKVRaftProposalDropped.GenWithStack(errPb.GetMessage())
case errPb.ServerIsBusy != nil:
return retryNone, nil, common.ErrKVServerIsBusy.GenWithStack(errPb.GetMessage())
case errPb.RegionNotFound != nil:
Expand All @@ -1834,8 +1833,13 @@ func (local *local) isIngestRetryable(
return retryNone, nil, errors.Trace(err)
}
return retryWrite, newRegion, common.ErrKVReadIndexNotReady.GenWithStack(errPb.GetMessage())
case errPb.DiskFull != nil:
return retryNone, nil, errors.Errorf("non-retryable error: %s", resp.GetError().GetMessage())
}
return retryNone, nil, errors.Errorf("non-retryable error: %s", resp.GetError().GetMessage())
// all others ingest error, such as stale command, etc. we'll retry it again from writeAndIngestByRange
// here we use a single named-error ErrKVIngestFailed to represent them all
// we can separate them later if it's needed
return retryNone, nil, common.ErrKVIngestFailed.GenWithStack(resp.GetError().GetMessage())
}

// return the smallest []byte that is bigger than current bytes.
Expand Down
31 changes: 24 additions & 7 deletions br/pkg/lightning/backend/local/local_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ import (
"github.com/pingcap/kvproto/pkg/errorpb"
sst "github.com/pingcap/kvproto/pkg/import_sstpb"
"github.com/pingcap/kvproto/pkg/metapb"
berrors "github.com/pingcap/tidb/br/pkg/errors"
"github.com/pingcap/tidb/br/pkg/lightning/backend"
"github.com/pingcap/tidb/br/pkg/lightning/backend/kv"
"github.com/pingcap/tidb/br/pkg/lightning/common"
Expand Down Expand Up @@ -506,11 +507,6 @@ func TestIsIngestRetryable(t *testing.T) {
require.Equal(t, retryWrite, retryType)
require.Error(t, err)

resp.Error = &errorpb.Error{Message: "unknown error"}
retryType, _, err = local.isIngestRetryable(ctx, resp, region, metas)
require.Equal(t, retryNone, retryType)
require.EqualError(t, err, "non-retryable error: unknown error")

resp.Error = &errorpb.Error{
ReadIndexNotReady: &errorpb.ReadIndexNotReady{
Reason: "test",
Expand All @@ -519,6 +515,27 @@ func TestIsIngestRetryable(t *testing.T) {
retryType, _, err = local.isIngestRetryable(ctx, resp, region, metas)
require.Equal(t, retryWrite, retryType)
require.Error(t, err)

resp.Error = &errorpb.Error{
Message: "raft: proposal dropped",
}
retryType, _, err = local.isIngestRetryable(ctx, resp, region, metas)
require.Equal(t, retryWrite, retryType)
require.True(t, berrors.Is(err, common.ErrKVRaftProposalDropped))

resp.Error = &errorpb.Error{
DiskFull: &errorpb.DiskFull{},
}
retryType, _, err = local.isIngestRetryable(ctx, resp, region, metas)
require.Equal(t, retryNone, retryType)
require.Contains(t, err.Error(), "non-retryable error")

resp.Error = &errorpb.Error{
StaleCommand: &errorpb.StaleCommand{},
}
retryType, _, err = local.isIngestRetryable(ctx, resp, region, metas)
require.Equal(t, retryNone, retryType)
require.True(t, berrors.Is(err, common.ErrKVIngestFailed))
}

type testIngester struct{}
Expand Down Expand Up @@ -1248,6 +1265,6 @@ func TestGetRegionSplitSizeKeys(t *testing.T) {

func TestLocalIsRetryableTiKVWriteError(t *testing.T) {
l := local{}
require.True(t, l.isRetryableTiKVWriteError(io.EOF))
require.True(t, l.isRetryableTiKVWriteError(errors.Trace(io.EOF)))
require.True(t, l.isRetryableImportTiKVError(io.EOF))
require.True(t, l.isRetryableImportTiKVError(errors.Trace(io.EOF)))
}
18 changes: 10 additions & 8 deletions br/pkg/lightning/common/errors.go
Original file line number Diff line number Diff line change
Expand Up @@ -70,14 +70,16 @@ var (
ErrCreatePDClient = errors.Normalize("create pd client error", errors.RFCCodeText("Lightning:PD:ErrCreatePDClient"))
ErrPauseGC = errors.Normalize("pause gc error", errors.RFCCodeText("Lightning:PD:ErrPauseGC"))

ErrCheckKVVersion = errors.Normalize("check tikv version error", errors.RFCCodeText("Lightning:KV:ErrCheckKVVersion"))
ErrCreateKVClient = errors.Normalize("create kv client error", errors.RFCCodeText("Lightning:KV:ErrCreateKVClient"))
ErrCheckMultiIngest = errors.Normalize("check multi-ingest support error", errors.RFCCodeText("Lightning:KV:ErrCheckMultiIngest"))
ErrKVEpochNotMatch = errors.Normalize("epoch not match", errors.RFCCodeText("Lightning:KV:EpochNotMatch"))
ErrKVNotLeader = errors.Normalize("not leader", errors.RFCCodeText("Lightning:KV:NotLeader"))
ErrKVServerIsBusy = errors.Normalize("server is busy", errors.RFCCodeText("Lightning:KV:ServerIsBusy"))
ErrKVRegionNotFound = errors.Normalize("region not found", errors.RFCCodeText("Lightning:KV:RegionNotFound"))
ErrKVReadIndexNotReady = errors.Normalize("read index not ready", errors.RFCCodeText("Lightning:KV:ReadIndexNotReady"))
ErrCheckKVVersion = errors.Normalize("check tikv version error", errors.RFCCodeText("Lightning:KV:ErrCheckKVVersion"))
ErrCreateKVClient = errors.Normalize("create kv client error", errors.RFCCodeText("Lightning:KV:ErrCreateKVClient"))
ErrCheckMultiIngest = errors.Normalize("check multi-ingest support error", errors.RFCCodeText("Lightning:KV:ErrCheckMultiIngest"))
ErrKVEpochNotMatch = errors.Normalize("epoch not match", errors.RFCCodeText("Lightning:KV:EpochNotMatch"))
ErrKVNotLeader = errors.Normalize("not leader", errors.RFCCodeText("Lightning:KV:NotLeader"))
ErrKVServerIsBusy = errors.Normalize("server is busy", errors.RFCCodeText("Lightning:KV:ServerIsBusy"))
ErrKVRegionNotFound = errors.Normalize("region not found", errors.RFCCodeText("Lightning:KV:RegionNotFound"))
ErrKVReadIndexNotReady = errors.Normalize("read index not ready", errors.RFCCodeText("Lightning:KV:ReadIndexNotReady"))
ErrKVIngestFailed = errors.Normalize("ingest tikv failed", errors.RFCCodeText("Lightning:KV:ErrKVIngestFailed"))
ErrKVRaftProposalDropped = errors.Normalize("raft proposal dropped", errors.RFCCodeText("Lightning:KV:ErrKVRaftProposalDropped"))

ErrUnknownBackend = errors.Normalize("unknown backend %s", errors.RFCCodeText("Lightning:Restore:ErrUnknownBackend"))
ErrCheckLocalFile = errors.Normalize("cannot find local file for table: %s engineDir: %s", errors.RFCCodeText("Lightning:Restore:ErrCheckLocalFile"))
Expand Down
4 changes: 3 additions & 1 deletion br/pkg/lightning/common/retry.go
Original file line number Diff line number Diff line change
Expand Up @@ -103,7 +103,9 @@ func isSingleRetryableError(err error) bool {
case *errors.Error:
switch {
case berrors.Is(nerr, ErrKVEpochNotMatch), berrors.Is(nerr, ErrKVNotLeader),
berrors.Is(nerr, ErrKVRegionNotFound), berrors.Is(nerr, ErrKVServerIsBusy):
berrors.Is(nerr, ErrKVRegionNotFound), berrors.Is(nerr, ErrKVServerIsBusy),
berrors.Is(nerr, ErrKVReadIndexNotReady), berrors.Is(nerr, ErrKVIngestFailed),
berrors.Is(nerr, ErrKVRaftProposalDropped):
// common.ErrKVServerIsBusy is a little duplication with tmysql.ErrTiKVServerBusy
// it's because the response of sst.ingest gives us a sst.IngestResponse which doesn't contain error code,
// so we have to transform it into a defined code
Expand Down
6 changes: 6 additions & 0 deletions br/pkg/lightning/common/retry_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,10 +43,16 @@ func TestIsRetryableError(t *testing.T) {
require.True(t, IsRetryableError(ErrKVEpochNotMatch))
require.True(t, IsRetryableError(ErrKVServerIsBusy))
require.True(t, IsRetryableError(ErrKVRegionNotFound))
require.True(t, IsRetryableError(ErrKVReadIndexNotReady))
require.True(t, IsRetryableError(ErrKVIngestFailed))
require.True(t, IsRetryableError(ErrKVRaftProposalDropped))
require.True(t, IsRetryableError(ErrKVNotLeader.GenWithStack("test")))
require.True(t, IsRetryableError(ErrKVEpochNotMatch.GenWithStack("test")))
require.True(t, IsRetryableError(ErrKVServerIsBusy.GenWithStack("test")))
require.True(t, IsRetryableError(ErrKVRegionNotFound.GenWithStack("test")))
require.True(t, IsRetryableError(ErrKVReadIndexNotReady.GenWithStack("test")))
require.True(t, IsRetryableError(ErrKVIngestFailed.GenWithStack("test")))
require.True(t, IsRetryableError(ErrKVRaftProposalDropped.GenWithStack("test")))

// net: connection refused
_, err := net.Dial("tcp", "localhost:65533")
Expand Down
10 changes: 10 additions & 0 deletions errors.toml
Original file line number Diff line number Diff line change
Expand Up @@ -366,6 +366,16 @@ error = '''
create kv client error
'''

["Lightning:KV:ErrKVIngestFailed"]
error = '''
ingest tikv failed
'''

["Lightning:KV:ErrKVRaftProposalDropped"]
error = '''
raft proposal dropped
'''

["Lightning:KV:NotLeader"]
error = '''
not leader
Expand Down

0 comments on commit a6b5449

Please sign in to comment.