Skip to content

Commit

Permalink
fix data race (#736)
Browse files Browse the repository at this point in the history
Signed-off-by: Smityz <smityz@qq.com>
Co-authored-by: disksing <i@disksing.com>
  • Loading branch information
Smityz and disksing authored Mar 27, 2023
1 parent f7e35b2 commit bb350d6
Show file tree
Hide file tree
Showing 2 changed files with 23 additions and 9 deletions.
12 changes: 9 additions & 3 deletions internal/locate/region_cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -398,6 +398,8 @@ func (r *Region) isValid() bool {
return r != nil && !r.checkNeedReload() && r.checkRegionCacheTTL(time.Now().Unix())
}

type livenessFunc func(s *Store, bo *retry.Backoffer) livenessState

// RegionCache caches Regions loaded from PD.
// All public methods of this struct should be thread-safe, unless explicitly pointed out or the method is for testing
// purposes only.
Expand Down Expand Up @@ -430,7 +432,7 @@ type RegionCache struct {
testingKnobs struct {
// Replace the requestLiveness function for test purpose. Note that in unit tests, if this is not set,
// requestLiveness always returns unreachable.
mockRequestLiveness func(s *Store, bo *retry.Backoffer) livenessState
mockRequestLiveness atomic.Pointer[livenessFunc]
}
}

Expand Down Expand Up @@ -2615,8 +2617,12 @@ func (s *Store) requestLiveness(bo *retry.Backoffer, c *RegionCache) (l liveness
return unknown
}
}
if c != nil && c.testingKnobs.mockRequestLiveness != nil {
return c.testingKnobs.mockRequestLiveness(s, bo)

if c != nil {
livenessFunc := c.testingKnobs.mockRequestLiveness.Load()
if livenessFunc != nil {
return (*livenessFunc)(s, bo)
}
}

if storeLivenessTimeout == 0 {
Expand Down
20 changes: 14 additions & 6 deletions internal/locate/region_request3_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -166,12 +166,13 @@ func (s *testRegionRequestToThreeStoresSuite) TestForwarding() {
return innerClient.SendRequest(ctx, addr, req, timeout)
}}
var storeState = uint32(unreachable)
s.regionRequestSender.regionCache.testingKnobs.mockRequestLiveness = func(s *Store, bo *retry.Backoffer) livenessState {
tf := func(s *Store, bo *retry.Backoffer) livenessState {
if s.addr == leaderAddr {
return livenessState(atomic.LoadUint32(&storeState))
}
return reachable
}
s.regionRequestSender.regionCache.testingKnobs.mockRequestLiveness.Store((*livenessFunc)(&tf))

loc, err := s.regionRequestSender.regionCache.LocateKey(bo, []byte("k"))
s.Nil(err)
Expand Down Expand Up @@ -433,9 +434,10 @@ func (s *testRegionRequestToThreeStoresSuite) TestReplicaSelector() {
replicaSelector, err = newReplicaSelector(cache, regionLoc.Region, req)
s.Nil(err)
s.NotNil(replicaSelector)
cache.testingKnobs.mockRequestLiveness = func(s *Store, bo *retry.Backoffer) livenessState {
tf := func(s *Store, bo *retry.Backoffer) livenessState {
return unreachable
}
cache.testingKnobs.mockRequestLiveness.Store((*livenessFunc)(&tf))
s.IsType(&accessKnownLeader{}, replicaSelector.state)
_, err = replicaSelector.next(s.bo)
s.Nil(err)
Expand Down Expand Up @@ -471,9 +473,11 @@ func (s *testRegionRequestToThreeStoresSuite) TestReplicaSelector() {
// Do not try to use proxy if livenessState is unknown instead of unreachable.
refreshEpochs(regionStore)
cache.enableForwarding = true
cache.testingKnobs.mockRequestLiveness = func(s *Store, bo *retry.Backoffer) livenessState {
tf = func(s *Store, bo *retry.Backoffer) livenessState {
return unknown
}
cache.testingKnobs.mockRequestLiveness.Store(
(*livenessFunc)(&tf))
replicaSelector, err = newReplicaSelector(cache, regionLoc.Region, req)
s.Nil(err)
s.NotNil(replicaSelector)
Expand All @@ -495,9 +499,10 @@ func (s *testRegionRequestToThreeStoresSuite) TestReplicaSelector() {
replicaSelector, err = newReplicaSelector(cache, regionLoc.Region, req)
s.Nil(err)
s.NotNil(replicaSelector)
cache.testingKnobs.mockRequestLiveness = func(s *Store, bo *retry.Backoffer) livenessState {
tf = func(s *Store, bo *retry.Backoffer) livenessState {
return unreachable
}
cache.testingKnobs.mockRequestLiveness.Store((*livenessFunc)(&tf))
s.Eventually(func() bool {
return regionStore.stores[regionStore.workTiKVIdx].getLivenessState() == unreachable
}, 3*time.Second, 200*time.Millisecond)
Expand Down Expand Up @@ -750,9 +755,11 @@ func (s *testRegionRequestToThreeStoresSuite) TestSendReqWithReplicaSelector() {
s.cluster.ChangeLeader(s.regionID, s.peerIDs[0])

// The leader store is alive but can't provide service.
s.regionRequestSender.regionCache.testingKnobs.mockRequestLiveness = func(s *Store, bo *retry.Backoffer) livenessState {

tf := func(s *Store, bo *retry.Backoffer) livenessState {
return reachable
}
s.regionRequestSender.regionCache.testingKnobs.mockRequestLiveness.Store((*livenessFunc)(&tf))
s.Eventually(func() bool {
stores := s.regionRequestSender.replicaSelector.regionStore.stores
return stores[0].getLivenessState() == reachable &&
Expand Down Expand Up @@ -878,9 +885,10 @@ func (s *testRegionRequestToThreeStoresSuite) TestSendReqWithReplicaSelector() {
}

// Runs out of all replicas and then returns a send error.
s.regionRequestSender.regionCache.testingKnobs.mockRequestLiveness = func(s *Store, bo *retry.Backoffer) livenessState {
tf = func(s *Store, bo *retry.Backoffer) livenessState {
return unreachable
}
s.regionRequestSender.regionCache.testingKnobs.mockRequestLiveness.Store((*livenessFunc)(&tf))
reloadRegion()
for _, store := range s.storeIDs {
s.cluster.StopStore(store)
Expand Down

0 comments on commit bb350d6

Please sign in to comment.