Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

kvserver: use wrapper type for Store.mu.replicas #72430

Merged
merged 3 commits into from
Nov 4, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions pkg/kv/kvserver/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,7 @@ go_library(
"store_rebalancer.go",
"store_remove_replica.go",
"store_replica_btree.go",
"store_replicas_by_rangeid.go",
"store_send.go",
"store_snapshot.go",
"store_split.go",
Expand Down
10 changes: 4 additions & 6 deletions pkg/kv/kvserver/helpers_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@ import (
"math/rand"
"testing"
"time"
"unsafe"

circuit "github.com/cockroachdb/circuitbreaker"
"github.com/cockroachdb/cockroach/pkg/kv"
Expand Down Expand Up @@ -515,21 +514,20 @@ func WriteRandomDataToRange(
}

func WatchForDisappearingReplicas(t testing.TB, store *Store) {
m := make(map[int64]struct{})
m := make(map[roachpb.RangeID]struct{})
for {
select {
case <-store.Stopper().ShouldQuiesce():
return
default:
}

store.mu.replicas.Range(func(k int64, v unsafe.Pointer) bool {
m[k] = struct{}{}
return true
store.mu.replicasByRangeID.Range(func(repl *Replica) {
m[repl.RangeID] = struct{}{}
})

for k := range m {
if _, ok := store.mu.replicas.Load(k); !ok {
if _, ok := store.mu.replicasByRangeID.Load(k); !ok {
t.Fatalf("r%d disappeared from Store.mu.replicas map", k)
}
}
Expand Down
18 changes: 8 additions & 10 deletions pkg/kv/kvserver/store.go
Original file line number Diff line number Diff line change
Expand Up @@ -364,9 +364,8 @@ func (rs *storeReplicaVisitor) Visit(visitor func(*Replica) bool) {
// stale) view of all Replicas without holding the Store lock. In particular,
// no locks are acquired during the copy process.
rs.repls = nil
rs.store.mu.replicas.Range(func(k int64, v unsafe.Pointer) bool {
rs.repls = append(rs.repls, (*Replica)(v))
return true
rs.store.mu.replicasByRangeID.Range(func(repl *Replica) {
rs.repls = append(rs.repls, repl)
})

if rs.ordered {
Expand Down Expand Up @@ -586,7 +585,7 @@ type Store struct {
syncutil.RWMutex
// Map of replicas by Range ID (map[roachpb.RangeID]*Replica). This
// includes `uninitReplicas`. May be read without holding Store.mu.
replicas syncutil.IntMap
replicasByRangeID rangeIDReplicaMap
// A btree key containing objects of type *Replica or *ReplicaPlaceholder.
// Both types have an associated key range; the btree is keyed on their
// start keys.
Expand Down Expand Up @@ -2411,8 +2410,8 @@ func (s *Store) GetReplica(rangeID roachpb.RangeID) (*Replica, error) {

// GetReplicaIfExists returns the replica with the given RangeID or nil.
func (s *Store) GetReplicaIfExists(rangeID roachpb.RangeID) *Replica {
if value, ok := s.mu.replicas.Load(int64(rangeID)); ok {
return (*Replica)(value)
if repl, ok := s.mu.replicasByRangeID.Load(rangeID); ok {
return repl
}
return nil
}
Expand Down Expand Up @@ -2459,8 +2458,8 @@ func (s *Store) getOverlappingKeyRangeLocked(
// RaftStatus returns the current raft status of the local replica of
// the given range.
func (s *Store) RaftStatus(rangeID roachpb.RangeID) *raft.Status {
if value, ok := s.mu.replicas.Load(int64(rangeID)); ok {
return (*Replica)(value).RaftStatus()
if repl, ok := s.mu.replicasByRangeID.Load(rangeID); ok {
return repl.RaftStatus()
}
return nil
}
Expand Down Expand Up @@ -2590,9 +2589,8 @@ func (s *Store) Capacity(ctx context.Context, useCached bool) (roachpb.StoreCapa
// performance critical code.
func (s *Store) ReplicaCount() int {
var count int
s.mu.replicas.Range(func(_ int64, _ unsafe.Pointer) bool {
s.mu.replicasByRangeID.Range(func(*Replica) {
count++
return true
})
return count
}
Expand Down
10 changes: 4 additions & 6 deletions pkg/kv/kvserver/store_create_replica.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,6 @@ package kvserver
import (
"context"
"time"
"unsafe"

"github.com/cockroachdb/cockroach/pkg/keys"
"github.com/cockroachdb/cockroach/pkg/roachpb"
Expand Down Expand Up @@ -80,8 +79,7 @@ func (s *Store) tryGetOrCreateReplica(
creatingReplica *roachpb.ReplicaDescriptor,
) (_ *Replica, created bool, _ error) {
// The common case: look up an existing (initialized) replica.
if value, ok := s.mu.replicas.Load(int64(rangeID)); ok {
repl := (*Replica)(value)
if repl, ok := s.mu.replicasByRangeID.Load(rangeID); ok {
repl.raftMu.Lock() // not unlocked on success
repl.mu.Lock()

Expand Down Expand Up @@ -297,8 +295,8 @@ func (s *Store) addReplicaToRangeMapLocked(repl *Replica) error {
// It's ok for the replica to exist in the replicas map as long as it is the
// same replica object. This occurs during splits where the right-hand side
// is added to the replicas map before it is initialized.
if existing, loaded := s.mu.replicas.LoadOrStore(
int64(repl.RangeID), unsafe.Pointer(repl)); loaded && (*Replica)(existing) != repl {
if existing, loaded := s.mu.replicasByRangeID.LoadOrStore(
repl.RangeID, repl); loaded && existing != repl {
return errors.Errorf("%s: replica already exists", repl)
}
// Check whether the replica is unquiesced but not in the map. This
Expand All @@ -314,7 +312,7 @@ func (s *Store) addReplicaToRangeMapLocked(repl *Replica) error {
}

// maybeMarkReplicaInitializedLocked should be called whenever a previously
// unintialized replica has become initialized so that the store can update its
// uninitialized replica has become initialized so that the store can update its
// internal bookkeeping. It requires that Store.mu and Replica.raftMu
// are locked.
func (s *Store) maybeMarkReplicaInitializedLockedReplLocked(
Expand Down
20 changes: 8 additions & 12 deletions pkg/kv/kvserver/store_raft.go
Original file line number Diff line number Diff line change
Expand Up @@ -484,7 +484,7 @@ func (s *Store) processRequestQueue(ctx context.Context, rangeID roachpb.RangeID
// forgiving.
//
// See https://github.com/cockroachdb/cockroach/issues/30951#issuecomment-428010411.
if _, exists := s.mu.replicas.Load(int64(rangeID)); !exists {
if _, exists := s.mu.replicasByRangeID.Load(rangeID); !exists {
q.Lock()
if len(q.infos) == 0 {
s.replicaQueues.Delete(int64(rangeID))
Expand All @@ -500,12 +500,11 @@ func (s *Store) processRequestQueue(ctx context.Context, rangeID roachpb.RangeID
}

func (s *Store) processReady(ctx context.Context, rangeID roachpb.RangeID) {
value, ok := s.mu.replicas.Load(int64(rangeID))
r, ok := s.mu.replicasByRangeID.Load(rangeID)
if !ok {
return
}

r := (*Replica)(value)
ctx = r.raftSchedulerCtx(ctx)
start := timeutil.Now()
stats, expl, err := r.handleRaftReady(ctx, noSnap)
Expand All @@ -524,14 +523,13 @@ func (s *Store) processReady(ctx context.Context, rangeID roachpb.RangeID) {
}

func (s *Store) processTick(ctx context.Context, rangeID roachpb.RangeID) bool {
value, ok := s.mu.replicas.Load(int64(rangeID))
r, ok := s.mu.replicasByRangeID.Load(rangeID)
if !ok {
return false
}
livenessMap, _ := s.livenessMap.Load().(liveness.IsLiveMap)

start := timeutil.Now()
r := (*Replica)(value)
ctx = r.raftSchedulerCtx(ctx)
exists, err := r.tick(ctx, livenessMap)
if err != nil {
Expand Down Expand Up @@ -560,8 +558,7 @@ func (s *Store) processTick(ctx context.Context, rangeID roachpb.RangeID) bool {
func (s *Store) nodeIsLiveCallback(l livenesspb.Liveness) {
s.updateLivenessMap()

s.mu.replicas.Range(func(k int64, v unsafe.Pointer) bool {
r := (*Replica)(v)
s.mu.replicasByRangeID.Range(func(r *Replica) {
r.mu.RLock()
quiescent := r.mu.quiescent
lagging := r.mu.laggingFollowersOnQuiesce
Expand All @@ -570,7 +567,6 @@ func (s *Store) nodeIsLiveCallback(l livenesspb.Liveness) {
if quiescent && (lagging.MemberStale(l) || !laggingAccurate) {
r.unquiesce()
}
return true
})
}

Expand Down Expand Up @@ -730,13 +726,13 @@ func (s *Store) sendQueuedHeartbeatsToNode(

if !s.cfg.Transport.SendAsync(chReq, rpc.SystemClass) {
for _, beat := range beats {
if value, ok := s.mu.replicas.Load(int64(beat.RangeID)); ok {
(*Replica)(value).addUnreachableRemoteReplica(beat.ToReplicaID)
if repl, ok := s.mu.replicasByRangeID.Load(beat.RangeID); ok {
repl.addUnreachableRemoteReplica(beat.ToReplicaID)
}
}
for _, resp := range resps {
if value, ok := s.mu.replicas.Load(int64(resp.RangeID)); ok {
(*Replica)(value).addUnreachableRemoteReplica(resp.ToReplicaID)
if repl, ok := s.mu.replicasByRangeID.Load(resp.RangeID); ok {
repl.addUnreachableRemoteReplica(resp.ToReplicaID)
}
}
return 0
Expand Down
5 changes: 2 additions & 3 deletions pkg/kv/kvserver/store_remove_replica.go
Original file line number Diff line number Diff line change
Expand Up @@ -238,11 +238,10 @@ func (s *Store) removeUninitializedReplicaRaftMuLocked(
defer s.mu.Unlock()

// Sanity check, could be removed.
value, stillExists := s.mu.replicas.Load(int64(rep.RangeID))
existing, stillExists := s.mu.replicasByRangeID.Load(rep.RangeID)
if !stillExists {
log.Fatalf(ctx, "uninitialized replica was removed in the meantime")
}
existing := (*Replica)(value)
if existing == rep {
log.Infof(ctx, "removing uninitialized replica %v", rep)
} else {
Expand All @@ -264,7 +263,7 @@ func (s *Store) unlinkReplicaByRangeIDLocked(ctx context.Context, rangeID roachp
s.unquiescedReplicas.Unlock()
delete(s.mu.uninitReplicas, rangeID)
s.replicaQueues.Delete(int64(rangeID))
s.mu.replicas.Delete(int64(rangeID))
s.mu.replicasByRangeID.Delete(rangeID)
s.unregisterLeaseholderByID(ctx, rangeID)
}

Expand Down
51 changes: 51 additions & 0 deletions pkg/kv/kvserver/store_replicas_by_rangeid.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
// Copyright 2021 The Cockroach Authors.
//
// Use of this software is governed by the Business Source License
// included in the file licenses/BSL.txt.
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0, included in the file
// licenses/APL.txt.

package kvserver

import (
"unsafe"

"github.com/cockroachdb/cockroach/pkg/roachpb"
"github.com/cockroachdb/cockroach/pkg/util/syncutil"
)

// rangeIDReplicaMap is a syncutil.IntMap exposed through methods that take
// roachpb.RangeID keys and *Replica values, so that callers do not have to
// perform the int64 and unsafe.Pointer conversions themselves.
type rangeIDReplicaMap syncutil.IntMap

// Load looks up the Replica stored under rangeID. The boolean reports whether
// an entry was found; on a miss the result is (nil, false).
func (m *rangeIDReplicaMap) Load(rangeID roachpb.RangeID) (*Replica, bool) {
	underlying := (*syncutil.IntMap)(m)
	ptr, found := underlying.Load(int64(rangeID))
	return (*Replica)(ptr), found
}

// LoadOrStore returns the Replica already stored under rangeID, if any, with
// loaded set to true. When no entry exists, repl is atomically inserted and
// returned with loaded set to false.
func (m *rangeIDReplicaMap) LoadOrStore(
	rangeID roachpb.RangeID, repl *Replica,
) (_ *Replica, loaded bool) {
	underlying := (*syncutil.IntMap)(m)
	// The underlying map stores untyped pointers; convert on the way in and
	// back to *Replica on the way out.
	ptr, loaded := underlying.LoadOrStore(int64(rangeID), unsafe.Pointer(repl))
	return (*Replica)(ptr), loaded
}

// Delete removes the entry for rangeID from the map, if there is one.
func (m *rangeIDReplicaMap) Delete(rangeID roachpb.RangeID) {
	underlying := (*syncutil.IntMap)(m)
	underlying.Delete(int64(rangeID))
}

// Range calls f with each Replica in the map. The adapter passed to the
// underlying IntMap.Range always returns true (wantMore), so f has no way to
// cut the iteration short.
func (m *rangeIDReplicaMap) Range(f func(*Replica)) {
	(*syncutil.IntMap)(m).Range(func(_ int64, ptr unsafe.Pointer) bool {
		f((*Replica)(ptr))
		return true // wantMore
	})
}
5 changes: 1 addition & 4 deletions pkg/kv/kvserver/store_snapshot.go
Original file line number Diff line number Diff line change
Expand Up @@ -450,13 +450,10 @@ func (s *Store) canAcceptSnapshotLocked(
desc := *snapHeader.State.Desc

// First, check for an existing Replica.
v, ok := s.mu.replicas.Load(
int64(desc.RangeID),
)
existingRepl, ok := s.mu.replicasByRangeID.Load(desc.RangeID)
if !ok {
return nil, errors.Errorf("canAcceptSnapshotLocked requires a replica present")
}
existingRepl := (*Replica)(v)
// The raftMu is held which allows us to use the existing replica as a
// placeholder when we decide that the snapshot can be applied. As long
// as the caller releases the raftMu only after feeding the snapshot
Expand Down