diff --git a/pkg/kv/kvserver/BUILD.bazel b/pkg/kv/kvserver/BUILD.bazel index 5a8171940b37..9e6053192dc8 100644 --- a/pkg/kv/kvserver/BUILD.bazel +++ b/pkg/kv/kvserver/BUILD.bazel @@ -83,6 +83,7 @@ go_library( "store_rebalancer.go", "store_remove_replica.go", "store_replica_btree.go", + "store_replicas_by_rangeid.go", "store_send.go", "store_snapshot.go", "store_split.go", diff --git a/pkg/kv/kvserver/helpers_test.go b/pkg/kv/kvserver/helpers_test.go index e0e34040518d..e92a81bb820f 100644 --- a/pkg/kv/kvserver/helpers_test.go +++ b/pkg/kv/kvserver/helpers_test.go @@ -21,7 +21,6 @@ import ( "math/rand" "testing" "time" - "unsafe" circuit "github.com/cockroachdb/circuitbreaker" "github.com/cockroachdb/cockroach/pkg/kv" @@ -515,7 +514,7 @@ func WriteRandomDataToRange( } func WatchForDisappearingReplicas(t testing.TB, store *Store) { - m := make(map[int64]struct{}) + m := make(map[roachpb.RangeID]struct{}) for { select { case <-store.Stopper().ShouldQuiesce(): @@ -523,13 +522,12 @@ func WatchForDisappearingReplicas(t testing.TB, store *Store) { default: } - store.mu.replicas.Range(func(k int64, v unsafe.Pointer) bool { - m[k] = struct{}{} - return true + store.mu.replicasByRangeID.Range(func(repl *Replica) { + m[repl.RangeID] = struct{}{} }) for k := range m { - if _, ok := store.mu.replicas.Load(k); !ok { + if _, ok := store.mu.replicasByRangeID.Load(k); !ok { t.Fatalf("r%d disappeared from Store.mu.replicas map", k) } } diff --git a/pkg/kv/kvserver/store.go b/pkg/kv/kvserver/store.go index 7c80312f6930..4ef95d32a143 100644 --- a/pkg/kv/kvserver/store.go +++ b/pkg/kv/kvserver/store.go @@ -364,9 +364,8 @@ func (rs *storeReplicaVisitor) Visit(visitor func(*Replica) bool) { // stale) view of all Replicas without holding the Store lock. In particular, // no locks are acquired during the copy process. rs.repls = nil - rs.store.mu.replicas.Range(func(k int64, v unsafe.Pointer) bool { - rs.repls = append(rs.repls, (*Replica)(v)) - return true + rs.store.mu.replicasByRangeID.Range(func(repl *Replica) { + rs.repls = append(rs.repls, repl) }) if rs.ordered { @@ -586,7 +585,7 @@ type Store struct { syncutil.RWMutex // Map of replicas by Range ID (map[roachpb.RangeID]*Replica). This // includes `uninitReplicas`. May be read without holding Store.mu. - replicas syncutil.IntMap + replicasByRangeID rangeIDReplicaMap // A btree key containing objects of type *Replica or *ReplicaPlaceholder. // Both types have an associated key range; the btree is keyed on their // start keys. @@ -2411,8 +2410,8 @@ func (s *Store) GetReplica(rangeID roachpb.RangeID) (*Replica, error) { // GetReplicaIfExists returns the replica with the given RangeID or nil. func (s *Store) GetReplicaIfExists(rangeID roachpb.RangeID) *Replica { - if value, ok := s.mu.replicas.Load(int64(rangeID)); ok { - return (*Replica)(value) + if repl, ok := s.mu.replicasByRangeID.Load(rangeID); ok { + return repl } return nil } @@ -2459,8 +2458,8 @@ func (s *Store) getOverlappingKeyRangeLocked( // RaftStatus returns the current raft status of the local replica of // the given range. func (s *Store) RaftStatus(rangeID roachpb.RangeID) *raft.Status { - if value, ok := s.mu.replicas.Load(int64(rangeID)); ok { - return (*Replica)(value).RaftStatus() + if repl, ok := s.mu.replicasByRangeID.Load(rangeID); ok { + return repl.RaftStatus() } return nil } @@ -2590,9 +2589,8 @@ func (s *Store) Capacity(ctx context.Context, useCached bool) (roachpb.StoreCapa // performance critical code. func (s *Store) ReplicaCount() int { var count int - s.mu.replicas.Range(func(_ int64, _ unsafe.Pointer) bool { + s.mu.replicasByRangeID.Range(func(*Replica) { count++ - return true }) return count } diff --git a/pkg/kv/kvserver/store_create_replica.go b/pkg/kv/kvserver/store_create_replica.go index a3ed8b7f2a81..0e0e92a281b0 100644 --- a/pkg/kv/kvserver/store_create_replica.go +++ b/pkg/kv/kvserver/store_create_replica.go @@ -13,7 +13,6 @@ package kvserver import ( "context" "time" - "unsafe" "github.com/cockroachdb/cockroach/pkg/keys" "github.com/cockroachdb/cockroach/pkg/roachpb" @@ -80,8 +79,7 @@ func (s *Store) tryGetOrCreateReplica( creatingReplica *roachpb.ReplicaDescriptor, ) (_ *Replica, created bool, _ error) { // The common case: look up an existing (initialized) replica. - if value, ok := s.mu.replicas.Load(int64(rangeID)); ok { - repl := (*Replica)(value) + if repl, ok := s.mu.replicasByRangeID.Load(rangeID); ok { repl.raftMu.Lock() // not unlocked on success repl.mu.Lock() @@ -297,8 +295,8 @@ func (s *Store) addReplicaToRangeMapLocked(repl *Replica) error { // It's ok for the replica to exist in the replicas map as long as it is the // same replica object. This occurs during splits where the right-hand side // is added to the replicas map before it is initialized. - if existing, loaded := s.mu.replicas.LoadOrStore( - int64(repl.RangeID), unsafe.Pointer(repl)); loaded && (*Replica)(existing) != repl { + if existing, loaded := s.mu.replicasByRangeID.LoadOrStore( + repl.RangeID, repl); loaded && existing != repl { return errors.Errorf("%s: replica already exists", repl) } // Check whether the replica is unquiesced but not in the map. This @@ -314,7 +312,7 @@ func (s *Store) addReplicaToRangeMapLocked(repl *Replica) error { } // maybeMarkReplicaInitializedLocked should be called whenever a previously -// unintialized replica has become initialized so that the store can update its +// uninitialized replica has become initialized so that the store can update its // internal bookkeeping. It requires that Store.mu and Replica.raftMu // are locked. func (s *Store) maybeMarkReplicaInitializedLockedReplLocked( diff --git a/pkg/kv/kvserver/store_raft.go b/pkg/kv/kvserver/store_raft.go index c1c06cc8f491..69ee2ff96c88 100644 --- a/pkg/kv/kvserver/store_raft.go +++ b/pkg/kv/kvserver/store_raft.go @@ -484,7 +484,7 @@ func (s *Store) processRequestQueue(ctx context.Context, rangeID roachpb.RangeID // forgiving. // // See https://github.com/cockroachdb/cockroach/issues/30951#issuecomment-428010411. - if _, exists := s.mu.replicas.Load(int64(rangeID)); !exists { + if _, exists := s.mu.replicasByRangeID.Load(rangeID); !exists { q.Lock() if len(q.infos) == 0 { s.replicaQueues.Delete(int64(rangeID)) @@ -500,12 +500,11 @@ func (s *Store) processRequestQueue(ctx context.Context, rangeID roachpb.RangeID } func (s *Store) processReady(ctx context.Context, rangeID roachpb.RangeID) { - value, ok := s.mu.replicas.Load(int64(rangeID)) + r, ok := s.mu.replicasByRangeID.Load(rangeID) if !ok { return } - r := (*Replica)(value) ctx = r.raftSchedulerCtx(ctx) start := timeutil.Now() stats, expl, err := r.handleRaftReady(ctx, noSnap) @@ -524,14 +523,13 @@ func (s *Store) processReady(ctx context.Context, rangeID roachpb.RangeID) { } func (s *Store) processTick(ctx context.Context, rangeID roachpb.RangeID) bool { - value, ok := s.mu.replicas.Load(int64(rangeID)) + r, ok := s.mu.replicasByRangeID.Load(rangeID) if !ok { return false } livenessMap, _ := s.livenessMap.Load().(liveness.IsLiveMap) start := timeutil.Now() - r := (*Replica)(value) ctx = r.raftSchedulerCtx(ctx) exists, err := r.tick(ctx, livenessMap) if err != nil { @@ -560,8 +558,7 @@ func (s *Store) processTick(ctx context.Context, rangeID roachpb.RangeID) bool { func (s *Store) nodeIsLiveCallback(l livenesspb.Liveness) { s.updateLivenessMap() - s.mu.replicas.Range(func(k int64, v unsafe.Pointer) bool { - r := (*Replica)(v) + s.mu.replicasByRangeID.Range(func(r *Replica) { r.mu.RLock() quiescent := r.mu.quiescent lagging := r.mu.laggingFollowersOnQuiesce @@ -570,7 +567,6 @@ func (s *Store) nodeIsLiveCallback(l livenesspb.Liveness) { if quiescent && (lagging.MemberStale(l) || !laggingAccurate) { r.unquiesce() } - return true }) } @@ -730,13 +726,13 @@ func (s *Store) sendQueuedHeartbeatsToNode( if !s.cfg.Transport.SendAsync(chReq, rpc.SystemClass) { for _, beat := range beats { - if value, ok := s.mu.replicas.Load(int64(beat.RangeID)); ok { - (*Replica)(value).addUnreachableRemoteReplica(beat.ToReplicaID) + if repl, ok := s.mu.replicasByRangeID.Load(beat.RangeID); ok { + repl.addUnreachableRemoteReplica(beat.ToReplicaID) } } for _, resp := range resps { - if value, ok := s.mu.replicas.Load(int64(resp.RangeID)); ok { - (*Replica)(value).addUnreachableRemoteReplica(resp.ToReplicaID) + if repl, ok := s.mu.replicasByRangeID.Load(resp.RangeID); ok { + repl.addUnreachableRemoteReplica(resp.ToReplicaID) } } return 0 diff --git a/pkg/kv/kvserver/store_remove_replica.go b/pkg/kv/kvserver/store_remove_replica.go index d919f29abf67..8f47c805c840 100644 --- a/pkg/kv/kvserver/store_remove_replica.go +++ b/pkg/kv/kvserver/store_remove_replica.go @@ -238,11 +238,10 @@ func (s *Store) removeUninitializedReplicaRaftMuLocked( defer s.mu.Unlock() // Sanity check, could be removed. - value, stillExists := s.mu.replicas.Load(int64(rep.RangeID)) + existing, stillExists := s.mu.replicasByRangeID.Load(rep.RangeID) if !stillExists { log.Fatalf(ctx, "uninitialized replica was removed in the meantime") } - existing := (*Replica)(value) if existing == rep { log.Infof(ctx, "removing uninitialized replica %v", rep) } else { @@ -264,7 +263,7 @@ func (s *Store) unlinkReplicaByRangeIDLocked(ctx context.Context, rangeID roachp s.unquiescedReplicas.Unlock() delete(s.mu.uninitReplicas, rangeID) s.replicaQueues.Delete(int64(rangeID)) - s.mu.replicas.Delete(int64(rangeID)) + s.mu.replicasByRangeID.Delete(rangeID) s.unregisterLeaseholderByID(ctx, rangeID) } diff --git a/pkg/kv/kvserver/store_replicas_by_rangeid.go b/pkg/kv/kvserver/store_replicas_by_rangeid.go new file mode 100644 index 000000000000..fe8c943181e3 --- /dev/null +++ b/pkg/kv/kvserver/store_replicas_by_rangeid.go @@ -0,0 +1,51 @@ +// Copyright 2021 The Cockroach Authors. +// +// Use of this software is governed by the Business Source License +// included in the file licenses/BSL.txt. +// +// As of the Change Date specified in that file, in accordance with +// the Business Source License, use of this software will be governed +// by the Apache License, Version 2.0, included in the file +// licenses/APL.txt. + +package kvserver + +import ( + "unsafe" + + "github.com/cockroachdb/cockroach/pkg/roachpb" + "github.com/cockroachdb/cockroach/pkg/util/syncutil" +) + +type rangeIDReplicaMap syncutil.IntMap + +// Load loads the Replica for the RangeID. If not found, returns +// (nil, false), otherwise the Replica and true. +func (m *rangeIDReplicaMap) Load(rangeID roachpb.RangeID) (*Replica, bool) { + val, ok := (*syncutil.IntMap)(m).Load(int64(rangeID)) + return (*Replica)(val), ok +} + +// LoadOrStore loads the replica and returns it (and `true`). If it does not +// exist, atomically inserts the provided Replica and returns it along with +// `false`. +func (m *rangeIDReplicaMap) LoadOrStore( + rangeID roachpb.RangeID, repl *Replica, +) (_ *Replica, loaded bool) { + val, loaded := (*syncutil.IntMap)(m).LoadOrStore(int64(rangeID), unsafe.Pointer(repl)) + return (*Replica)(val), loaded +} + +// Delete drops the Replica if it existed in the map. +func (m *rangeIDReplicaMap) Delete(rangeID roachpb.RangeID) { + (*syncutil.IntMap)(m).Delete(int64(rangeID)) +} + +// Range invokes the provided function with each Replica in the map. +func (m *rangeIDReplicaMap) Range(f func(*Replica)) { + v := func(k int64, v unsafe.Pointer) bool { + f((*Replica)(v)) + return true // wantMore + } + (*syncutil.IntMap)(m).Range(v) +} diff --git a/pkg/kv/kvserver/store_snapshot.go b/pkg/kv/kvserver/store_snapshot.go index d5bf8264c207..e34d412e7dde 100644 --- a/pkg/kv/kvserver/store_snapshot.go +++ b/pkg/kv/kvserver/store_snapshot.go @@ -450,13 +450,10 @@ func (s *Store) canAcceptSnapshotLocked( desc := *snapHeader.State.Desc // First, check for an existing Replica. - v, ok := s.mu.replicas.Load( - int64(desc.RangeID), - ) + existingRepl, ok := s.mu.replicasByRangeID.Load(desc.RangeID) if !ok { return nil, errors.Errorf("canAcceptSnapshotLocked requires a replica present") } - existingRepl := (*Replica)(v) // The raftMu is held which allows us to use the existing replica as a // placeholder when we decide that the snapshot can be applied. As long // as the caller releases the raftMu only after feeding the snapshot