Skip to content

Commit

Permalink
kvserver: use wrapper type for Store.mu.replicas
Browse files Browse the repository at this point in the history
This simplifies lots of callers and it will also make it easier to work
on cockroachdb#72374, where this map will start containing more than one type as
value.

Release note: None
  • Loading branch information
tbg committed Nov 4, 2021
1 parent 4493953 commit 6256dd8
Show file tree
Hide file tree
Showing 7 changed files with 92 additions and 39 deletions.
9 changes: 4 additions & 5 deletions pkg/kv/kvserver/helpers_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@ import (
"math/rand"
"testing"
"time"
"unsafe"

circuit "github.com/cockroachdb/circuitbreaker"
"github.com/cockroachdb/cockroach/pkg/kv"
Expand Down Expand Up @@ -515,17 +514,17 @@ func WriteRandomDataToRange(
}

func WatchForDisappearingReplicas(t testing.TB, store *Store) {
m := make(map[int64]struct{})
m := make(map[roachpb.RangeID]struct{})
for {
select {
case <-store.Stopper().ShouldQuiesce():
return
default:
}

store.mu.replicas.Range(func(k int64, v unsafe.Pointer) bool {
m[k] = struct{}{}
return true
_ = store.mu.replicas.Range(func(repl *Replica) error {
m[repl.RangeID] = struct{}{}
return nil
})

for k := range m {
Expand Down
20 changes: 10 additions & 10 deletions pkg/kv/kvserver/store.go
Original file line number Diff line number Diff line change
Expand Up @@ -364,9 +364,9 @@ func (rs *storeReplicaVisitor) Visit(visitor func(*Replica) bool) {
// stale) view of all Replicas without holding the Store lock. In particular,
// no locks are acquired during the copy process.
rs.repls = nil
rs.store.mu.replicas.Range(func(k int64, v unsafe.Pointer) bool {
rs.repls = append(rs.repls, (*Replica)(v))
return true
_ = rs.store.mu.replicas.Range(func(repl *Replica) error {
rs.repls = append(rs.repls, repl)
return nil
})

if rs.ordered {
Expand Down Expand Up @@ -586,7 +586,7 @@ type Store struct {
syncutil.RWMutex
// Map of replicas by Range ID (map[roachpb.RangeID]*Replica). This
// includes `uninitReplicas`. May be read without holding Store.mu.
replicas syncutil.IntMap
replicas rangeIDReplicaMap
// A btree key containing objects of type *Replica or *ReplicaPlaceholder.
// Both types have an associated key range; the btree is keyed on their
// start keys.
Expand Down Expand Up @@ -2411,8 +2411,8 @@ func (s *Store) GetReplica(rangeID roachpb.RangeID) (*Replica, error) {

// GetReplicaIfExists returns the replica with the given RangeID or nil.
func (s *Store) GetReplicaIfExists(rangeID roachpb.RangeID) *Replica {
if value, ok := s.mu.replicas.Load(int64(rangeID)); ok {
return (*Replica)(value)
if repl, ok := s.mu.replicas.Load(rangeID); ok {
return repl
}
return nil
}
Expand Down Expand Up @@ -2459,8 +2459,8 @@ func (s *Store) getOverlappingKeyRangeLocked(
// RaftStatus returns the current raft status of the local replica of
// the given range.
func (s *Store) RaftStatus(rangeID roachpb.RangeID) *raft.Status {
if value, ok := s.mu.replicas.Load(int64(rangeID)); ok {
return (*Replica)(value).RaftStatus()
if repl, ok := s.mu.replicas.Load(rangeID); ok {
return repl.RaftStatus()
}
return nil
}
Expand Down Expand Up @@ -2590,9 +2590,9 @@ func (s *Store) Capacity(ctx context.Context, useCached bool) (roachpb.StoreCapa
// performance critical code.
func (s *Store) ReplicaCount() int {
var count int
s.mu.replicas.Range(func(_ int64, _ unsafe.Pointer) bool {
_ = s.mu.replicas.Range(func(*Replica) error {
count++
return true
return nil
})
return count
}
Expand Down
8 changes: 3 additions & 5 deletions pkg/kv/kvserver/store_create_replica.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,6 @@ package kvserver
import (
"context"
"time"
"unsafe"

"github.com/cockroachdb/cockroach/pkg/keys"
"github.com/cockroachdb/cockroach/pkg/roachpb"
Expand Down Expand Up @@ -80,8 +79,7 @@ func (s *Store) tryGetOrCreateReplica(
creatingReplica *roachpb.ReplicaDescriptor,
) (_ *Replica, created bool, _ error) {
// The common case: look up an existing (initialized) replica.
if value, ok := s.mu.replicas.Load(int64(rangeID)); ok {
repl := (*Replica)(value)
if repl, ok := s.mu.replicas.Load(rangeID); ok {
repl.raftMu.Lock() // not unlocked on success
repl.mu.Lock()

Expand Down Expand Up @@ -298,7 +296,7 @@ func (s *Store) addReplicaToRangeMapLocked(repl *Replica) error {
// same replica object. This occurs during splits where the right-hand side
// is added to the replicas map before it is initialized.
if existing, loaded := s.mu.replicas.LoadOrStore(
int64(repl.RangeID), unsafe.Pointer(repl)); loaded && (*Replica)(existing) != repl {
repl.RangeID, repl); loaded && existing != repl {
return errors.Errorf("%s: replica already exists", repl)
}
// Check whether the replica is unquiesced but not in the map. This
Expand All @@ -314,7 +312,7 @@ func (s *Store) addReplicaToRangeMapLocked(repl *Replica) error {
}

// maybeMarkReplicaInitializedLocked should be called whenever a previously
// unintialized replica has become initialized so that the store can update its
// uninitialized replica has become initialized so that the store can update its
// internal bookkeeping. It requires that Store.mu and Replica.raftMu
// are locked.
func (s *Store) maybeMarkReplicaInitializedLockedReplLocked(
Expand Down
21 changes: 9 additions & 12 deletions pkg/kv/kvserver/store_raft.go
Original file line number Diff line number Diff line change
Expand Up @@ -484,7 +484,7 @@ func (s *Store) processRequestQueue(ctx context.Context, rangeID roachpb.RangeID
// forgiving.
//
// See https://github.com/cockroachdb/cockroach/issues/30951#issuecomment-428010411.
if _, exists := s.mu.replicas.Load(int64(rangeID)); !exists {
if _, exists := s.mu.replicas.Load(rangeID); !exists {
q.Lock()
if len(q.infos) == 0 {
s.replicaQueues.Delete(int64(rangeID))
Expand All @@ -500,12 +500,11 @@ func (s *Store) processRequestQueue(ctx context.Context, rangeID roachpb.RangeID
}

func (s *Store) processReady(ctx context.Context, rangeID roachpb.RangeID) {
value, ok := s.mu.replicas.Load(int64(rangeID))
r, ok := s.mu.replicas.Load(rangeID)
if !ok {
return
}

r := (*Replica)(value)
ctx = r.raftSchedulerCtx(ctx)
start := timeutil.Now()
stats, expl, err := r.handleRaftReady(ctx, noSnap)
Expand All @@ -524,14 +523,13 @@ func (s *Store) processReady(ctx context.Context, rangeID roachpb.RangeID) {
}

func (s *Store) processTick(ctx context.Context, rangeID roachpb.RangeID) bool {
value, ok := s.mu.replicas.Load(int64(rangeID))
r, ok := s.mu.replicas.Load(rangeID)
if !ok {
return false
}
livenessMap, _ := s.livenessMap.Load().(liveness.IsLiveMap)

start := timeutil.Now()
r := (*Replica)(value)
ctx = r.raftSchedulerCtx(ctx)
exists, err := r.tick(ctx, livenessMap)
if err != nil {
Expand Down Expand Up @@ -560,8 +558,7 @@ func (s *Store) processTick(ctx context.Context, rangeID roachpb.RangeID) bool {
func (s *Store) nodeIsLiveCallback(l livenesspb.Liveness) {
s.updateLivenessMap()

s.mu.replicas.Range(func(k int64, v unsafe.Pointer) bool {
r := (*Replica)(v)
_ = s.mu.replicas.Range(func(r *Replica) error {
r.mu.RLock()
quiescent := r.mu.quiescent
lagging := r.mu.laggingFollowersOnQuiesce
Expand All @@ -570,7 +567,7 @@ func (s *Store) nodeIsLiveCallback(l livenesspb.Liveness) {
if quiescent && (lagging.MemberStale(l) || !laggingAccurate) {
r.unquiesce()
}
return true
return nil
})
}

Expand Down Expand Up @@ -730,13 +727,13 @@ func (s *Store) sendQueuedHeartbeatsToNode(

if !s.cfg.Transport.SendAsync(chReq, rpc.SystemClass) {
for _, beat := range beats {
if value, ok := s.mu.replicas.Load(int64(beat.RangeID)); ok {
(*Replica)(value).addUnreachableRemoteReplica(beat.ToReplicaID)
if repl, ok := s.mu.replicas.Load(beat.RangeID); ok {
repl.addUnreachableRemoteReplica(beat.ToReplicaID)
}
}
for _, resp := range resps {
if value, ok := s.mu.replicas.Load(int64(resp.RangeID)); ok {
(*Replica)(value).addUnreachableRemoteReplica(resp.ToReplicaID)
if repl, ok := s.mu.replicas.Load(resp.RangeID); ok {
repl.addUnreachableRemoteReplica(resp.ToReplicaID)
}
}
return 0
Expand Down
5 changes: 2 additions & 3 deletions pkg/kv/kvserver/store_remove_replica.go
Original file line number Diff line number Diff line change
Expand Up @@ -238,11 +238,10 @@ func (s *Store) removeUninitializedReplicaRaftMuLocked(
defer s.mu.Unlock()

// Sanity check, could be removed.
value, stillExists := s.mu.replicas.Load(int64(rep.RangeID))
existing, stillExists := s.mu.replicas.Load(rep.RangeID)
if !stillExists {
log.Fatalf(ctx, "uninitialized replica was removed in the meantime")
}
existing := (*Replica)(value)
if existing == rep {
log.Infof(ctx, "removing uninitialized replica %v", rep)
} else {
Expand All @@ -264,7 +263,7 @@ func (s *Store) unlinkReplicaByRangeIDLocked(ctx context.Context, rangeID roachp
s.unquiescedReplicas.Unlock()
delete(s.mu.uninitReplicas, rangeID)
s.replicaQueues.Delete(int64(rangeID))
s.mu.replicas.Delete(int64(rangeID))
s.mu.replicas.Delete(rangeID)
s.unregisterLeaseholderByID(ctx, rangeID)
}

Expand Down
63 changes: 63 additions & 0 deletions pkg/kv/kvserver/store_replicas_by_rangeid.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,63 @@
// Copyright 2021 The Cockroach Authors.
//
// Use of this software is governed by the Business Source License
// included in the file licenses/BSL.txt.
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0, included in the file
// licenses/APL.txt.

package kvserver

import (
"unsafe"

"github.com/cockroachdb/cockroach/pkg/roachpb"
"github.com/cockroachdb/cockroach/pkg/util/iterutil"
"github.com/cockroachdb/cockroach/pkg/util/syncutil"
"github.com/cockroachdb/errors"
)

type rangeIDReplicaMap struct {
m syncutil.IntMap
}

// Load loads the Replica for the RangeID. If not found, returns
// (nil, false), otherwise the Replica and true.
func (m *rangeIDReplicaMap) Load(rangeID roachpb.RangeID) (*Replica, bool) {
val, ok := m.m.Load(int64(rangeID))
return (*Replica)(val), ok
}

// LoadOrStore loads the replica and returns it (and `true`). If it does not
// exist, atomically inserts the provided Replica and returns it along with
// `false`.
func (m *rangeIDReplicaMap) LoadOrStore(
rangeID roachpb.RangeID, repl *Replica,
) (_ *Replica, loaded bool) {
val, loaded := m.m.LoadOrStore(int64(rangeID), unsafe.Pointer(repl))
return (*Replica)(val), loaded
}

// Delete drops the Replica if it existed in the map.
func (m *rangeIDReplicaMap) Delete(rangeID roachpb.RangeID) {
m.m.Delete(int64(rangeID))
}

// Range invokes the provided function with each Replica in the map.
// Iteration stops on any error. `iterutil.StopIteration()` can be
// returned from the closure to stop iteration without an error
// resulting from Range().
func (m *rangeIDReplicaMap) Range(f func(*Replica) error) error {
var err error
v := func(k int64, v unsafe.Pointer) (wantMore bool) {
err = f((*Replica)(v))
return err == nil
}
m.m.Range(v)
if errors.Is(err, iterutil.StopIteration()) {
return nil
}
return nil
}
5 changes: 1 addition & 4 deletions pkg/kv/kvserver/store_snapshot.go
Original file line number Diff line number Diff line change
Expand Up @@ -450,13 +450,10 @@ func (s *Store) canAcceptSnapshotLocked(
desc := *snapHeader.State.Desc

// First, check for an existing Replica.
v, ok := s.mu.replicas.Load(
int64(desc.RangeID),
)
existingRepl, ok := s.mu.replicas.Load(desc.RangeID)
if !ok {
return nil, errors.Errorf("canAcceptSnapshotLocked requires a replica present")
}
existingRepl := (*Replica)(v)
// The raftMu is held which allows us to use the existing replica as a
// placeholder when we decide that the snapshot can be applied. As long
// as the caller releases the raftMu only after feeding the snapshot
Expand Down

0 comments on commit 6256dd8

Please sign in to comment.