store/tikv: move region_* into package store/tikv/region #25205

Merged · 13 commits · Jun 10, 2021
4 changes: 2 additions & 2 deletions executor/executor_test.go
@@ -7205,12 +7205,12 @@ func (s *testSerialSuite1) TestCollectCopRuntimeStats(c *C) {
tk.MustExec("drop table if exists t1")
tk.MustExec("create table t1 (a int, b int)")
tk.MustExec("set tidb_enable_collect_execution_info=1;")
c.Assert(failpoint.Enable("github.com/pingcap/tidb/store/tikv/tikvStoreRespResult", `return(true)`), IsNil)
c.Assert(failpoint.Enable("github.com/pingcap/tidb/store/tikv/region/tikvStoreRespResult", `return(true)`), IsNil)
rows := tk.MustQuery("explain analyze select * from t1").Rows()
c.Assert(len(rows), Equals, 2)
explain := fmt.Sprintf("%v", rows[0])
c.Assert(explain, Matches, ".*rpc_num: 2, .*regionMiss:.*")
c.Assert(failpoint.Disable("github.com/pingcap/tidb/store/tikv/tikvStoreRespResult"), IsNil)
c.Assert(failpoint.Disable("github.com/pingcap/tidb/store/tikv/region/tikvStoreRespResult"), IsNil)
}

func (s *testSerialSuite1) TestIndexLookupRuntimeStats(c *C) {
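The failpoint path in this test changes because pingcap/failpoint derives a failpoint's full name from the import path of the package that declares it: moving the injection site into store/tikv/region moves the name under that prefix. A minimal sketch of such an injection site, assuming a hypothetical helper in the new package (the real call lives in the moved region-request code):

package region

import "github.com/pingcap/failpoint"

// Hypothetical injection site, for illustration only. failpoint-ctl rewrites
// Inject into Eval(_curpkg_("tikvStoreRespResult")), so the full name a test
// must enable is "github.com/pingcap/tidb/store/tikv/region/tikvStoreRespResult".
func respResultInjected() (injected bool) {
	failpoint.Inject("tikvStoreRespResult", func(val failpoint.Value) {
		if v, ok := val.(bool); ok && v {
			injected = true // pretend TiKV answered with the canned response
		}
	})
	return
}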
9 changes: 5 additions & 4 deletions store/tikv/2pc.go
@@ -36,6 +36,7 @@ import (
"github.com/pingcap/tidb/store/tikv/logutil"
"github.com/pingcap/tidb/store/tikv/metrics"
"github.com/pingcap/tidb/store/tikv/oracle"
"github.com/pingcap/tidb/store/tikv/region"
"github.com/pingcap/tidb/store/tikv/retry"
"github.com/pingcap/tidb/store/tikv/tikvrpc"
"github.com/pingcap/tidb/store/tikv/unionstore"
@@ -589,7 +590,7 @@ func (c *twoPhaseCommitter) doActionOnGroupMutations(bo *Backoffer, action twoPh
// Do not update regionTxnSize on retries. They are not used when building a PrewriteRequest.
if !act.retry {
for _, group := range groups {
- c.regionTxnSize[group.region.id] = group.mutations.Len()
+ c.regionTxnSize[group.region.GetID()] = group.mutations.Len()
}
}
sizeFunc = c.keyValueSize
@@ -859,7 +860,7 @@ func sendTxnHeartBeat(bo *Backoffer, store *KVStore, primary []byte, startTS, tt
// For other region error and the fake region error, backoff because
// there's something wrong.
// For the real EpochNotMatch error, don't backoff.
- if regionErr.GetEpochNotMatch() == nil || isFakeRegionError(regionErr) {
+ if regionErr.GetEpochNotMatch() == nil || region.IsFakeRegionError(regionErr) {
err = bo.Backoff(retry.BoRegionMiss, errors.New(regionErr.String()))
if err != nil {
return 0, false, errors.Trace(err)
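The unexported isFakeRegionError helper becomes the exported region.IsFakeRegionError so callers outside the new package can still tell the client-fabricated region error (used purely to force a retry) apart from a genuine EpochNotMatch. A sketch of what the check presumably looks like, assuming the fake error is an EpochNotMatch carrying no current regions; this is an assumption, not code copied from the PR:

package region

import "github.com/pingcap/kvproto/pkg/errorpb"

// IsFakeRegionError reports whether a region error was fabricated by the client
// rather than returned by TiKV. Assumed marker: EpochNotMatch with an empty
// CurrentRegions list.
func IsFakeRegionError(err *errorpb.Error) bool {
	return err != nil && err.GetEpochNotMatch() != nil && len(err.GetEpochNotMatch().CurrentRegions) == 0
}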
@@ -1539,7 +1540,7 @@ func (c *twoPhaseCommitter) shouldWriteBinlog() bool {
const txnCommitBatchSize = 16 * 1024

type batchMutations struct {
- region RegionVerID
+ region region.VerID
mutations CommitterMutations
isPrimary bool
}
@@ -1572,7 +1573,7 @@ func newBatched(primaryKey []byte) *batched {

// appendBatchMutationsBySize appends mutations to b. It may split the keys to make
// sure each batch's size does not exceed the limit.
- func (b *batched) appendBatchMutationsBySize(region RegionVerID, mutations CommitterMutations, sizeFn func(k, v []byte) int, limit int) {
+ func (b *batched) appendBatchMutationsBySize(region region.VerID, mutations CommitterMutations, sizeFn func(k, v []byte) int, limit int) {
failpoint.Inject("twoPCRequestBatchSizeLimit", func() {
limit = 1
})
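appendBatchMutationsBySize now takes a region.VerID, but its job is unchanged: split the mutations that fell into one region into batches whose accumulated key/value size stays under txnCommitBatchSize (16 KiB), a limit the twoPCRequestBatchSizeLimit failpoint can shrink to 1 for tests. A simplified, self-contained sketch of that splitting idea (not the PR's implementation; mutation and splitBySize are stand-ins):

package main

import "fmt"

type mutation struct{ key, value []byte }

// splitBySize greedily packs mutations into batches, starting a new batch as
// soon as adding the next mutation would push the current one past the limit.
// A single oversized mutation still gets a batch of its own.
func splitBySize(muts []mutation, sizeFn func(k, v []byte) int, limit int) [][]mutation {
	var batches [][]mutation
	var cur []mutation
	size := 0
	for _, m := range muts {
		s := sizeFn(m.key, m.value)
		if len(cur) > 0 && size+s > limit {
			batches = append(batches, cur)
			cur, size = nil, 0
		}
		cur = append(cur, m)
		size += s
	}
	if len(cur) > 0 {
		batches = append(batches, cur)
	}
	return batches
}

func main() {
	muts := []mutation{{[]byte("a"), []byte("1")}, {[]byte("b"), []byte("22")}, {[]byte("c"), []byte("333")}}
	batches := splitBySize(muts, func(k, v []byte) int { return len(k) + len(v) }, 4)
	fmt.Println(len(batches)) // 3
}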
11 changes: 6 additions & 5 deletions store/tikv/client_helper.go
@@ -17,6 +17,7 @@ import (
"time"

"github.com/pingcap/failpoint"
"github.com/pingcap/tidb/store/tikv/region"
"github.com/pingcap/tidb/store/tikv/tikvrpc"
"github.com/pingcap/tidb/store/tikv/util"
)
@@ -30,11 +31,11 @@ import (
// meet the secondary lock again and run into a deadloop.
type ClientHelper struct {
lockResolver *LockResolver
- regionCache *RegionCache
+ regionCache *region.Cache
resolvedLocks *util.TSSet
client Client
resolveLite bool
- RegionRequestRuntimeStats
+ region.RequestRuntimeStats
}

// NewClientHelper creates a helper instance.
@@ -54,7 +55,7 @@ func (ch *ClientHelper) ResolveLocks(bo *Backoffer, callerStartTS uint64, locks
var msBeforeTxnExpired int64
if ch.Stats != nil {
defer func(start time.Time) {
- RecordRegionRequestRuntimeStats(ch.Stats, tikvrpc.CmdResolveLock, time.Since(start))
+ region.RecordRegionRequestRuntimeStats(ch.Stats, tikvrpc.CmdResolveLock, time.Since(start))
}(time.Now())
}
if ch.resolveLite {
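ClientHelper now embeds region.RequestRuntimeStats, and the deferred call above records how long the CmdResolveLock round trip took. A sketch of the aggregation such a recorder presumably performs; the struct and field names here are assumptions, not the package's actual definitions:

package region

import (
	"time"

	"github.com/pingcap/tidb/store/tikv/tikvrpc"
)

// rpcRuntimeStats is an assumed shape: a call counter plus accumulated latency
// per RPC command type.
type rpcRuntimeStats struct {
	Count   int64
	Consume int64 // total elapsed time in nanoseconds
}

// recordRuntimeStats sketches what RecordRegionRequestRuntimeStats likely does:
// bump the per-command counter and add the observed duration.
func recordRuntimeStats(stats map[tikvrpc.CmdType]*rpcRuntimeStats, cmd tikvrpc.CmdType, d time.Duration) {
	s, ok := stats[cmd]
	if !ok {
		s = &rpcRuntimeStats{}
		stats[cmd] = s
	}
	s.Count++
	s.Consume += int64(d)
}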
@@ -73,8 +74,8 @@ func (ch *ClientHelper) ResolveLocks(bo *Backoffer, callerStartTS uint64, locks
}

// SendReqCtx wraps the SendReqCtx function and use the resolved lock result in the kvrpcpb.Context.
- func (ch *ClientHelper) SendReqCtx(bo *Backoffer, req *tikvrpc.Request, regionID RegionVerID, timeout time.Duration, et tikvrpc.EndpointType, directStoreAddr string, opts ...StoreSelectorOption) (*tikvrpc.Response, *RPCContext, string, error) {
- sender := NewRegionRequestSender(ch.regionCache, ch.client)
+ func (ch *ClientHelper) SendReqCtx(bo *Backoffer, req *tikvrpc.Request, regionID region.VerID, timeout time.Duration, et tikvrpc.EndpointType, directStoreAddr string, opts ...region.StoreSelectorOption) (*tikvrpc.Response, *region.RPCContext, string, error) {
+ sender := region.NewRegionRequestSender(ch.regionCache, ch.client)
if len(directStoreAddr) > 0 {
sender.SetStoreAddr(directStoreAddr)
}
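After the move, SendReqCtx speaks entirely in the new package's exported types: region.VerID for the target region, region.StoreSelectorOption for replica selection, and *region.RPCContext describing where the request was actually routed. A hedged sketch of a caller, matching the signature in the diff; the LocateKey call and the loc.Region field are assumed to keep their old RegionCache shapes in the new package:

package caller // hypothetical

import (
	"time"

	"github.com/pingcap/kvproto/pkg/kvrpcpb"
	"github.com/pingcap/tidb/store/tikv"
	"github.com/pingcap/tidb/store/tikv/region"
	"github.com/pingcap/tidb/store/tikv/tikvrpc"
)

// getThroughHelper routes a point-get through ClientHelper so that resolved
// locks are attached to the request context.
func getThroughHelper(bo *tikv.Backoffer, helper *tikv.ClientHelper, cache *region.Cache, key []byte, ts uint64) (*tikvrpc.Response, error) {
	loc, err := cache.LocateKey(bo, key) // assumed to mirror the old RegionCache.LocateKey
	if err != nil {
		return nil, err
	}
	req := tikvrpc.NewRequest(tikvrpc.CmdGet, &kvrpcpb.GetRequest{Key: key, Version: ts})
	resp, _, _, err := helper.SendReqCtx(bo, req, loc.Region, time.Second, tikvrpc.TiKV, "")
	return resp, err
}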
7 changes: 4 additions & 3 deletions store/tikv/commit.go
@@ -24,6 +24,7 @@ import (
tikverr "github.com/pingcap/tidb/store/tikv/error"
"github.com/pingcap/tidb/store/tikv/logutil"
"github.com/pingcap/tidb/store/tikv/metrics"
"github.com/pingcap/tidb/store/tikv/region"
"github.com/pingcap/tidb/store/tikv/retry"
"github.com/pingcap/tidb/store/tikv/tikvrpc"
"github.com/prometheus/client_golang/prometheus"
@@ -67,8 +68,8 @@ func (actionCommit) handleSingleBatch(c *twoPhaseCommitter, bo *Backoffer, batch
// Under this circumstance, we can not declare the commit is complete (may lead to data lost), nor can we throw
// an error (may lead to the duplicated key error when upper level restarts the transaction). Currently the best
// solution is to populate this error and let upper layer drop the connection to the corresponding mysql client.
- if batch.isPrimary && sender.rpcError != nil && !c.isAsyncCommit() {
- c.setUndeterminedErr(errors.Trace(sender.rpcError))
+ if batch.isPrimary && sender.GetRPCError() != nil && !c.isAsyncCommit() {
+ c.setUndeterminedErr(errors.Trace(sender.GetRPCError()))
}

// Unexpected error occurs, return it.
@@ -84,7 +85,7 @@ func (actionCommit) handleSingleBatch(c *twoPhaseCommitter, bo *Backoffer, batch
// For other region error and the fake region error, backoff because
// there's something wrong.
// For the real EpochNotMatch error, don't backoff.
- if regionErr.GetEpochNotMatch() == nil || isFakeRegionError(regionErr) {
+ if regionErr.GetEpochNotMatch() == nil || region.IsFakeRegionError(regionErr) {
err = bo.Backoff(retry.BoRegionMiss, errors.New(regionErr.String()))
if err != nil {
return errors.Trace(err)
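commit.go (and prewrite.go below) can no longer read sender.rpcError directly, because that field is unexported in the new package; the diff switches to a getter. A plausible shape for that accessor in store/tikv/region, assumed rather than copied from the PR:

package region

// GetRPCError returns the RPC-level error the sender observed while retrying
// the request across peers, if any.
func (s *RegionRequestSender) GetRPCError() error {
	return s.rpcError
}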
5 changes: 3 additions & 2 deletions store/tikv/interface.go
@@ -17,16 +17,17 @@ import (
"time"

"github.com/pingcap/tidb/store/tikv/oracle"
"github.com/pingcap/tidb/store/tikv/region"
"github.com/pingcap/tidb/store/tikv/tikvrpc"
)

// Storage represent the kv.Storage runs on TiKV.
type Storage interface {
// GetRegionCache gets the RegionCache.
- GetRegionCache() *RegionCache
+ GetRegionCache() *region.Cache

// SendReq sends a request to TiKV.
- SendReq(bo *Backoffer, req *tikvrpc.Request, regionID RegionVerID, timeout time.Duration) (*tikvrpc.Response, error)
+ SendReq(bo *Backoffer, req *tikvrpc.Request, regionID region.VerID, timeout time.Duration) (*tikvrpc.Response, error)

// GetLockResolver gets the LockResolver.
GetLockResolver() *LockResolver
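Storage now exposes *region.Cache and region.VerID in its method set, so every implementation (notably KVStore in kv.go below) has to change in lockstep. One idiomatic way to catch drift early is a compile-time assertion in package tikv; a sketch, which may or may not exist in the actual tree:

// Fails the build if KVStore stops satisfying Storage after a signature change,
// instead of surfacing the mismatch at a distant call site.
var _ Storage = (*KVStore)(nil)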
21 changes: 11 additions & 10 deletions store/tikv/kv.go
@@ -37,6 +37,7 @@ import (
"github.com/pingcap/tidb/store/tikv/metrics"
"github.com/pingcap/tidb/store/tikv/oracle"
"github.com/pingcap/tidb/store/tikv/oracle/oracles"
"github.com/pingcap/tidb/store/tikv/region"
"github.com/pingcap/tidb/store/tikv/retry"
"github.com/pingcap/tidb/store/tikv/tikvrpc"
pd "github.com/tikv/pd/client"
@@ -76,7 +77,7 @@ type KVStore struct {
client Client
}
pdClient pd.Client
- regionCache *RegionCache
+ regionCache *region.Cache
lockResolver *LockResolver
txnLatches *latch.LatchesScheduler

@@ -139,7 +140,7 @@ func NewKVStore(uuid string, pdClient pd.Client, spkv SafePointKV, tikvclient Cl
uuid: uuid,
oracle: o,
pdClient: pdClient,
- regionCache: NewRegionCache(pdClient),
+ regionCache: region.NewRegionCache(pdClient),
kv: spkv,
safePoint: 0,
spTime: time.Now(),
@@ -289,13 +290,13 @@ func (s *KVStore) SupportDeleteRange() (supported bool) {
}

// SendReq sends a request to region.
- func (s *KVStore) SendReq(bo *Backoffer, req *tikvrpc.Request, regionID RegionVerID, timeout time.Duration) (*tikvrpc.Response, error) {
- sender := NewRegionRequestSender(s.regionCache, s.GetTiKVClient())
+ func (s *KVStore) SendReq(bo *Backoffer, req *tikvrpc.Request, regionID region.VerID, timeout time.Duration) (*tikvrpc.Response, error) {
+ sender := region.NewRegionRequestSender(s.regionCache, s.GetTiKVClient())
return sender.SendReq(bo, req, regionID, timeout)
}

// GetRegionCache returns the region cache instance.
- func (s *KVStore) GetRegionCache() *RegionCache {
+ func (s *KVStore) GetRegionCache() *region.Cache {
return s.regionCache
}

@@ -335,7 +336,7 @@ func (s *KVStore) GetTiKVClient() (client Client) {

// GetMinSafeTS return the minimal safeTS of the storage with given txnScope.
func (s *KVStore) GetMinSafeTS(txnScope string) uint64 {
- stores := make([]*Store, 0)
+ stores := make([]*region.Store, 0)
allStores := s.regionCache.GetStoresByType(tikvrpc.TiKV)
if txnScope != oracle.GlobalTxnScope {
for _, store := range allStores {
@@ -367,7 +368,7 @@ func (s *KVStore) setSafeTS(storeID, safeTS uint64) {
s.safeTSMap.Store(storeID, safeTS)
}

- func (s *KVStore) getMinSafeTSByStores(stores []*Store) uint64 {
+ func (s *KVStore) getMinSafeTSByStores(stores []*region.Store) uint64 {
failpoint.Inject("injectSafeTS", func(val failpoint.Value) {
injectTS := val.(int)
failpoint.Return(uint64(injectTS))
@@ -378,7 +379,7 @@ func (s *KVStore) getMinSafeTSByStores(stores []*Store) uint64 {
return 0
}
for _, store := range stores {
- safeTS := s.getSafeTS(store.storeID)
+ safeTS := s.getSafeTS(store.StoreID())
if safeTS < minSafeTS {
minSafeTS = safeTS
}
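getMinSafeTSByStores keeps its injectSafeTS failpoint, and because kv.go itself did not move, the failpoint's full name stays under the old store/tikv prefix (contrast with the executor test at the top of this diff). An illustrative fragment, in the same gocheck style the surrounding tests use (c is the *check.C of an existing test method):

// Force getMinSafeTSByStores to return a fixed timestamp, then clean up.
c.Assert(failpoint.Enable("github.com/pingcap/tidb/store/tikv/injectSafeTS", `return(300)`), IsNil)
defer func() {
	c.Assert(failpoint.Disable("github.com/pingcap/tidb/store/tikv/injectSafeTS"), IsNil)
}()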
@@ -407,8 +408,8 @@ func (s *KVStore) updateSafeTS(ctx context.Context) {
wg := &sync.WaitGroup{}
wg.Add(len(stores))
for _, store := range stores {
- storeID := store.storeID
- storeAddr := store.addr
+ storeID := store.StoreID()
+ storeAddr := store.GetAddr()
go func(ctx context.Context, wg *sync.WaitGroup, storeID uint64, storeAddr string) {
defer wg.Done()
resp, err := tikvClient.SendRequest(ctx, storeAddr, tikvrpc.NewRequest(tikvrpc.CmdStoreSafeTS, &kvrpcpb.StoreSafeTSRequest{KeyRange: &kvrpcpb.KeyRange{
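updateSafeTS and getMinSafeTSByStores now go through exported accessors, since storeID and addr are unexported fields of region.Store when viewed from package tikv. Plausible shapes for those getters, assumed rather than taken from the PR:

package region

// StoreID returns the store's ID as reported by PD.
func (s *Store) StoreID() uint64 { return s.storeID }

// GetAddr returns the address the client dials for this store.
func (s *Store) GetAddr() string { return s.addr }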
17 changes: 9 additions & 8 deletions store/tikv/lock_resolver.go
@@ -32,6 +32,7 @@ import (
"github.com/pingcap/tidb/store/tikv/logutil"
"github.com/pingcap/tidb/store/tikv/metrics"
"github.com/pingcap/tidb/store/tikv/oracle"
"github.com/pingcap/tidb/store/tikv/region"
"github.com/pingcap/tidb/store/tikv/retry"
"github.com/pingcap/tidb/store/tikv/tikvrpc"
"github.com/pingcap/tidb/store/tikv/util"
@@ -96,7 +97,7 @@ func NewLockResolver(etcdAddrs []string, security config.Security, opts ...pd.Cl
return nil, errors.Trace(err)
}

s, err := NewKVStore(uuid, &CodecPDClient{pdCli}, spkv, client.NewRPCClient(security))
s, err := NewKVStore(uuid, region.NewCodeCPDClient(pdCli), spkv, client.NewRPCClient(security))
if err != nil {
return nil, errors.Trace(err)
}
@@ -219,7 +220,7 @@ func (lr *LockResolver) getResolved(txnID uint64) (TxnStatus, bool) {

// BatchResolveLocks resolve locks in a batch.
// Used it in gcworker only!
- func (lr *LockResolver) BatchResolveLocks(bo *Backoffer, locks []*Lock, loc RegionVerID) (bool, error) {
+ func (lr *LockResolver) BatchResolveLocks(bo *Backoffer, locks []*Lock, loc region.VerID) (bool, error) {
if len(locks) == 0 {
return true, nil
}
@@ -351,7 +352,7 @@ func (lr *LockResolver) resolveLocks(bo *Backoffer, callerStartTS uint64, locks
var pushFail bool
// TxnID -> []Region, record resolved Regions.
// TODO: Maybe put it in LockResolver and share by all txns.
- cleanTxns := make(map[uint64]map[RegionVerID]struct{})
+ cleanTxns := make(map[uint64]map[region.VerID]struct{})
var pushed []uint64
// pushed is only used in the read operation.
if !forWrite {
@@ -370,7 +371,7 @@
// If the lock is committed or rollbacked, resolve lock.
cleanRegions, exists := cleanTxns[l.TxnID]
if !exists {
- cleanRegions = make(map[RegionVerID]struct{})
+ cleanRegions = make(map[region.VerID]struct{})
cleanTxns[l.TxnID] = cleanRegions
}
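cleanTxns keys its inner map by region.VerID, which only works because the type is a small comparable value (no slices, maps, or pointers that would break ==). A sketch of the shape such a type presumably has, mirroring the old RegionVerID; the field names are assumptions:

package region

// VerID identifies a region at a specific epoch: its ID plus the conf-change
// and split/merge version numbers. As a plain value type it is comparable and
// can be used as a map key, as cleanTxns and cleanRegions do.
type VerID struct {
	id      uint64
	confVer uint64
	ver     uint64
}

// GetID returns the numeric region ID, as used by c.regionTxnSize[batch.region.GetID()].
func (r VerID) GetID() uint64 { return r.id }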

@@ -716,7 +717,7 @@ func (data *asyncResolveData) addKeys(locks []*kvrpcpb.LockInfo, expected int, s
return nil
}

- func (lr *LockResolver) checkSecondaries(bo *Backoffer, txnID uint64, curKeys [][]byte, curRegionID RegionVerID, shared *asyncResolveData) error {
+ func (lr *LockResolver) checkSecondaries(bo *Backoffer, txnID uint64, curKeys [][]byte, curRegionID region.VerID, shared *asyncResolveData) error {
checkReq := &kvrpcpb.CheckSecondaryLocksRequest{
Keys: curKeys,
StartVersion: txnID,
@@ -843,7 +844,7 @@ func (lr *LockResolver) checkAllSecondaries(bo *Backoffer, l *Lock, status *TxnS
}

// resolveRegionLocks is essentially the same as resolveLock, but we resolve all keys in the same region at the same time.
- func (lr *LockResolver) resolveRegionLocks(bo *Backoffer, l *Lock, region RegionVerID, keys [][]byte, status TxnStatus) error {
+ func (lr *LockResolver) resolveRegionLocks(bo *Backoffer, l *Lock, region region.VerID, keys [][]byte, status TxnStatus) error {
lreq := &kvrpcpb.ResolveLockRequest{
StartVersion: l.TxnID,
}
@@ -895,7 +896,7 @@ func (lr *LockResolver) resolveRegionLocks(bo *Backoffer, l *Lock, region Region
return nil
}

- func (lr *LockResolver) resolveLock(bo *Backoffer, l *Lock, status TxnStatus, lite bool, cleanRegions map[RegionVerID]struct{}) error {
+ func (lr *LockResolver) resolveLock(bo *Backoffer, l *Lock, status TxnStatus, lite bool, cleanRegions map[region.VerID]struct{}) error {
metrics.LockResolverCountWithResolveLocks.Inc()
resolveLite := lite || l.TxnSize < bigTxnThreshold
for {
@@ -953,7 +954,7 @@ func (lr *LockResolver) resolveLock(bo *Backoffer, l *Lock, status TxnStatus, li
}
}

- func (lr *LockResolver) resolvePessimisticLock(bo *Backoffer, l *Lock, cleanRegions map[RegionVerID]struct{}) error {
+ func (lr *LockResolver) resolvePessimisticLock(bo *Backoffer, l *Lock, cleanRegions map[region.VerID]struct{}) error {
metrics.LockResolverCountWithResolveLocks.Inc()
for {
loc, err := lr.store.GetRegionCache().LocateKey(bo, l.Key)
3 changes: 2 additions & 1 deletion store/tikv/pessimistic.go
@@ -28,6 +28,7 @@ import (
"github.com/pingcap/tidb/store/tikv/kv"
"github.com/pingcap/tidb/store/tikv/logutil"
"github.com/pingcap/tidb/store/tikv/metrics"
"github.com/pingcap/tidb/store/tikv/region"
"github.com/pingcap/tidb/store/tikv/retry"
"github.com/pingcap/tidb/store/tikv/tikvrpc"
"github.com/prometheus/client_golang/prometheus"
@@ -135,7 +136,7 @@ func (action actionPessimisticLock) handleSingleBatch(c *twoPhaseCommitter, bo *
// For other region error and the fake region error, backoff because
// there's something wrong.
// For the real EpochNotMatch error, don't backoff.
- if regionErr.GetEpochNotMatch() == nil || isFakeRegionError(regionErr) {
+ if regionErr.GetEpochNotMatch() == nil || region.IsFakeRegionError(regionErr) {
err = bo.Backoff(retry.BoRegionMiss, errors.New(regionErr.String()))
if err != nil {
return errors.Trace(err)
9 changes: 5 additions & 4 deletions store/tikv/prewrite.go
@@ -28,6 +28,7 @@ import (
tikverr "github.com/pingcap/tidb/store/tikv/error"
"github.com/pingcap/tidb/store/tikv/logutil"
"github.com/pingcap/tidb/store/tikv/metrics"
"github.com/pingcap/tidb/store/tikv/region"
"github.com/pingcap/tidb/store/tikv/retry"
"github.com/pingcap/tidb/store/tikv/tikvrpc"
"github.com/prometheus/client_golang/prometheus"
@@ -149,7 +150,7 @@ func (action actionPrewrite) handleSingleBatch(c *twoPhaseCommitter, bo *Backoff
}
}

- txnSize := uint64(c.regionTxnSize[batch.region.id])
+ txnSize := uint64(c.regionTxnSize[batch.region.GetID()])
// When we retry because of a region miss, we don't know the transaction size. We set the transaction size here
// to MaxUint64 to avoid unexpected "resolve lock lite".
if action.retry {
@@ -167,8 +168,8 @@
// transaction has been successfully committed.
// If prewrite has been cancelled, all ongoing prewrite RPCs will become errors, we needn't set undetermined
// errors.
if (c.isAsyncCommit() || c.isOnePC()) && sender.rpcError != nil && atomic.LoadUint32(&c.prewriteCancelled) == 0 {
c.setUndeterminedErr(errors.Trace(sender.rpcError))
if (c.isAsyncCommit() || c.isOnePC()) && sender.GetRPCError() != nil && atomic.LoadUint32(&c.prewriteCancelled) == 0 {
c.setUndeterminedErr(errors.Trace(sender.GetRPCError()))
}
}
}()
@@ -193,7 +194,7 @@ func (action actionPrewrite) handleSingleBatch(c *twoPhaseCommitter, bo *Backoff
// For other region error and the fake region error, backoff because
// there's something wrong.
// For the real EpochNotMatch error, don't backoff.
- if regionErr.GetEpochNotMatch() == nil || isFakeRegionError(regionErr) {
+ if regionErr.GetEpochNotMatch() == nil || region.IsFakeRegionError(regionErr) {
err = bo.Backoff(retry.BoRegionMiss, errors.New(regionErr.String()))
if err != nil {
return errors.Trace(err)