Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

lightning: retry on error on tikv ingest like 'stale command' #36878

Merged
merged 2 commits into from
Aug 4, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
18 changes: 11 additions & 7 deletions br/pkg/lightning/backend/local/local.go
Original file line number Diff line number Diff line change
Expand Up @@ -1278,7 +1278,7 @@ WriteAndIngest:
err = local.writeAndIngestPairs(ctx, engine, region, pairStart, end, regionSplitSize, regionSplitKeys)
local.ingestConcurrency.Recycle(w)
if err != nil {
if !common.IsRetryableError(err) {
if !local.isRetryableImportTiKVError(err) {
return err
}
_, regionStart, _ := codec.DecodeBytes(region.Region.StartKey, []byte{})
Expand Down Expand Up @@ -1308,7 +1308,7 @@ const (
retryIngest
)

func (local *local) isRetryableTiKVWriteError(err error) bool {
func (local *local) isRetryableImportTiKVError(err error) bool {
err = errors.Cause(err)
// io.EOF is not retryable in normal case
// but on TiKV restart, if we're writing to TiKV(through GRPC)
Expand Down Expand Up @@ -1338,7 +1338,7 @@ loopWrite:
var rangeStats rangeStats
metas, finishedRange, rangeStats, err = local.WriteToTiKV(ctx, engine, region, start, end, regionSplitSize, regionSplitKeys)
if err != nil {
if !local.isRetryableTiKVWriteError(err) {
if !local.isRetryableImportTiKVError(err) {
return err
}

Expand Down Expand Up @@ -1481,7 +1481,7 @@ func (local *local) writeAndIngestByRanges(ctx context.Context, engine *Engine,
if err == nil || common.IsContextCanceledError(err) {
return
}
if !common.IsRetryableError(err) {
if !local.isRetryableImportTiKVError(err) {
break
}
log.FromContext(ctx).Warn("write and ingest by range failed",
Expand Down Expand Up @@ -1930,12 +1930,11 @@ func (local *local) isIngestRetryable(
}
return retryTy, newRegion, common.ErrKVEpochNotMatch.GenWithStack(errPb.GetMessage())
case strings.Contains(errPb.Message, "raft: proposal dropped"):
// TODO: we should change 'Raft raft: proposal dropped' to a error type like 'NotLeader'
newRegion, err = getRegion()
if err != nil {
return retryNone, nil, errors.Trace(err)
}
return retryWrite, newRegion, errors.New(errPb.GetMessage())
return retryWrite, newRegion, common.ErrKVRaftProposalDropped.GenWithStack(errPb.GetMessage())
case errPb.ServerIsBusy != nil:
return retryNone, nil, common.ErrKVServerIsBusy.GenWithStack(errPb.GetMessage())
case errPb.RegionNotFound != nil:
Expand All @@ -1952,8 +1951,13 @@ func (local *local) isIngestRetryable(
return retryNone, nil, errors.Trace(err)
}
return retryWrite, newRegion, common.ErrKVReadIndexNotReady.GenWithStack(errPb.GetMessage())
case errPb.DiskFull != nil:
return retryNone, nil, errors.Errorf("non-retryable error: %s", resp.GetError().GetMessage())
}
return retryNone, nil, errors.Errorf("non-retryable error: %s", resp.GetError().GetMessage())
// All other ingest errors (e.g. stale command) are retried from writeAndIngestByRanges.
// For now a single named error, ErrKVIngestFailed, represents all of them;
// they can be split into separate errors later if needed.
return retryNone, nil, common.ErrKVIngestFailed.GenWithStack(resp.GetError().GetMessage())
}

// return the smallest []byte that is bigger than current bytes.
Expand Down
31 changes: 24 additions & 7 deletions br/pkg/lightning/backend/local/local_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ import (
"github.com/pingcap/kvproto/pkg/errorpb"
sst "github.com/pingcap/kvproto/pkg/import_sstpb"
"github.com/pingcap/kvproto/pkg/metapb"
berrors "github.com/pingcap/tidb/br/pkg/errors"
"github.com/pingcap/tidb/br/pkg/lightning/backend"
"github.com/pingcap/tidb/br/pkg/lightning/backend/kv"
"github.com/pingcap/tidb/br/pkg/lightning/common"
Expand Down Expand Up @@ -509,11 +510,6 @@ func TestIsIngestRetryable(t *testing.T) {
require.Equal(t, retryWrite, retryType)
require.Error(t, err)

resp.Error = &errorpb.Error{Message: "unknown error"}
retryType, _, err = local.isIngestRetryable(ctx, resp, region, metas)
require.Equal(t, retryNone, retryType)
require.EqualError(t, err, "non-retryable error: unknown error")

resp.Error = &errorpb.Error{
ReadIndexNotReady: &errorpb.ReadIndexNotReady{
Reason: "test",
Expand All @@ -522,6 +518,27 @@ func TestIsIngestRetryable(t *testing.T) {
retryType, _, err = local.isIngestRetryable(ctx, resp, region, metas)
require.Equal(t, retryWrite, retryType)
require.Error(t, err)

resp.Error = &errorpb.Error{
Message: "raft: proposal dropped",
}
retryType, _, err = local.isIngestRetryable(ctx, resp, region, metas)
require.Equal(t, retryWrite, retryType)
require.True(t, berrors.Is(err, common.ErrKVRaftProposalDropped))

resp.Error = &errorpb.Error{
DiskFull: &errorpb.DiskFull{},
}
retryType, _, err = local.isIngestRetryable(ctx, resp, region, metas)
require.Equal(t, retryNone, retryType)
require.Contains(t, err.Error(), "non-retryable error")

resp.Error = &errorpb.Error{
StaleCommand: &errorpb.StaleCommand{},
}
retryType, _, err = local.isIngestRetryable(ctx, resp, region, metas)
require.Equal(t, retryNone, retryType)
require.True(t, berrors.Is(err, common.ErrKVIngestFailed))
}

type testIngester struct{}
Expand Down Expand Up @@ -1253,6 +1270,6 @@ func TestGetRegionSplitSizeKeys(t *testing.T) {

func TestLocalIsRetryableTiKVWriteError(t *testing.T) {
l := local{}
require.True(t, l.isRetryableTiKVWriteError(io.EOF))
require.True(t, l.isRetryableTiKVWriteError(errors.Trace(io.EOF)))
require.True(t, l.isRetryableImportTiKVError(io.EOF))
require.True(t, l.isRetryableImportTiKVError(errors.Trace(io.EOF)))
}
18 changes: 10 additions & 8 deletions br/pkg/lightning/common/errors.go
Original file line number Diff line number Diff line change
Expand Up @@ -70,14 +70,16 @@ var (
ErrCreatePDClient = errors.Normalize("create pd client error", errors.RFCCodeText("Lightning:PD:ErrCreatePDClient"))
ErrPauseGC = errors.Normalize("pause gc error", errors.RFCCodeText("Lightning:PD:ErrPauseGC"))

ErrCheckKVVersion = errors.Normalize("check tikv version error", errors.RFCCodeText("Lightning:KV:ErrCheckKVVersion"))
ErrCreateKVClient = errors.Normalize("create kv client error", errors.RFCCodeText("Lightning:KV:ErrCreateKVClient"))
ErrCheckMultiIngest = errors.Normalize("check multi-ingest support error", errors.RFCCodeText("Lightning:KV:ErrCheckMultiIngest"))
ErrKVEpochNotMatch = errors.Normalize("epoch not match", errors.RFCCodeText("Lightning:KV:EpochNotMatch"))
ErrKVNotLeader = errors.Normalize("not leader", errors.RFCCodeText("Lightning:KV:NotLeader"))
ErrKVServerIsBusy = errors.Normalize("server is busy", errors.RFCCodeText("Lightning:KV:ServerIsBusy"))
ErrKVRegionNotFound = errors.Normalize("region not found", errors.RFCCodeText("Lightning:KV:RegionNotFound"))
ErrKVReadIndexNotReady = errors.Normalize("read index not ready", errors.RFCCodeText("Lightning:KV:ReadIndexNotReady"))
ErrCheckKVVersion = errors.Normalize("check tikv version error", errors.RFCCodeText("Lightning:KV:ErrCheckKVVersion"))
ErrCreateKVClient = errors.Normalize("create kv client error", errors.RFCCodeText("Lightning:KV:ErrCreateKVClient"))
ErrCheckMultiIngest = errors.Normalize("check multi-ingest support error", errors.RFCCodeText("Lightning:KV:ErrCheckMultiIngest"))
ErrKVEpochNotMatch = errors.Normalize("epoch not match", errors.RFCCodeText("Lightning:KV:EpochNotMatch"))
ErrKVNotLeader = errors.Normalize("not leader", errors.RFCCodeText("Lightning:KV:NotLeader"))
ErrKVServerIsBusy = errors.Normalize("server is busy", errors.RFCCodeText("Lightning:KV:ServerIsBusy"))
ErrKVRegionNotFound = errors.Normalize("region not found", errors.RFCCodeText("Lightning:KV:RegionNotFound"))
ErrKVReadIndexNotReady = errors.Normalize("read index not ready", errors.RFCCodeText("Lightning:KV:ReadIndexNotReady"))
ErrKVIngestFailed = errors.Normalize("ingest tikv failed", errors.RFCCodeText("Lightning:KV:ErrKVIngestFailed"))
ErrKVRaftProposalDropped = errors.Normalize("raft proposal dropped", errors.RFCCodeText("Lightning:KV:ErrKVRaftProposalDropped"))

ErrUnknownBackend = errors.Normalize("unknown backend %s", errors.RFCCodeText("Lightning:Restore:ErrUnknownBackend"))
ErrCheckLocalFile = errors.Normalize("cannot find local file for table: %s engineDir: %s", errors.RFCCodeText("Lightning:Restore:ErrCheckLocalFile"))
Expand Down
4 changes: 3 additions & 1 deletion br/pkg/lightning/common/retry.go
Original file line number Diff line number Diff line change
Expand Up @@ -103,7 +103,9 @@ func isSingleRetryableError(err error) bool {
case *errors.Error:
switch {
case berrors.Is(nerr, ErrKVEpochNotMatch), berrors.Is(nerr, ErrKVNotLeader),
berrors.Is(nerr, ErrKVRegionNotFound), berrors.Is(nerr, ErrKVServerIsBusy):
berrors.Is(nerr, ErrKVRegionNotFound), berrors.Is(nerr, ErrKVServerIsBusy),
berrors.Is(nerr, ErrKVReadIndexNotReady), berrors.Is(nerr, ErrKVIngestFailed),
berrors.Is(nerr, ErrKVRaftProposalDropped):
// common.ErrKVServerIsBusy somewhat duplicates tmysql.ErrTiKVServerBusy.
// This is because sst.ingest returns an sst.IngestResponse, which doesn't contain an error code,
// so we have to transform it into a defined code.
Expand Down
6 changes: 6 additions & 0 deletions br/pkg/lightning/common/retry_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,10 +43,16 @@ func TestIsRetryableError(t *testing.T) {
require.True(t, IsRetryableError(ErrKVEpochNotMatch))
require.True(t, IsRetryableError(ErrKVServerIsBusy))
require.True(t, IsRetryableError(ErrKVRegionNotFound))
require.True(t, IsRetryableError(ErrKVReadIndexNotReady))
require.True(t, IsRetryableError(ErrKVIngestFailed))
require.True(t, IsRetryableError(ErrKVRaftProposalDropped))
require.True(t, IsRetryableError(ErrKVNotLeader.GenWithStack("test")))
require.True(t, IsRetryableError(ErrKVEpochNotMatch.GenWithStack("test")))
require.True(t, IsRetryableError(ErrKVServerIsBusy.GenWithStack("test")))
require.True(t, IsRetryableError(ErrKVRegionNotFound.GenWithStack("test")))
require.True(t, IsRetryableError(ErrKVReadIndexNotReady.GenWithStack("test")))
require.True(t, IsRetryableError(ErrKVIngestFailed.GenWithStack("test")))
require.True(t, IsRetryableError(ErrKVRaftProposalDropped.GenWithStack("test")))

// net: connection refused
_, err := net.Dial("tcp", "localhost:65533")
Expand Down
10 changes: 10 additions & 0 deletions errors.toml
Original file line number Diff line number Diff line change
Expand Up @@ -381,6 +381,16 @@ error = '''
create kv client error
'''

["Lightning:KV:ErrKVIngestFailed"]
error = '''
ingest tikv failed
'''

["Lightning:KV:ErrKVRaftProposalDropped"]
error = '''
raft proposal dropped
'''

["Lightning:KV:NotLeader"]
error = '''
not leader
Expand Down