Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

store/tikv: make RegionStore private #25231

Merged
merged 2 commits into from
Jun 8, 2021
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
50 changes: 25 additions & 25 deletions store/tikv/region_cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -102,9 +102,9 @@ type Region struct {
// AccessIndex represent the index for accessIndex array
type AccessIndex int

// RegionStore represents region stores info
// regionStore represents region stores info
// it will be store as unsafe.Pointer and be load at once
type RegionStore struct {
type regionStore struct {
workTiKVIdx AccessIndex // point to current work peer in meta.Peers and work store in stores(same idx) for tikv peer
proxyTiKVIdx AccessIndex // point to the tikv peer that can forward requests to the leader. -1 means not using proxy
workTiFlashIdx int32 // point to current work peer in meta.Peers and work store in stores(same idx) for tiflash peer
Expand All @@ -113,12 +113,12 @@ type RegionStore struct {
accessIndex [NumAccessMode][]int // AccessMode => idx in stores
}

func (r *RegionStore) accessStore(mode AccessMode, idx AccessIndex) (int, *Store) {
func (r *regionStore) accessStore(mode AccessMode, idx AccessIndex) (int, *Store) {
sidx := r.accessIndex[mode][idx]
return sidx, r.stores[sidx]
}

func (r *RegionStore) getAccessIndex(mode AccessMode, store *Store) AccessIndex {
func (r *regionStore) getAccessIndex(mode AccessMode, store *Store) AccessIndex {
for index, sidx := range r.accessIndex[mode] {
if r.stores[sidx].storeID == store.storeID {
return AccessIndex(index)
Expand All @@ -127,14 +127,14 @@ func (r *RegionStore) getAccessIndex(mode AccessMode, store *Store) AccessIndex
return -1
}

func (r *RegionStore) accessStoreNum(mode AccessMode) int {
func (r *regionStore) accessStoreNum(mode AccessMode) int {
return len(r.accessIndex[mode])
}

// clone clones region store struct.
func (r *RegionStore) clone() *RegionStore {
func (r *regionStore) clone() *regionStore {
storeEpochs := make([]uint32, len(r.stores))
rs := &RegionStore{
rs := &regionStore{
workTiFlashIdx: r.workTiFlashIdx,
proxyTiKVIdx: r.proxyTiKVIdx,
workTiKVIdx: r.workTiKVIdx,
Expand All @@ -150,7 +150,7 @@ func (r *RegionStore) clone() *RegionStore {
}

// return next follower store's index
func (r *RegionStore) follower(seed uint32, op *storeSelectorOp) AccessIndex {
func (r *regionStore) follower(seed uint32, op *storeSelectorOp) AccessIndex {
l := uint32(r.accessStoreNum(TiKVOnly))
if l <= 1 {
return r.workTiKVIdx
Expand All @@ -171,7 +171,7 @@ func (r *RegionStore) follower(seed uint32, op *storeSelectorOp) AccessIndex {
}

// return next leader or follower store's index
func (r *RegionStore) kvPeer(seed uint32, op *storeSelectorOp) AccessIndex {
func (r *regionStore) kvPeer(seed uint32, op *storeSelectorOp) AccessIndex {
if op.leaderOnly {
return r.workTiKVIdx
}
Expand All @@ -191,7 +191,7 @@ func (r *RegionStore) kvPeer(seed uint32, op *storeSelectorOp) AccessIndex {
return candidates[seed%uint32(len(candidates))]
}

func (r *RegionStore) filterStoreCandidate(aidx AccessIndex, op *storeSelectorOp) bool {
func (r *regionStore) filterStoreCandidate(aidx AccessIndex, op *storeSelectorOp) bool {
_, s := r.accessStore(TiKVOnly, aidx)
// filter label unmatched store
return s.IsLabelsMatch(op.labels)
Expand All @@ -201,7 +201,7 @@ func (r *RegionStore) filterStoreCandidate(aidx AccessIndex, op *storeSelectorOp
func (r *Region) init(bo *Backoffer, c *RegionCache) error {
// region store pull used store from global store map
// to avoid acquire storeMu in later access.
rs := &RegionStore{
rs := &regionStore{
workTiKVIdx: 0,
proxyTiKVIdx: -1,
workTiFlashIdx: 0,
Expand Down Expand Up @@ -248,12 +248,12 @@ func (r *Region) init(bo *Backoffer, c *RegionCache) error {
return nil
}

func (r *Region) getStore() (store *RegionStore) {
store = (*RegionStore)(atomic.LoadPointer(&r.store))
func (r *Region) getStore() (store *regionStore) {
store = (*regionStore)(atomic.LoadPointer(&r.store))
return
}

func (r *Region) compareAndSwapStore(oldStore, newStore *RegionStore) bool {
func (r *Region) compareAndSwapStore(oldStore, newStore *regionStore) bool {
return atomic.CompareAndSwapPointer(&r.store, unsafe.Pointer(oldStore), unsafe.Pointer(newStore))
}

Expand Down Expand Up @@ -798,7 +798,7 @@ func (c *RegionCache) OnSendFailForTiFlash(bo *Backoffer, store *Store, region R
}
}

func (c *RegionCache) markRegionNeedBeRefill(s *Store, storeIdx int, rs *RegionStore) int {
func (c *RegionCache) markRegionNeedBeRefill(s *Store, storeIdx int, rs *regionStore) int {
incEpochStoreIdx := -1
// invalidate regions in store.
epoch := rs.storeEpochs[storeIdx]
Expand Down Expand Up @@ -1456,7 +1456,7 @@ func (c *RegionCache) getStoreAddr(bo *Backoffer, region *Region, store *Store)
}
}

func (c *RegionCache) getProxyStore(region *Region, store *Store, rs *RegionStore, workStoreIdx AccessIndex) (proxyStore *Store, proxyAccessIdx AccessIndex, proxyStoreIdx int) {
func (c *RegionCache) getProxyStore(region *Region, store *Store, rs *regionStore, workStoreIdx AccessIndex) (proxyStore *Store, proxyAccessIdx AccessIndex, proxyStoreIdx int) {
if !c.enableForwarding || store.storeType != tikvrpc.TiKV || atomic.LoadInt32(&store.needForwarding) == 0 {
return
}
Expand Down Expand Up @@ -1680,25 +1680,25 @@ func (r *Region) GetLeaderStoreID() uint64 {
return r.meta.Peers[storeIdx].StoreId
}

func (r *Region) getKvStorePeer(rs *RegionStore, aidx AccessIndex) (store *Store, peer *metapb.Peer, accessIdx AccessIndex, storeIdx int) {
func (r *Region) getKvStorePeer(rs *regionStore, aidx AccessIndex) (store *Store, peer *metapb.Peer, accessIdx AccessIndex, storeIdx int) {
storeIdx, store = rs.accessStore(TiKVOnly, aidx)
peer = r.meta.Peers[storeIdx]
accessIdx = aidx
return
}

// WorkStorePeer returns current work store with work peer.
func (r *Region) WorkStorePeer(rs *RegionStore) (store *Store, peer *metapb.Peer, accessIdx AccessIndex, storeIdx int) {
func (r *Region) WorkStorePeer(rs *regionStore) (store *Store, peer *metapb.Peer, accessIdx AccessIndex, storeIdx int) {
return r.getKvStorePeer(rs, rs.workTiKVIdx)
}

// FollowerStorePeer returns a follower store with follower peer.
func (r *Region) FollowerStorePeer(rs *RegionStore, followerStoreSeed uint32, op *storeSelectorOp) (store *Store, peer *metapb.Peer, accessIdx AccessIndex, storeIdx int) {
func (r *Region) FollowerStorePeer(rs *regionStore, followerStoreSeed uint32, op *storeSelectorOp) (store *Store, peer *metapb.Peer, accessIdx AccessIndex, storeIdx int) {
return r.getKvStorePeer(rs, rs.follower(followerStoreSeed, op))
}

// AnyStorePeer returns a leader or follower store with the associated peer.
func (r *Region) AnyStorePeer(rs *RegionStore, followerStoreSeed uint32, op *storeSelectorOp) (store *Store, peer *metapb.Peer, accessIdx AccessIndex, storeIdx int) {
func (r *Region) AnyStorePeer(rs *regionStore, followerStoreSeed uint32, op *storeSelectorOp) (store *Store, peer *metapb.Peer, accessIdx AccessIndex, storeIdx int) {
return r.getKvStorePeer(rs, rs.kvPeer(followerStoreSeed, op))
}

Expand Down Expand Up @@ -1785,14 +1785,14 @@ retry:
return
}

func (r *RegionStore) switchNextFlashPeer(rr *Region, currentPeerIdx AccessIndex) {
func (r *regionStore) switchNextFlashPeer(rr *Region, currentPeerIdx AccessIndex) {
nextIdx := (currentPeerIdx + 1) % AccessIndex(r.accessStoreNum(TiFlashOnly))
newRegionStore := r.clone()
newRegionStore.workTiFlashIdx = int32(nextIdx)
rr.compareAndSwapStore(r, newRegionStore)
}

func (r *RegionStore) switchNextTiKVPeer(rr *Region, currentPeerIdx AccessIndex) {
func (r *regionStore) switchNextTiKVPeer(rr *Region, currentPeerIdx AccessIndex) {
if r.workTiKVIdx != currentPeerIdx {
return
}
Expand All @@ -1805,7 +1805,7 @@ func (r *RegionStore) switchNextTiKVPeer(rr *Region, currentPeerIdx AccessIndex)
// switchNextProxyStore switches the index of the peer that will forward requests to the leader to the next peer.
// If proxy is currently not used on this region, the value of `currentProxyIdx` should be -1, and a random peer will
// be select in this case.
func (r *RegionStore) switchNextProxyStore(rr *Region, currentProxyIdx AccessIndex, incEpochStoreIdx int) {
func (r *regionStore) switchNextProxyStore(rr *Region, currentProxyIdx AccessIndex, incEpochStoreIdx int) {
if r.proxyTiKVIdx != currentProxyIdx {
return
}
Expand Down Expand Up @@ -1837,7 +1837,7 @@ func (r *RegionStore) switchNextProxyStore(rr *Region, currentProxyIdx AccessInd
rr.compareAndSwapStore(r, newRegionStore)
}

func (r *RegionStore) setProxyStoreIdx(rr *Region, idx AccessIndex) {
func (r *regionStore) setProxyStoreIdx(rr *Region, idx AccessIndex) {
if r.proxyTiKVIdx == idx {
return
}
Expand All @@ -1851,7 +1851,7 @@ func (r *RegionStore) setProxyStoreIdx(rr *Region, idx AccessIndex) {
zap.Bool("success", success))
}

func (r *RegionStore) unsetProxyStoreIfNeeded(rr *Region) {
func (r *regionStore) unsetProxyStoreIfNeeded(rr *Region) {
r.setProxyStoreIdx(rr, -1)
}

Expand Down