From d9087aff9ba866616a26f506f07ad11bf32d27e0 Mon Sep 17 00:00:00 2001 From: Tobias Grieger Date: Thu, 4 Nov 2021 15:03:08 +0100 Subject: [PATCH 1/3] kvserver: use wrapper type for Store.mu.replicas This simplifies lots of callers and it will also make it easier to work on #72374, where this map will start containing more than one type as value. Release note: None --- pkg/kv/kvserver/BUILD.bazel | 1 + pkg/kv/kvserver/helpers_test.go | 9 ++- pkg/kv/kvserver/store.go | 20 +++---- pkg/kv/kvserver/store_create_replica.go | 8 +-- pkg/kv/kvserver/store_raft.go | 21 +++---- pkg/kv/kvserver/store_remove_replica.go | 5 +- pkg/kv/kvserver/store_replicas_by_rangeid.go | 63 ++++++++++++++++++++ pkg/kv/kvserver/store_snapshot.go | 5 +- 8 files changed, 93 insertions(+), 39 deletions(-) create mode 100644 pkg/kv/kvserver/store_replicas_by_rangeid.go diff --git a/pkg/kv/kvserver/BUILD.bazel b/pkg/kv/kvserver/BUILD.bazel index 5a8171940b37..9e6053192dc8 100644 --- a/pkg/kv/kvserver/BUILD.bazel +++ b/pkg/kv/kvserver/BUILD.bazel @@ -83,6 +83,7 @@ go_library( "store_rebalancer.go", "store_remove_replica.go", "store_replica_btree.go", + "store_replicas_by_rangeid.go", "store_send.go", "store_snapshot.go", "store_split.go", diff --git a/pkg/kv/kvserver/helpers_test.go b/pkg/kv/kvserver/helpers_test.go index e0e34040518d..70fefa4ea431 100644 --- a/pkg/kv/kvserver/helpers_test.go +++ b/pkg/kv/kvserver/helpers_test.go @@ -21,7 +21,6 @@ import ( "math/rand" "testing" "time" - "unsafe" circuit "github.com/cockroachdb/circuitbreaker" "github.com/cockroachdb/cockroach/pkg/kv" @@ -515,7 +514,7 @@ func WriteRandomDataToRange( } func WatchForDisappearingReplicas(t testing.TB, store *Store) { - m := make(map[int64]struct{}) + m := make(map[roachpb.RangeID]struct{}) for { select { case <-store.Stopper().ShouldQuiesce(): @@ -523,9 +522,9 @@ func WatchForDisappearingReplicas(t testing.TB, store *Store) { default: } - store.mu.replicas.Range(func(k int64, v unsafe.Pointer) bool { - m[k] = struct{}{} - return true + _ = store.mu.replicas.Range(func(repl *Replica) error { + m[repl.RangeID] = struct{}{} + return nil }) for k := range m { diff --git a/pkg/kv/kvserver/store.go b/pkg/kv/kvserver/store.go index 7c80312f6930..dbe78e872c58 100644 --- a/pkg/kv/kvserver/store.go +++ b/pkg/kv/kvserver/store.go @@ -364,9 +364,9 @@ func (rs *storeReplicaVisitor) Visit(visitor func(*Replica) bool) { // stale) view of all Replicas without holding the Store lock. In particular, // no locks are acquired during the copy process. rs.repls = nil - rs.store.mu.replicas.Range(func(k int64, v unsafe.Pointer) bool { - rs.repls = append(rs.repls, (*Replica)(v)) - return true + _ = rs.store.mu.replicas.Range(func(repl *Replica) error { + rs.repls = append(rs.repls, repl) + return nil }) if rs.ordered { @@ -586,7 +586,7 @@ type Store struct { syncutil.RWMutex // Map of replicas by Range ID (map[roachpb.RangeID]*Replica). This // includes `uninitReplicas`. May be read without holding Store.mu. - replicas syncutil.IntMap + replicas rangeIDReplicaMap // A btree key containing objects of type *Replica or *ReplicaPlaceholder. // Both types have an associated key range; the btree is keyed on their // start keys. @@ -2411,8 +2411,8 @@ func (s *Store) GetReplica(rangeID roachpb.RangeID) (*Replica, error) { // GetReplicaIfExists returns the replica with the given RangeID or nil. func (s *Store) GetReplicaIfExists(rangeID roachpb.RangeID) *Replica { - if value, ok := s.mu.replicas.Load(int64(rangeID)); ok { - return (*Replica)(value) + if repl, ok := s.mu.replicas.Load(rangeID); ok { + return repl } return nil } @@ -2459,8 +2459,8 @@ func (s *Store) getOverlappingKeyRangeLocked( // RaftStatus returns the current raft status of the local replica of // the given range. func (s *Store) RaftStatus(rangeID roachpb.RangeID) *raft.Status { - if value, ok := s.mu.replicas.Load(int64(rangeID)); ok { - return (*Replica)(value).RaftStatus() + if repl, ok := s.mu.replicas.Load(rangeID); ok { + return repl.RaftStatus() } return nil } @@ -2590,9 +2590,9 @@ func (s *Store) Capacity(ctx context.Context, useCached bool) (roachpb.StoreCapa // performance critical code. func (s *Store) ReplicaCount() int { var count int - s.mu.replicas.Range(func(_ int64, _ unsafe.Pointer) bool { + _ = s.mu.replicas.Range(func(*Replica) error { count++ - return true + return nil }) return count } diff --git a/pkg/kv/kvserver/store_create_replica.go b/pkg/kv/kvserver/store_create_replica.go index a3ed8b7f2a81..910453ba6c0a 100644 --- a/pkg/kv/kvserver/store_create_replica.go +++ b/pkg/kv/kvserver/store_create_replica.go @@ -13,7 +13,6 @@ package kvserver import ( "context" "time" - "unsafe" "github.com/cockroachdb/cockroach/pkg/keys" "github.com/cockroachdb/cockroach/pkg/roachpb" @@ -80,8 +79,7 @@ func (s *Store) tryGetOrCreateReplica( creatingReplica *roachpb.ReplicaDescriptor, ) (_ *Replica, created bool, _ error) { // The common case: look up an existing (initialized) replica. - if value, ok := s.mu.replicas.Load(int64(rangeID)); ok { - repl := (*Replica)(value) + if repl, ok := s.mu.replicas.Load(rangeID); ok { repl.raftMu.Lock() // not unlocked on success repl.mu.Lock() @@ -298,7 +296,7 @@ func (s *Store) addReplicaToRangeMapLocked(repl *Replica) error { // same replica object. This occurs during splits where the right-hand side // is added to the replicas map before it is initialized. if existing, loaded := s.mu.replicas.LoadOrStore( - int64(repl.RangeID), unsafe.Pointer(repl)); loaded && (*Replica)(existing) != repl { + repl.RangeID, repl); loaded && existing != repl { return errors.Errorf("%s: replica already exists", repl) } // Check whether the replica is unquiesced but not in the map. This @@ -314,7 +312,7 @@ func (s *Store) addReplicaToRangeMapLocked(repl *Replica) error { } // maybeMarkReplicaInitializedLocked should be called whenever a previously -// unintialized replica has become initialized so that the store can update its +// uninitialized replica has become initialized so that the store can update its // internal bookkeeping. It requires that Store.mu and Replica.raftMu // are locked. func (s *Store) maybeMarkReplicaInitializedLockedReplLocked( diff --git a/pkg/kv/kvserver/store_raft.go b/pkg/kv/kvserver/store_raft.go index c1c06cc8f491..cfe741f2df50 100644 --- a/pkg/kv/kvserver/store_raft.go +++ b/pkg/kv/kvserver/store_raft.go @@ -484,7 +484,7 @@ func (s *Store) processRequestQueue(ctx context.Context, rangeID roachpb.RangeID // forgiving. // // See https://github.com/cockroachdb/cockroach/issues/30951#issuecomment-428010411. - if _, exists := s.mu.replicas.Load(int64(rangeID)); !exists { + if _, exists := s.mu.replicas.Load(rangeID); !exists { q.Lock() if len(q.infos) == 0 { s.replicaQueues.Delete(int64(rangeID)) @@ -500,12 +500,11 @@ func (s *Store) processRequestQueue(ctx context.Context, rangeID roachpb.RangeID } func (s *Store) processReady(ctx context.Context, rangeID roachpb.RangeID) { - value, ok := s.mu.replicas.Load(int64(rangeID)) + r, ok := s.mu.replicas.Load(rangeID) if !ok { return } - r := (*Replica)(value) ctx = r.raftSchedulerCtx(ctx) start := timeutil.Now() stats, expl, err := r.handleRaftReady(ctx, noSnap) @@ -524,14 +523,13 @@ func (s *Store) processReady(ctx context.Context, rangeID roachpb.RangeID) { } func (s *Store) processTick(ctx context.Context, rangeID roachpb.RangeID) bool { - value, ok := s.mu.replicas.Load(int64(rangeID)) + r, ok := s.mu.replicas.Load(rangeID) if !ok { return false } livenessMap, _ := s.livenessMap.Load().(liveness.IsLiveMap) start := timeutil.Now() - r := (*Replica)(value) ctx = r.raftSchedulerCtx(ctx) exists, err := r.tick(ctx, livenessMap) if err != nil { @@ -560,8 +558,7 @@ func (s *Store) processTick(ctx context.Context, rangeID roachpb.RangeID) bool { func (s *Store) nodeIsLiveCallback(l livenesspb.Liveness) { s.updateLivenessMap() - s.mu.replicas.Range(func(k int64, v unsafe.Pointer) bool { - r := (*Replica)(v) + _ = s.mu.replicas.Range(func(r *Replica) error { r.mu.RLock() quiescent := r.mu.quiescent lagging := r.mu.laggingFollowersOnQuiesce @@ -570,7 +567,7 @@ func (s *Store) nodeIsLiveCallback(l livenesspb.Liveness) { if quiescent && (lagging.MemberStale(l) || !laggingAccurate) { r.unquiesce() } - return true + return nil }) } @@ -730,13 +727,13 @@ func (s *Store) sendQueuedHeartbeatsToNode( if !s.cfg.Transport.SendAsync(chReq, rpc.SystemClass) { for _, beat := range beats { - if value, ok := s.mu.replicas.Load(int64(beat.RangeID)); ok { - (*Replica)(value).addUnreachableRemoteReplica(beat.ToReplicaID) + if repl, ok := s.mu.replicas.Load(beat.RangeID); ok { + repl.addUnreachableRemoteReplica(beat.ToReplicaID) } } for _, resp := range resps { - if value, ok := s.mu.replicas.Load(int64(resp.RangeID)); ok { - (*Replica)(value).addUnreachableRemoteReplica(resp.ToReplicaID) + if repl, ok := s.mu.replicas.Load(resp.RangeID); ok { + repl.addUnreachableRemoteReplica(resp.ToReplicaID) } } return 0 diff --git a/pkg/kv/kvserver/store_remove_replica.go b/pkg/kv/kvserver/store_remove_replica.go index d919f29abf67..cb8cd493d2f5 100644 --- a/pkg/kv/kvserver/store_remove_replica.go +++ b/pkg/kv/kvserver/store_remove_replica.go @@ -238,11 +238,10 @@ func (s *Store) removeUninitializedReplicaRaftMuLocked( defer s.mu.Unlock() // Sanity check, could be removed. - value, stillExists := s.mu.replicas.Load(int64(rep.RangeID)) + existing, stillExists := s.mu.replicas.Load(rep.RangeID) if !stillExists { log.Fatalf(ctx, "uninitialized replica was removed in the meantime") } - existing := (*Replica)(value) if existing == rep { log.Infof(ctx, "removing uninitialized replica %v", rep) } else { @@ -264,7 +263,7 @@ func (s *Store) unlinkReplicaByRangeIDLocked(ctx context.Context, rangeID roachp s.unquiescedReplicas.Unlock() delete(s.mu.uninitReplicas, rangeID) s.replicaQueues.Delete(int64(rangeID)) - s.mu.replicas.Delete(int64(rangeID)) + s.mu.replicas.Delete(rangeID) s.unregisterLeaseholderByID(ctx, rangeID) } diff --git a/pkg/kv/kvserver/store_replicas_by_rangeid.go b/pkg/kv/kvserver/store_replicas_by_rangeid.go new file mode 100644 index 000000000000..c98bd471e5ac --- /dev/null +++ b/pkg/kv/kvserver/store_replicas_by_rangeid.go @@ -0,0 +1,63 @@ +// Copyright 2021 The Cockroach Authors. +// +// Use of this software is governed by the Business Source License +// included in the file licenses/BSL.txt. +// +// As of the Change Date specified in that file, in accordance with +// the Business Source License, use of this software will be governed +// by the Apache License, Version 2.0, included in the file +// licenses/APL.txt. + +package kvserver + +import ( + "unsafe" + + "github.com/cockroachdb/cockroach/pkg/roachpb" + "github.com/cockroachdb/cockroach/pkg/util/iterutil" + "github.com/cockroachdb/cockroach/pkg/util/syncutil" + "github.com/cockroachdb/errors" +) + +type rangeIDReplicaMap struct { + m syncutil.IntMap +} + +// Load loads the Replica for the RangeID. If not found, returns +// (nil, false), otherwise the Replica and true. +func (m *rangeIDReplicaMap) Load(rangeID roachpb.RangeID) (*Replica, bool) { + val, ok := m.m.Load(int64(rangeID)) + return (*Replica)(val), ok +} + +// LoadOrStore loads the replica and returns it (and `true`). If it does not +// exist, atomically inserts the provided Replica and returns it along with +// `false`. +func (m *rangeIDReplicaMap) LoadOrStore( + rangeID roachpb.RangeID, repl *Replica, +) (_ *Replica, loaded bool) { + val, loaded := m.m.LoadOrStore(int64(rangeID), unsafe.Pointer(repl)) + return (*Replica)(val), loaded +} + +// Delete drops the Replica if it existed in the map. +func (m *rangeIDReplicaMap) Delete(rangeID roachpb.RangeID) { + m.m.Delete(int64(rangeID)) +} + +// Range invokes the provided function with each Replica in the map. +// Iteration stops on any error. `iterutil.StopIteration()` can be +// returned from the closure to stop iteration without an error +// resulting from Range(). +func (m *rangeIDReplicaMap) Range(f func(*Replica) error) error { + var err error + v := func(k int64, v unsafe.Pointer) (wantMore bool) { + err = f((*Replica)(v)) + return err == nil + } + m.m.Range(v) + if errors.Is(err, iterutil.StopIteration()) { + return nil + } + return nil +} diff --git a/pkg/kv/kvserver/store_snapshot.go b/pkg/kv/kvserver/store_snapshot.go index d5bf8264c207..7ba412895112 100644 --- a/pkg/kv/kvserver/store_snapshot.go +++ b/pkg/kv/kvserver/store_snapshot.go @@ -450,13 +450,10 @@ func (s *Store) canAcceptSnapshotLocked( desc := *snapHeader.State.Desc // First, check for an existing Replica. - v, ok := s.mu.replicas.Load( - int64(desc.RangeID), - ) + existingRepl, ok := s.mu.replicas.Load(desc.RangeID) if !ok { return nil, errors.Errorf("canAcceptSnapshotLocked requires a replica present") } - existingRepl := (*Replica)(v) // The raftMu is held which allows us to use the existing replica as a // placeholder when we decide that the snapshot can be applied. As long // as the caller releases the raftMu only after feeding the snapshot From be9bf0037fe23cf7e81a2d2f13401bdaa770e343 Mon Sep 17 00:00:00 2001 From: Tobias Grieger Date: Thu, 4 Nov 2021 15:07:48 +0100 Subject: [PATCH 2/3] kvserver: rename replicas to replicasByRangeID Release note: None --- pkg/kv/kvserver/helpers_test.go | 4 ++-- pkg/kv/kvserver/store.go | 10 +++++----- pkg/kv/kvserver/store_create_replica.go | 4 ++-- pkg/kv/kvserver/store_raft.go | 12 ++++++------ pkg/kv/kvserver/store_remove_replica.go | 4 ++-- pkg/kv/kvserver/store_snapshot.go | 2 +- 6 files changed, 18 insertions(+), 18 deletions(-) diff --git a/pkg/kv/kvserver/helpers_test.go b/pkg/kv/kvserver/helpers_test.go index 70fefa4ea431..ef127376f7c0 100644 --- a/pkg/kv/kvserver/helpers_test.go +++ b/pkg/kv/kvserver/helpers_test.go @@ -522,13 +522,13 @@ func WatchForDisappearingReplicas(t testing.TB, store *Store) { default: } - _ = store.mu.replicas.Range(func(repl *Replica) error { + _ = store.mu.replicasByRangeID.Range(func(repl *Replica) error { m[repl.RangeID] = struct{}{} return nil }) for k := range m { - if _, ok := store.mu.replicas.Load(k); !ok { + if _, ok := store.mu.replicasByRangeID.Load(k); !ok { t.Fatalf("r%d disappeared from Store.mu.replicas map", k) } } diff --git a/pkg/kv/kvserver/store.go b/pkg/kv/kvserver/store.go index dbe78e872c58..ee4f817688b3 100644 --- a/pkg/kv/kvserver/store.go +++ b/pkg/kv/kvserver/store.go @@ -364,7 +364,7 @@ func (rs *storeReplicaVisitor) Visit(visitor func(*Replica) bool) { // stale) view of all Replicas without holding the Store lock. In particular, // no locks are acquired during the copy process. rs.repls = nil - _ = rs.store.mu.replicas.Range(func(repl *Replica) error { + _ = rs.store.mu.replicasByRangeID.Range(func(repl *Replica) error { rs.repls = append(rs.repls, repl) return nil }) @@ -586,7 +586,7 @@ type Store struct { syncutil.RWMutex // Map of replicas by Range ID (map[roachpb.RangeID]*Replica). This // includes `uninitReplicas`. May be read without holding Store.mu. - replicas rangeIDReplicaMap + replicasByRangeID rangeIDReplicaMap // A btree key containing objects of type *Replica or *ReplicaPlaceholder. // Both types have an associated key range; the btree is keyed on their // start keys. @@ -2411,7 +2411,7 @@ func (s *Store) GetReplica(rangeID roachpb.RangeID) (*Replica, error) { // GetReplicaIfExists returns the replica with the given RangeID or nil. func (s *Store) GetReplicaIfExists(rangeID roachpb.RangeID) *Replica { - if repl, ok := s.mu.replicas.Load(rangeID); ok { + if repl, ok := s.mu.replicasByRangeID.Load(rangeID); ok { return repl } return nil @@ -2459,7 +2459,7 @@ func (s *Store) getOverlappingKeyRangeLocked( // RaftStatus returns the current raft status of the local replica of // the given range. func (s *Store) RaftStatus(rangeID roachpb.RangeID) *raft.Status { - if repl, ok := s.mu.replicas.Load(rangeID); ok { + if repl, ok := s.mu.replicasByRangeID.Load(rangeID); ok { return repl.RaftStatus() } return nil @@ -2590,7 +2590,7 @@ func (s *Store) Capacity(ctx context.Context, useCached bool) (roachpb.StoreCapa // performance critical code. func (s *Store) ReplicaCount() int { var count int - _ = s.mu.replicas.Range(func(*Replica) error { + _ = s.mu.replicasByRangeID.Range(func(*Replica) error { count++ return nil }) diff --git a/pkg/kv/kvserver/store_create_replica.go b/pkg/kv/kvserver/store_create_replica.go index 910453ba6c0a..0e0e92a281b0 100644 --- a/pkg/kv/kvserver/store_create_replica.go +++ b/pkg/kv/kvserver/store_create_replica.go @@ -79,7 +79,7 @@ func (s *Store) tryGetOrCreateReplica( creatingReplica *roachpb.ReplicaDescriptor, ) (_ *Replica, created bool, _ error) { // The common case: look up an existing (initialized) replica. - if repl, ok := s.mu.replicas.Load(rangeID); ok { + if repl, ok := s.mu.replicasByRangeID.Load(rangeID); ok { repl.raftMu.Lock() // not unlocked on success repl.mu.Lock() @@ -295,7 +295,7 @@ func (s *Store) addReplicaToRangeMapLocked(repl *Replica) error { // It's ok for the replica to exist in the replicas map as long as it is the // same replica object. This occurs during splits where the right-hand side // is added to the replicas map before it is initialized. - if existing, loaded := s.mu.replicas.LoadOrStore( + if existing, loaded := s.mu.replicasByRangeID.LoadOrStore( repl.RangeID, repl); loaded && existing != repl { return errors.Errorf("%s: replica already exists", repl) } diff --git a/pkg/kv/kvserver/store_raft.go b/pkg/kv/kvserver/store_raft.go index cfe741f2df50..f6c91f6544fe 100644 --- a/pkg/kv/kvserver/store_raft.go +++ b/pkg/kv/kvserver/store_raft.go @@ -484,7 +484,7 @@ func (s *Store) processRequestQueue(ctx context.Context, rangeID roachpb.RangeID // forgiving. // // See https://github.com/cockroachdb/cockroach/issues/30951#issuecomment-428010411. - if _, exists := s.mu.replicas.Load(rangeID); !exists { + if _, exists := s.mu.replicasByRangeID.Load(rangeID); !exists { q.Lock() if len(q.infos) == 0 { s.replicaQueues.Delete(int64(rangeID)) @@ -500,7 +500,7 @@ func (s *Store) processRequestQueue(ctx context.Context, rangeID roachpb.RangeID } func (s *Store) processReady(ctx context.Context, rangeID roachpb.RangeID) { - r, ok := s.mu.replicas.Load(rangeID) + r, ok := s.mu.replicasByRangeID.Load(rangeID) if !ok { return } @@ -523,7 +523,7 @@ func (s *Store) processReady(ctx context.Context, rangeID roachpb.RangeID) { } func (s *Store) processTick(ctx context.Context, rangeID roachpb.RangeID) bool { - r, ok := s.mu.replicas.Load(rangeID) + r, ok := s.mu.replicasByRangeID.Load(rangeID) if !ok { return false } @@ -558,7 +558,7 @@ func (s *Store) processTick(ctx context.Context, rangeID roachpb.RangeID) bool { func (s *Store) nodeIsLiveCallback(l livenesspb.Liveness) { s.updateLivenessMap() - _ = s.mu.replicas.Range(func(r *Replica) error { + _ = s.mu.replicasByRangeID.Range(func(r *Replica) error { r.mu.RLock() quiescent := r.mu.quiescent lagging := r.mu.laggingFollowersOnQuiesce @@ -727,12 +727,12 @@ func (s *Store) sendQueuedHeartbeatsToNode( if !s.cfg.Transport.SendAsync(chReq, rpc.SystemClass) { for _, beat := range beats { - if repl, ok := s.mu.replicas.Load(beat.RangeID); ok { + if repl, ok := s.mu.replicasByRangeID.Load(beat.RangeID); ok { repl.addUnreachableRemoteReplica(beat.ToReplicaID) } } for _, resp := range resps { - if repl, ok := s.mu.replicas.Load(resp.RangeID); ok { + if repl, ok := s.mu.replicasByRangeID.Load(resp.RangeID); ok { repl.addUnreachableRemoteReplica(resp.ToReplicaID) } } diff --git a/pkg/kv/kvserver/store_remove_replica.go b/pkg/kv/kvserver/store_remove_replica.go index cb8cd493d2f5..8f47c805c840 100644 --- a/pkg/kv/kvserver/store_remove_replica.go +++ b/pkg/kv/kvserver/store_remove_replica.go @@ -238,7 +238,7 @@ func (s *Store) removeUninitializedReplicaRaftMuLocked( defer s.mu.Unlock() // Sanity check, could be removed. - existing, stillExists := s.mu.replicas.Load(rep.RangeID) + existing, stillExists := s.mu.replicasByRangeID.Load(rep.RangeID) if !stillExists { log.Fatalf(ctx, "uninitialized replica was removed in the meantime") } @@ -263,7 +263,7 @@ func (s *Store) unlinkReplicaByRangeIDLocked(ctx context.Context, rangeID roachp s.unquiescedReplicas.Unlock() delete(s.mu.uninitReplicas, rangeID) s.replicaQueues.Delete(int64(rangeID)) - s.mu.replicas.Delete(rangeID) + s.mu.replicasByRangeID.Delete(rangeID) s.unregisterLeaseholderByID(ctx, rangeID) } diff --git a/pkg/kv/kvserver/store_snapshot.go b/pkg/kv/kvserver/store_snapshot.go index 7ba412895112..e34d412e7dde 100644 --- a/pkg/kv/kvserver/store_snapshot.go +++ b/pkg/kv/kvserver/store_snapshot.go @@ -450,7 +450,7 @@ func (s *Store) canAcceptSnapshotLocked( desc := *snapHeader.State.Desc // First, check for an existing Replica. - existingRepl, ok := s.mu.replicas.Load(desc.RangeID) + existingRepl, ok := s.mu.replicasByRangeID.Load(desc.RangeID) if !ok { return nil, errors.Errorf("canAcceptSnapshotLocked requires a replica present") } From 2113763e676479963445fcf437c15fc0ba64dab8 Mon Sep 17 00:00:00 2001 From: Tobias Grieger Date: Thu, 4 Nov 2021 22:59:21 +0100 Subject: [PATCH 3/3] kvserver: touch up rangeIDReplicaMap - avoid the struct wrapper (and thus the need to name the inner map) - don't return an error in Range() Release note: None --- pkg/kv/kvserver/helpers_test.go | 3 +- pkg/kv/kvserver/store.go | 6 ++-- pkg/kv/kvserver/store_raft.go | 3 +- pkg/kv/kvserver/store_replicas_by_rangeid.go | 30 ++++++-------------- 4 files changed, 13 insertions(+), 29 deletions(-) diff --git a/pkg/kv/kvserver/helpers_test.go b/pkg/kv/kvserver/helpers_test.go index ef127376f7c0..e92a81bb820f 100644 --- a/pkg/kv/kvserver/helpers_test.go +++ b/pkg/kv/kvserver/helpers_test.go @@ -522,9 +522,8 @@ func WatchForDisappearingReplicas(t testing.TB, store *Store) { default: } - _ = store.mu.replicasByRangeID.Range(func(repl *Replica) error { + store.mu.replicasByRangeID.Range(func(repl *Replica) { m[repl.RangeID] = struct{}{} - return nil }) for k := range m { diff --git a/pkg/kv/kvserver/store.go b/pkg/kv/kvserver/store.go index ee4f817688b3..4ef95d32a143 100644 --- a/pkg/kv/kvserver/store.go +++ b/pkg/kv/kvserver/store.go @@ -364,9 +364,8 @@ func (rs *storeReplicaVisitor) Visit(visitor func(*Replica) bool) { // stale) view of all Replicas without holding the Store lock. In particular, // no locks are acquired during the copy process. rs.repls = nil - _ = rs.store.mu.replicasByRangeID.Range(func(repl *Replica) error { + rs.store.mu.replicasByRangeID.Range(func(repl *Replica) { rs.repls = append(rs.repls, repl) - return nil }) if rs.ordered { @@ -2590,9 +2589,8 @@ func (s *Store) Capacity(ctx context.Context, useCached bool) (roachpb.StoreCapa // performance critical code. func (s *Store) ReplicaCount() int { var count int - _ = s.mu.replicasByRangeID.Range(func(*Replica) error { + s.mu.replicasByRangeID.Range(func(*Replica) { count++ - return nil }) return count } diff --git a/pkg/kv/kvserver/store_raft.go b/pkg/kv/kvserver/store_raft.go index f6c91f6544fe..69ee2ff96c88 100644 --- a/pkg/kv/kvserver/store_raft.go +++ b/pkg/kv/kvserver/store_raft.go @@ -558,7 +558,7 @@ func (s *Store) processTick(ctx context.Context, rangeID roachpb.RangeID) bool { func (s *Store) nodeIsLiveCallback(l livenesspb.Liveness) { s.updateLivenessMap() - _ = s.mu.replicasByRangeID.Range(func(r *Replica) error { + s.mu.replicasByRangeID.Range(func(r *Replica) { r.mu.RLock() quiescent := r.mu.quiescent lagging := r.mu.laggingFollowersOnQuiesce @@ -567,7 +567,6 @@ func (s *Store) nodeIsLiveCallback(l livenesspb.Liveness) { if quiescent && (lagging.MemberStale(l) || !laggingAccurate) { r.unquiesce() } - return nil }) } diff --git a/pkg/kv/kvserver/store_replicas_by_rangeid.go b/pkg/kv/kvserver/store_replicas_by_rangeid.go index c98bd471e5ac..fe8c943181e3 100644 --- a/pkg/kv/kvserver/store_replicas_by_rangeid.go +++ b/pkg/kv/kvserver/store_replicas_by_rangeid.go @@ -14,19 +14,15 @@ import ( "unsafe" "github.com/cockroachdb/cockroach/pkg/roachpb" - "github.com/cockroachdb/cockroach/pkg/util/iterutil" "github.com/cockroachdb/cockroach/pkg/util/syncutil" - "github.com/cockroachdb/errors" ) -type rangeIDReplicaMap struct { - m syncutil.IntMap -} +type rangeIDReplicaMap syncutil.IntMap // Load loads the Replica for the RangeID. If not found, returns // (nil, false), otherwise the Replica and true. func (m *rangeIDReplicaMap) Load(rangeID roachpb.RangeID) (*Replica, bool) { - val, ok := m.m.Load(int64(rangeID)) + val, ok := (*syncutil.IntMap)(m).Load(int64(rangeID)) return (*Replica)(val), ok } @@ -36,28 +32,20 @@ func (m *rangeIDReplicaMap) Load(rangeID roachpb.RangeID) (*Replica, bool) { func (m *rangeIDReplicaMap) LoadOrStore( rangeID roachpb.RangeID, repl *Replica, ) (_ *Replica, loaded bool) { - val, loaded := m.m.LoadOrStore(int64(rangeID), unsafe.Pointer(repl)) + val, loaded := (*syncutil.IntMap)(m).LoadOrStore(int64(rangeID), unsafe.Pointer(repl)) return (*Replica)(val), loaded } // Delete drops the Replica if it existed in the map. func (m *rangeIDReplicaMap) Delete(rangeID roachpb.RangeID) { - m.m.Delete(int64(rangeID)) + (*syncutil.IntMap)(m).Delete(int64(rangeID)) } // Range invokes the provided function with each Replica in the map. -// Iteration stops on any error. `iterutil.StopIteration()` can be -// returned from the closure to stop iteration without an error -// resulting from Range(). -func (m *rangeIDReplicaMap) Range(f func(*Replica) error) error { - var err error - v := func(k int64, v unsafe.Pointer) (wantMore bool) { - err = f((*Replica)(v)) - return err == nil - } - m.m.Range(v) - if errors.Is(err, iterutil.StopIteration()) { - return nil +func (m *rangeIDReplicaMap) Range(f func(*Replica)) { + v := func(k int64, v unsafe.Pointer) bool { + f((*Replica)(v)) + return true // wantMore } - return nil + (*syncutil.IntMap)(m).Range(v) }