store/tikv: remove Peer (#1187)
* store/tikv: remove Peer
*: update deps
disksing committed May 3, 2016
1 parent 74742ce commit f673bc3
Showing 11 changed files with 355 additions and 443 deletions.
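
Before the per-file hunks, a condensed sketch of the shape this change leaves behind: Region no longer carries a *metapb.Peer, it only remembers which entry of meta.StoreIds is currently treated as the leader. The sketch below is self-contained and assembled from the region_cache.go hunks further down; regionMeta is a local stand-in for metapb.Region, so this is an illustration of the new behaviour, not a verbatim copy of the file.

package main

import (
	"errors"
	"fmt"
)

// regionMeta stands in for metapb.Region: after this commit the meta exposes
// plain store IDs (StoreIds) instead of full *metapb.Peer entries.
type regionMeta struct {
	id       uint64
	storeIDs []uint64
}

// region mirrors the new tikv.Region layout from the diff: no peer field,
// just the address and index of the store currently used as leader.
type region struct {
	meta        *regionMeta
	addr        string
	curStoreIdx int
}

// nextStore mirrors Region.NextStore: advance to the next store ID, or report
// that the list is exhausted (the cache then drops and reloads the region).
func (r *region) nextStore() (uint64, error) {
	next := r.curStoreIdx + 1
	if next >= len(r.meta.storeIDs) {
		return 0, errors.New("out of range of store")
	}
	return r.meta.storeIDs[next], nil
}

func main() {
	r := &region{meta: &regionMeta{id: 3, storeIDs: []uint64{1, 2, 3}}}
	for {
		id, err := r.nextStore()
		if err != nil {
			fmt.Println("stores exhausted, drop region and reload from PD")
			return
		}
		fmt.Println("try store", id)
		r.curStoreIdx++ // in the real cache this advance happens via UpdateLeader
	}
}
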
6 changes: 3 additions & 3 deletions store/tikv/coprocessor.go
@@ -167,17 +167,17 @@ func (it *copIterator) Next() (io.ReadCloser, error) {
 		}
 		resp, err := client.SendCopReq(req)
 		if err != nil {
-			it.store.regionCache.NextPeer(task.region.GetID())
+			it.store.regionCache.NextStore(task.region.GetID())
 			err = it.rebuildCurrentTask()
 			if err != nil {
 				return nil, errors.Trace(err)
 			}
-			log.Warnf("send coprocessor request error: %v, try next peer later", err)
+			log.Warnf("send coprocessor request error: %v, try next store later", err)
 			continue
 		}
 		if e := resp.GetRegionError(); e != nil {
 			if notLeader := e.GetNotLeader(); notLeader != nil {
-				it.store.regionCache.UpdateLeader(notLeader.GetRegionId(), notLeader.GetLeader().GetId())
+				it.store.regionCache.UpdateLeader(notLeader.GetRegionId(), notLeader.GetLeaderStoreId())
 			} else {
 				it.store.regionCache.DropRegion(task.region.GetID())
 			}
6 changes: 3 additions & 3 deletions store/tikv/kv.go
@@ -183,15 +183,15 @@ func (s *tikvStore) SendKVReq(req *pb.Request, region *requestRegion) (*pb.Respo
 		req.Context = region.GetContext()
 		resp, err := client.SendKVReq(req)
 		if err != nil {
-			log.Warnf("send tikv request error: %v, try next peer later", err)
-			s.regionCache.NextPeer(region.GetID())
+			log.Warnf("send tikv request error: %v, try next store later", err)
+			s.regionCache.NextStore(region.GetID())
 			continue
 		}
 		if regionErr := resp.GetRegionError(); regionErr != nil {
 			// Retry if error is `NotLeader`.
 			if notLeader := regionErr.GetNotLeader(); notLeader != nil {
 				log.Warnf("tikv reports `NotLeader`: %s, retry later", notLeader.String())
-				s.regionCache.UpdateLeader(notLeader.GetRegionId(), notLeader.GetLeader().GetId())
+				s.regionCache.UpdateLeader(notLeader.GetRegionId(), notLeader.GetLeaderStoreId())
 				continue
 			}
 			// For other errors, we only drop cache here.
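
The coprocessor.go and kv.go hunks above apply the same retry policy against the renamed cache API. A minimal self-contained sketch of that policy, with stub types standing in for the real pb and RegionCache types (those are defined in kvproto and in the region_cache.go hunks below, not reproduced here):

package main

import "fmt"

// regionCache is a stub exposing the three operations the retry loops above use.
type regionCache struct{}

func (c *regionCache) NextStore(regionID uint64) {
	fmt.Println("try next store for region", regionID)
}

func (c *regionCache) UpdateLeader(regionID, leaderStoreID uint64) {
	fmt.Println("leader of region", regionID, "is now store", leaderStoreID)
}

func (c *regionCache) DropRegion(regionID uint64) {
	fmt.Println("drop region", regionID)
}

// response stands in for the relevant parts of pb.Response.
type response struct {
	notLeaderStoreID *uint64 // set when tikv reports NotLeader, carrying GetLeaderStoreId()
	otherRegionErr   bool    // any other region error
}

// handleSendResult condenses the decision made in the two hunks above:
// transport error -> advance to the next store; NotLeader -> follow the
// reported leader store ID; any other region error -> drop the cached region.
// It returns whether the caller should retry the request.
func handleSendResult(c *regionCache, regionID uint64, resp *response, sendErr error) bool {
	if sendErr != nil {
		c.NextStore(regionID)
		return true
	}
	if resp.notLeaderStoreID != nil {
		c.UpdateLeader(regionID, *resp.notLeaderStoreID)
		return true
	}
	if resp.otherRegionErr {
		c.DropRegion(regionID)
		return true
	}
	return false
}

func main() {
	c := &regionCache{}
	leader := uint64(2)
	retry := handleSendResult(c, 3, &response{notLeaderStoreID: &leader}, nil)
	fmt.Println("retry:", retry)
}
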
82 changes: 38 additions & 44 deletions store/tikv/region_cache.go
@@ -57,72 +57,69 @@ func (c *RegionCache) DropRegion(regionID uint64) {
 	delete(c.regions, regionID)
 }

-// NextPeer picks next peer as new leader, if out of range of peers delete region.
-func (c *RegionCache) NextPeer(regionID uint64) {
+// NextStore picks next store as new leader, if out of range of stores delete region.
+func (c *RegionCache) NextStore(regionID uint64) {
 	// A and B get the same region and current leader is 1, they both will pick
-	// peer 2 as leader.
+	// store 2 as leader.
 	c.mu.RLock()
 	region, ok := c.regions[regionID]
 	c.mu.RUnlock()
 	if !ok {
 		return
 	}
-	if leader, err := region.NextPeer(); err != nil {
+	if leader, err := region.NextStore(); err != nil {
 		c.mu.Lock()
 		delete(c.regions, regionID)
 		c.mu.Unlock()
 	} else {
-		c.UpdateLeader(regionID, leader.GetId())
+		c.UpdateLeader(regionID, leader)
 	}
 }

-// UpdateLeader update some region cache with newer leader info.
-func (c *RegionCache) UpdateLeader(regionID, leaderID uint64) {
+// UpdateLeader update some region cache with newer leader store ID.
+func (c *RegionCache) UpdateLeader(regionID, leaderStoreID uint64) {
 	c.mu.RLock()
 	old, ok := c.regions[regionID]
 	c.mu.RUnlock()
 	if !ok {
-		log.Debugf("regionCache: cannot find region when updating leader %d,%d", regionID, leaderID)
+		log.Debugf("regionCache: cannot find region when updating leader %d,%d", regionID, leaderStoreID)
 		return
 	}
 	var (
-		peer  *metapb.Peer
 		store *metapb.Store
 		err   error
 	)

-	curPeerIdx := -1
-	for idx, p := range old.meta.Peers {
-		if p.GetId() == leaderID {
-			peer = p
+	curStoreIdx := -1
+	for idx, storeID := range old.meta.StoreIds {
+		if storeID == leaderStoreID {
 			// No need update leader.
-			if idx == old.curPeerIdx {
+			if idx == old.curStoreIdx {
 				return
 			}
-			curPeerIdx = idx
+			curStoreIdx = idx
 			break
 		}
 	}
-	if peer != nil {
-		store, err = c.pdClient.GetStore(peer.GetStoreId())
+	if curStoreIdx != -1 {
+		store, err = c.pdClient.GetStore(leaderStoreID)
 	}

 	c.mu.Lock()
 	defer c.mu.Unlock()
 	delete(c.regions, regionID)

-	if peer == nil || err != nil {
-		// Can't find the peer in cache, or error occurs when loading
+	if curStoreIdx == -1 || err != nil {
+		// Can't find the store in cache, or error occurs when loading
 		// store from PD.
 		// Leave the region deleted, it will be filled later.
 		return
 	}

 	c.regions[regionID] = &Region{
-		meta:       old.meta,
-		peer:       peer,
-		addr:       store.GetAddress(),
-		curPeerIdx: curPeerIdx,
+		meta:        old.meta,
+		addr:        store.GetAddress(),
+		curStoreIdx: curStoreIdx,
 	}
 }

@@ -139,28 +136,27 @@ func (c *RegionCache) getRegionFromCache(key []byte) *Region {
 	return nil
 }

-// loadRegion get region from pd client, and pick the random peer as leader.
+// loadRegion get region from pd client, and pick the random store as leader.
 func (c *RegionCache) loadRegion(key []byte) (*Region, error) {
 	meta, err := c.pdClient.GetRegion(key)
 	if err != nil {
 		// We assume PD will recover soon.
 		return nil, errors.Annotate(err, txnRetryableMark)
 	}
-	if len(meta.Peers) == 0 {
-		return nil, errors.New("receive Region with no peer")
+	if len(meta.StoreIds) == 0 {
+		return nil, errors.New("receive Region with no store")
 	}
-	curPeerIdx := 0
-	peer := meta.Peers[curPeerIdx]
-	store, err := c.pdClient.GetStore(peer.GetStoreId())
+	curStoreIdx := 0
+	storeID := meta.StoreIds[curStoreIdx]
+	store, err := c.pdClient.GetStore(storeID)
 	if err != nil {
 		// We assume PD will recover soon.
 		return nil, errors.Annotate(err, txnRetryableMark)
 	}
 	region := &Region{
-		meta:       meta,
-		peer:       peer,
-		addr:       store.GetAddress(),
-		curPeerIdx: curPeerIdx,
+		meta:        meta,
+		addr:        store.GetAddress(),
+		curStoreIdx: curStoreIdx,
 	}

 	c.mu.Lock()
@@ -175,10 +171,9 @@ func (c *RegionCache) loadRegion(key []byte) (*Region, error) {

 // Region store region info. Region is a readonly class.
 type Region struct {
-	meta       *metapb.Region
-	peer       *metapb.Peer
-	addr       string
-	curPeerIdx int
+	meta        *metapb.Region
+	addr        string
+	curStoreIdx int
 }

 // GetID return id.
@@ -206,7 +201,6 @@ func (r *Region) GetContext() *kvrpcpb.Context {
 	return &kvrpcpb.Context{
 		RegionId:    r.meta.Id,
 		RegionEpoch: r.meta.RegionEpoch,
-		Peer:        r.peer,
 	}
 }

@@ -217,13 +211,13 @@ func (r *Region) Contains(key []byte) bool {
 		(bytes.Compare(key, r.meta.GetEndKey()) < 0 || len(r.meta.GetEndKey()) == 0)
 }

-// NextPeer picks next peer as leader, if out of range return error.
-func (r *Region) NextPeer() (*metapb.Peer, error) {
-	nextPeerIdx := r.curPeerIdx + 1
-	if nextPeerIdx >= len(r.meta.Peers) {
-		return nil, errors.New("out of range of peer")
+// NextStore picks next store as leader, if out of range return error.
+func (r *Region) NextStore() (uint64, error) {
+	nextStoreIdx := r.curStoreIdx + 1
+	if nextStoreIdx >= len(r.meta.StoreIds) {
+		return 0, errors.New("out of range of store")
 	}
-	return r.meta.Peers[nextPeerIdx], nil
+	return r.meta.StoreIds[nextStoreIdx], nil
 }

 // regionMissBackoff is for region cache miss retry.
37 changes: 15 additions & 22 deletions store/tikv/region_cache_test.go
@@ -90,7 +90,7 @@ func (s *testRegionCacheSuite) TestUpdateLeader(c *C) {
 	c.Assert(err, IsNil)
 	c.Assert(r, NotNil)
 	c.Assert(r.GetID(), Equals, uint64(3))
-	c.Assert(r.curPeerIdx, Equals, 1)
+	c.Assert(r.curStoreIdx, Equals, 1)
 	c.Assert(r.GetAddress(), Equals, "addr2")
 }

@@ -106,7 +106,7 @@ func (s *testRegionCacheSuite) TestUpdateLeader2(c *C) {
 	c.Assert(err, IsNil)
 	c.Assert(r, NotNil)
 	c.Assert(r.GetID(), Equals, uint64(3))
-	c.Assert(r.curPeerIdx, Equals, 0)
+	c.Assert(r.curStoreIdx, Equals, 0)
 	c.Assert(r.GetAddress(), Equals, "addr4")
 }

@@ -188,31 +188,31 @@ func (s *testRegionCacheSuite) TestReconnect(c *C) {
 	c.Assert(s.cache.regions, HasLen, 1)
 }

-func (s *testRegionCacheSuite) TestNextPeer(c *C) {
+func (s *testRegionCacheSuite) TestNextStore(c *C) {
 	region, err := s.cache.GetRegion([]byte("a"))
 	c.Assert(err, IsNil)
-	c.Assert(region.curPeerIdx, Equals, 0)
+	c.Assert(region.curStoreIdx, Equals, 0)

-	s.cache.NextPeer(3)
+	s.cache.NextStore(3)
 	region, err = s.cache.GetRegion([]byte("a"))
 	c.Assert(err, IsNil)
-	c.Assert(region.curPeerIdx, Equals, 1)
+	c.Assert(region.curStoreIdx, Equals, 1)

-	s.cache.NextPeer(3)
+	s.cache.NextStore(3)
 	region, err = s.cache.GetRegion([]byte("a"))
 	c.Assert(err, IsNil)
-	// Out of range of Peers, so get Region again and pick Peers[0] as leader.
-	c.Assert(region.curPeerIdx, Equals, 0)
+	// Out of range of Stores, so get Region again and pick Stores[0] as leader.
+	c.Assert(region.curStoreIdx, Equals, 0)

 	s.pd.removeRegion(3)
-	// regionCache still has more Peers, so pick next peer.
-	s.cache.NextPeer(3)
+	// regionCache still has more Stores, so pick next store.
+	s.cache.NextStore(3)
 	region, err = s.cache.GetRegion([]byte("a"))
 	c.Assert(err, IsNil)
-	c.Assert(region.curPeerIdx, Equals, 1)
+	c.Assert(region.curStoreIdx, Equals, 1)

 	// region 3 is removed so can't get Region from pd.
-	s.cache.NextPeer(3)
+	s.cache.NextStore(3)
 	region, err = s.cache.GetRegion([]byte("a"))
 	c.Assert(err, NotNil)
 	c.Assert(region, IsNil)
@@ -243,19 +243,12 @@ func (c *mockPDClient) removeStore(id uint64) {
 	delete(c.stores, id)
 }

-func (c *mockPDClient) setRegion(id uint64, startKey, endKey []byte, peers []uint64) {
-	var metaPeers []*metapb.Peer
-	for _, id := range peers {
-		metaPeers = append(metaPeers, &metapb.Peer{
-			Id:      proto.Uint64(id),
-			StoreId: proto.Uint64(id),
-		})
-	}
+func (c *mockPDClient) setRegion(id uint64, startKey, endKey []byte, stores []uint64) {
 	c.regions[id] = &metapb.Region{
 		Id:       proto.Uint64(id),
 		StartKey: startKey,
 		EndKey:   endKey,
-		Peers:    metaPeers,
+		StoreIds: stores,
 	}
 }

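
Since the old setRegion signature already took a []uint64 of IDs, existing call sites keep working unchanged; only the body stops building metapb.Peer values. A hypothetical call with made-up IDs, just for illustration of the new shape:

	// hypothetical setup: region 3 covering ["a", "z"), replicated on stores 1, 2 and 3
	s.pd.setRegion(3, []byte("a"), []byte("z"), []uint64{1, 2, 3})
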
20 changes: 10 additions & 10 deletions tidb-server/Godeps/Godeps.json

Some generated files are not rendered by default.