Skip to content

Commit

Permalink
server/core: check region term before updating cache (#2667)
Browse files Browse the repository at this point in the history
Signed-off-by: howardlau1999 <howardlau1999@hotmail.com>
  • Loading branch information
howardlau1999 authored Jul 22, 2020
1 parent 42cce7a commit a9cdb2d
Show file tree
Hide file tree
Showing 3 changed files with 83 additions and 1 deletion.
7 changes: 6 additions & 1 deletion server/core/basic_cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -306,10 +306,15 @@ func (bc *BasicCluster) PreCheckPutRegion(region *RegionInfo) (*RegionInfo, erro
}
r := region.GetRegionEpoch()
o := origin.GetRegionEpoch()

// TiKV reports term after v3.0
isTermBehind := region.GetTerm() > 0 && region.GetTerm() < origin.GetTerm()

// Region meta is stale, return an error.
if r.GetVersion() < o.GetVersion() || r.GetConfVer() < o.GetConfVer() {
if r.GetVersion() < o.GetVersion() || r.GetConfVer() < o.GetConfVer() || isTermBehind {
return origin, ErrRegionIsStale(region.GetMeta(), origin.GetMeta())
}

return origin, nil
}

Expand Down
8 changes: 8 additions & 0 deletions server/core/region.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ import (
// RegionInfo records detail region info.
// Read-Only once created.
type RegionInfo struct {
term uint64
meta *metapb.Region
learners []*metapb.Peer
voters []*metapb.Peer
Expand Down Expand Up @@ -90,6 +91,7 @@ func RegionFromHeartbeat(heartbeat *pdpb.RegionHeartbeatRequest) *RegionInfo {
}

region := &RegionInfo{
term: heartbeat.GetTerm(),
meta: heartbeat.GetRegion(),
leader: heartbeat.GetLeader(),
downPeers: heartbeat.GetDownPeers(),
Expand Down Expand Up @@ -120,6 +122,7 @@ func (r *RegionInfo) Clone(opts ...RegionCreateOption) *RegionInfo {
}

region := &RegionInfo{
term: r.term,
meta: proto.Clone(r.meta).(*metapb.Region),
leader: proto.Clone(r.leader).(*metapb.Peer),
downPeers: downPeers,
Expand All @@ -141,6 +144,11 @@ func (r *RegionInfo) Clone(opts ...RegionCreateOption) *RegionInfo {
return region
}

// GetTerm returns the current term of the region
func (r *RegionInfo) GetTerm() uint64 {
return r.term
}

// GetLearners returns the learners.
func (r *RegionInfo) GetLearners() []*metapb.Peer {
return r.learners
Expand Down
69 changes: 69 additions & 0 deletions tests/server/cluster/cluster_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1022,3 +1022,72 @@ func (s *clusterTestSuite) TestUpgradeStoreLimit(c *C) {
c.Assert(oc.AddOperator(op), IsFalse)
c.Assert(oc.RemoveOperator(op), IsFalse)
}

func (s *clusterTestSuite) TestStaleTermHeartbeat(c *C) {
tc, err := tests.NewTestCluster(s.ctx, 1)
defer tc.Destroy()
c.Assert(err, IsNil)

err = tc.RunInitialServers()
c.Assert(err, IsNil)

tc.WaitLeader()
leaderServer := tc.GetServer(tc.GetLeader())
grpcPDClient := testutil.MustNewGrpcClient(c, leaderServer.GetAddr())
clusterID := leaderServer.GetClusterID()
bootstrapCluster(c, clusterID, grpcPDClient, "127.0.0.1:0")
storeAddrs := []string{"127.0.1.1:0", "127.0.1.1:1", "127.0.1.1:2"}
rc := leaderServer.GetRaftCluster()
c.Assert(rc, NotNil)
rc.SetStorage(core.NewStorage(kv.NewMemoryKV()))
var peers []*metapb.Peer
id := leaderServer.GetAllocator()
for _, addr := range storeAddrs {
storeID, err := id.Alloc()
c.Assert(err, IsNil)
peerID, err := id.Alloc()
c.Assert(err, IsNil)
store := newMetaStore(storeID, addr, "3.0.0", metapb.StoreState_Up, fmt.Sprintf("test/store%d", storeID))
_, err = putStore(c, grpcPDClient, clusterID, store)
c.Assert(err, IsNil)
peers = append(peers, &metapb.Peer{
Id: peerID,
StoreId: storeID,
})
}

regionReq := &pdpb.RegionHeartbeatRequest{
Header: testutil.NewRequestHeader(clusterID),
Region: &metapb.Region{
Id: 1,
Peers: peers,
StartKey: []byte{byte(2)},
EndKey: []byte{byte(3)},
RegionEpoch: &metapb.RegionEpoch{
ConfVer: 1,
Version: 1,
},
},
Leader: peers[0],
Term: 5,
ApproximateSize: 10,
}

region := core.RegionFromHeartbeat(regionReq)
err = rc.HandleRegionHeartbeat(region)
c.Assert(err, IsNil)

// Transfer leader
regionReq.Term = 6
regionReq.Leader = peers[1]
region = core.RegionFromHeartbeat(regionReq)
err = rc.HandleRegionHeartbeat(region)
c.Assert(err, IsNil)

// Stale heartbeat, update check should fail
regionReq.Term = 5
regionReq.Leader = peers[0]
region = core.RegionFromHeartbeat(regionReq)
err = rc.HandleRegionHeartbeat(region)
c.Assert(err, NotNil)
}

0 comments on commit a9cdb2d

Please sign in to comment.