Skip to content
This repository has been archived by the owner on Jul 24, 2024. It is now read-only.

Commit

Permalink
backup/: retry for read index not ready and proposal in merging mode (#…
Browse files Browse the repository at this point in the history
…615) (#626)

Signed-off-by: ti-srebot <ti-srebot@pingcap.com>
  • Loading branch information
ti-srebot authored Nov 27, 2020
1 parent 1d3fb98 commit e35626c
Show file tree
Hide file tree
Showing 2 changed files with 50 additions and 6 deletions.
15 changes: 9 additions & 6 deletions pkg/backup/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -687,14 +687,15 @@ func (bc *Client) fineGrainedBackup(
}
}

func onBackupResponse(
// OnBackupResponse checks the backup resp, decides whether to retry and generate the error.
func OnBackupResponse(
storeID uint64,
bo *tikv.Backoffer,
backupTS uint64,
lockResolver *tikv.LockResolver,
resp *kvproto.BackupResponse,
) (*kvproto.BackupResponse, int, error) {
log.Debug("onBackupResponse", zap.Reflect("resp", resp))
log.Debug("OnBackupResponse", zap.Reflect("resp", resp))
if resp.Error == nil {
return resp, 0, nil
}
Expand All @@ -716,7 +717,7 @@ func onBackupResponse(
}
// Backup should not meet error other than KeyLocked.
log.Error("unexpect kv error", zap.Reflect("KvError", v.KvError))
return nil, backoffMs, errors.Annotatef(berrors.ErrKVUnknown, "storeID: %d onBackupResponse error %v", storeID, v)
return nil, backoffMs, errors.Annotatef(berrors.ErrKVUnknown, "storeID: %d OnBackupResponse error %v", storeID, v)

case *kvproto.Error_RegionError:
regionErr := v.RegionError
Expand All @@ -726,9 +727,11 @@ func onBackupResponse(
regionErr.RegionNotFound != nil ||
regionErr.ServerIsBusy != nil ||
regionErr.StaleCommand != nil ||
regionErr.StoreNotMatch != nil) {
regionErr.StoreNotMatch != nil ||
regionErr.ReadIndexNotReady != nil ||
regionErr.ProposalInMergingMode != nil) {
log.Error("unexpect region error", zap.Reflect("RegionError", regionErr))
return nil, backoffMs, errors.Annotatef(berrors.ErrKVUnknown, "storeID: %d onBackupResponse error %v", storeID, v)
return nil, backoffMs, errors.Annotatef(berrors.ErrKVUnknown, "storeID: %d OnBackupResponse error %v", storeID, v)
}
log.Warn("backup occur region error",
zap.Reflect("RegionError", regionErr),
Expand Down Expand Up @@ -787,7 +790,7 @@ func (bc *Client) handleFineGrained(
// Handle responses with the same backoffer.
func(resp *kvproto.BackupResponse) error {
response, backoffMs, err1 :=
onBackupResponse(storeID, bo, backupTS, lockResolver, resp)
OnBackupResponse(storeID, bo, backupTS, lockResolver, resp)
if err1 != nil {
return err1
}
Expand Down
41 changes: 41 additions & 0 deletions pkg/backup/client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,9 +9,12 @@ import (
"time"

. "github.com/pingcap/check"
kvproto "github.com/pingcap/kvproto/pkg/backup"
"github.com/pingcap/kvproto/pkg/errorpb"
"github.com/pingcap/parser/model"
"github.com/pingcap/tidb/kv"
"github.com/pingcap/tidb/store/mockstore/mocktikv"
"github.com/pingcap/tidb/store/tikv"
"github.com/pingcap/tidb/store/tikv/oracle"
"github.com/pingcap/tidb/tablecodec"
"github.com/pingcap/tidb/util/codec"
Expand Down Expand Up @@ -139,3 +142,41 @@ func (r *testBackup) TestBuildTableRange(c *C) {
{StartKey: tablecodec.EncodeRowKey(7, low), EndKey: tablecodec.EncodeRowKey(7, high)},
})
}

func (r *testBackup) TestOnBackupRegionErrorResponse(c *C) {
type Case struct {
storeID uint64
bo *tikv.Backoffer
backupTS uint64
lockResolver *tikv.LockResolver
resp *kvproto.BackupResponse
exceptedBackoffMs int
exceptedErr bool
}
newBackupRegionErrorResp := func(regionError *errorpb.Error) *kvproto.BackupResponse {
return &kvproto.BackupResponse{Error: &kvproto.Error{Detail: &kvproto.Error_RegionError{RegionError: regionError}}}
}

cases := []Case{
{storeID: 1, backupTS: 421123291611137, resp: newBackupRegionErrorResp(&errorpb.Error{NotLeader: &errorpb.NotLeader{}}), exceptedBackoffMs: 1000, exceptedErr: false},
{storeID: 1, backupTS: 421123291611137, resp: newBackupRegionErrorResp(&errorpb.Error{RegionNotFound: &errorpb.RegionNotFound{}}), exceptedBackoffMs: 1000, exceptedErr: false},
{storeID: 1, backupTS: 421123291611137, resp: newBackupRegionErrorResp(&errorpb.Error{KeyNotInRegion: &errorpb.KeyNotInRegion{}}), exceptedBackoffMs: 0, exceptedErr: true},
{storeID: 1, backupTS: 421123291611137, resp: newBackupRegionErrorResp(&errorpb.Error{EpochNotMatch: &errorpb.EpochNotMatch{}}), exceptedBackoffMs: 1000, exceptedErr: false},
{storeID: 1, backupTS: 421123291611137, resp: newBackupRegionErrorResp(&errorpb.Error{ServerIsBusy: &errorpb.ServerIsBusy{}}), exceptedBackoffMs: 1000, exceptedErr: false},
{storeID: 1, backupTS: 421123291611137, resp: newBackupRegionErrorResp(&errorpb.Error{StaleCommand: &errorpb.StaleCommand{}}), exceptedBackoffMs: 1000, exceptedErr: false},
{storeID: 1, backupTS: 421123291611137, resp: newBackupRegionErrorResp(&errorpb.Error{StoreNotMatch: &errorpb.StoreNotMatch{}}), exceptedBackoffMs: 1000, exceptedErr: false},
{storeID: 1, backupTS: 421123291611137, resp: newBackupRegionErrorResp(&errorpb.Error{RaftEntryTooLarge: &errorpb.RaftEntryTooLarge{}}), exceptedBackoffMs: 0, exceptedErr: true},
{storeID: 1, backupTS: 421123291611137, resp: newBackupRegionErrorResp(&errorpb.Error{ReadIndexNotReady: &errorpb.ReadIndexNotReady{}}), exceptedBackoffMs: 1000, exceptedErr: false},
{storeID: 1, backupTS: 421123291611137, resp: newBackupRegionErrorResp(&errorpb.Error{ProposalInMergingMode: &errorpb.ProposalInMergingMode{}}), exceptedBackoffMs: 1000, exceptedErr: false},
}
for _, cs := range cases {
c.Log(cs)
_, backoffMs, err := backup.OnBackupResponse(cs.storeID, cs.bo, cs.backupTS, cs.lockResolver, cs.resp)
c.Assert(backoffMs, Equals, cs.exceptedBackoffMs)
if cs.exceptedErr {
c.Assert(err, NotNil)
} else {
c.Assert(err, IsNil)
}
}
}

0 comments on commit e35626c

Please sign in to comment.