Skip to content

Commit

Permalink
lightning: adapt new behaviour that "write" may return epoch error (#… (
Browse files Browse the repository at this point in the history
#55205)

close #47694
  • Loading branch information
lance6716 authored Aug 6, 2024
1 parent 05a1ad3 commit cc91206
Show file tree
Hide file tree
Showing 3 changed files with 71 additions and 8 deletions.
48 changes: 40 additions & 8 deletions br/pkg/lightning/backend/local/local.go
Original file line number Diff line number Diff line change
Expand Up @@ -1003,23 +1003,41 @@ func (local *local) WriteToTiKV(
End: lastKey,
},
}
failpoint.Inject("changeEpochVersion", func(val failpoint.Value) {
cloned := *meta.RegionEpoch
meta.RegionEpoch = &cloned
i := val.(int)
if i >= 0 {
meta.RegionEpoch.Version += uint64(i)
} else {
meta.RegionEpoch.ConfVer -= uint64(-i)
}
})

annotateErr := func(in error, peer *metapb.Peer) error {
annotateErr := func(in error, peer *metapb.Peer, msg string) error {
// annotate the error with peer/store/region info to help debug.
return errors.Annotatef(in, "peer %d, store %d, region %d, epoch %s", peer.Id, peer.StoreId, region.Region.Id, region.Region.RegionEpoch.String())
return errors.Annotatef(
in,
"peer %d, store %d, region %d, epoch %s, %s",
peer.Id,
peer.StoreId,
region.Region.Id,
region.Region.RegionEpoch.String(),
msg,
)
}
clients := make([]sst.ImportSST_WriteClient, 0, len(region.Region.GetPeers()))
allPeers := make([]*metapb.Peer, 0, len(region.Region.GetPeers()))
requests := make([]*sst.WriteRequest, 0, len(region.Region.GetPeers()))
for _, peer := range region.Region.GetPeers() {
cli, err := local.getImportClient(ctx, peer.StoreId)
if err != nil {
return nil, Range{}, stats, annotateErr(err, peer)
return nil, Range{}, stats, annotateErr(err, peer, "when create client")
}

wstream, err := cli.Write(ctx)
if err != nil {
return nil, Range{}, stats, annotateErr(err, peer)
return nil, Range{}, stats, annotateErr(err, peer, "when open write stream")
}

// Bind uuid for this write request
Expand All @@ -1029,7 +1047,7 @@ func (local *local) WriteToTiKV(
},
}
if err = wstream.Send(req); err != nil {
return nil, Range{}, stats, annotateErr(err, peer)
return nil, Range{}, stats, annotateErr(err, peer, "when send meta")
}
req.Chunk = &sst.WriteRequest_Batch{
Batch: &sst.WriteBatch{
Expand Down Expand Up @@ -1064,7 +1082,12 @@ func (local *local) WriteToTiKV(
}
requests[i].Chunk.(*sst.WriteRequest_Batch).Batch.Pairs = pairs[:count]
if err := clients[i].Send(requests[i]); err != nil {
return annotateErr(err, allPeers[i])
if err == io.EOF {
// Send returns io.EOF when the stream was terminated by the server side;
// call RecvMsg to retrieve the real error behind it.
dummy := &sst.WriteResponse{}
err = clients[i].RecvMsg(dummy)
}
return annotateErr(err, allPeers[i], "when send data")
}
}
return nil
Expand Down Expand Up @@ -1118,10 +1141,10 @@ func (local *local) WriteToTiKV(
for i, wStream := range clients {
resp, closeErr := wStream.CloseAndRecv()
if closeErr != nil {
return nil, Range{}, stats, annotateErr(closeErr, allPeers[i])
return nil, Range{}, stats, annotateErr(closeErr, allPeers[i], "when close write stream")
}
if resp.Error != nil {
return nil, Range{}, stats, annotateErr(errors.New(resp.Error.Message), allPeers[i])
return nil, Range{}, stats, annotateErr(errors.New(resp.Error.Message), allPeers[i], "when close write stream")
}
if leaderID == region.Region.Peers[i].GetId() {
leaderPeerMetas = resp.Metas
Expand Down Expand Up @@ -1468,6 +1491,15 @@ loopWrite:
return err
}

// check the new error message
errStr := err.Error()
if strings.Contains(errStr, "RequestTooNew") {
// we will retry from write, which simply means continuing the loop
} else if strings.Contains(errStr, "RequestTooOld") {
// we will retry from scanning regions, so return and let the caller scan again
return err
}

log.FromContext(ctx).Warn("write to tikv failed", log.ShortError(err), zap.Int("retry", i))
continue loopWrite
}
Expand Down
2 changes: 2 additions & 0 deletions br/pkg/lightning/common/retry.go
Original file line number Diff line number Diff line change
Expand Up @@ -150,6 +150,8 @@ func isSingleRetryableError(err error) bool {
// 2. in write TiKV: rpc error: code = Unknown desc = EngineTraits(Engine(Status { code: IoError, sub_code:
// None, sev: NoError, state: \"IO error: No such file or directory: while stat a file for size:
// /...../63992d9c-fbc8-4708-b963-32495b299027_32279707_325_5280_write.sst: No such file or directory\"
// 3. in write TiKV: rpc error: code = Unknown desc = Engine("request region 26 is staler than local region,
// local epoch conf_ver: 5 version: 65, request epoch conf_ver: 5 version: 64, please rescan region later")
return true
default:
return false
Expand Down
29 changes: 29 additions & 0 deletions br/tests/lightning_max_random/run.sh
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,8 @@ cleanup() {

cleanup

export GO_FAILPOINTS="github.com/pingcap/tidb/br/pkg/lightning/backend/local/changeEpochVersion=1*return(-1)"

# auto_random_max = 2^{64-1-10}-1
# db.test contains key auto_random_max - 1
# db.test1 contains key auto_random_max
Expand All @@ -62,4 +64,31 @@ check_contains 'ERROR'
run_sql 'INSERT INTO db.test2(b) VALUES(33);'
run_sql 'INSERT INTO db.test2(b) VALUES(44);'
run_sql 'INSERT INTO db.test2(b) VALUES(55);'

grep 'RequestTooOld' "$TEST_DIR/lightning.log"
cleanup

export GO_FAILPOINTS="github.com/pingcap/tidb/br/pkg/lightning/backend/local/changeEpochVersion=1*return(10)"

# auto_random_max = 2^{64-1-10}-1
# db.test contains key auto_random_max - 1
# db.test1 contains key auto_random_max
# db.test2 contains key auto_random_max + 1 (overflow)
run_lightning --sorted-kv-dir "$TEST_DIR/sst" --config "tests/$TEST_NAME/config.toml" --log-file "$TEST_DIR/lightning.log"
check_result
# successfully insert: db.test auto_random key has not reached maximum
run_sql 'INSERT INTO db.test(b) VALUES(11);'
# fail for further insertion
run_sql 'INSERT INTO db.test(b) VALUES(22);' 2>&1 | tee -a "$TEST_DIR/sql_res.$TEST_NAME.txt"
check_contains 'ERROR'
# fail: db.test1 has key auto_random_max
run_sql 'INSERT INTO db.test1(b) VALUES(11);'
run_sql 'INSERT INTO db.test1(b) VALUES(22);' 2>&1 | tee -a "$TEST_DIR/sql_res.$TEST_NAME.txt"
check_contains 'ERROR'
# successfully insert for overflow key
run_sql 'INSERT INTO db.test2(b) VALUES(33);'
run_sql 'INSERT INTO db.test2(b) VALUES(44);'
run_sql 'INSERT INTO db.test2(b) VALUES(55);'

grep 'RequestTooNew' "$TEST_DIR/lightning.log"
cleanup

0 comments on commit cc91206

Please sign in to comment.