Skip to content

Commit

Permalink
Merge #61580
Browse files Browse the repository at this point in the history
61580: kvserver/closedts: optimize sidetransport closer r=andreimatei a=andreimatei

The sidetransport sender periodically loops through all the tracked
ranges and checks if it can advance their closedts. This patch makes the
hot loop a little more efficient by collapsing two operations that used
to take the replica lock separately: getting its descriptor and checking
the bump.

Also another small change - learners are considered among the followers
or a range that need closed ts communicated to them. There was no good
reason to exclude them, and including them is a bit more efficient.

Release note: None
Release justification: Improvement to very new functionality.

Co-authored-by: Andrei Matei <andrei@cockroachlabs.com>
  • Loading branch information
craig[bot] and andreimatei committed Mar 10, 2021
2 parents 09832af + 0343413 commit c7f9851
Show file tree
Hide file tree
Showing 4 changed files with 38 additions and 32 deletions.
28 changes: 17 additions & 11 deletions pkg/kv/kvserver/closedts/sidetransport/sender.go
Original file line number Diff line number Diff line change
Expand Up @@ -121,7 +121,6 @@ type Replica interface {
// Accessors.
StoreID() roachpb.StoreID
GetRangeID() roachpb.RangeID
Desc() *roachpb.RangeDescriptor

// BumpSideTransportClosed advances the range's closed timestamp if it can.
// If the closed timestamp is advanced, the function synchronizes with
Expand All @@ -135,11 +134,16 @@ type Replica interface {
//
// If the closed timestamp was advanced, the function returns a LAI to be
// attached to the newly closed timestamp.
//
// The desired closed timestamp is passed as a map from range policy to
// timestamp; this function looks up the entry for this range.
//
// The RangeDescriptor is returned in both the true and false cases.
BumpSideTransportClosed(
ctx context.Context,
now hlc.ClockTimestamp,
targetByPolicy [roachpb.MAX_CLOSED_TIMESTAMP_POLICY]hlc.Timestamp,
) (bool, ctpb.LAI, roachpb.RangeClosedTimestampPolicy)
) (bool, ctpb.LAI, roachpb.RangeClosedTimestampPolicy, *roachpb.RangeDescriptor)
}

// NewSender creates a Sender. Run must be called on it afterwards to get it to
Expand Down Expand Up @@ -332,17 +336,19 @@ func (s *Sender) publish(ctx context.Context) hlc.ClockTimestamp {
lhRangeID := lh.GetRangeID()
lastMsg, tracked := s.trackedMu.tracked[lhRangeID]

// Make sure that we're communicating with all of the range's followers.
// Note that we're including this range's followers before deciding below if
// this message will include this range. This is because we don't want
// dynamic conditions about the activity of this range to dictate the
// opening and closing of connections to the other nodes.
for _, repl := range lh.Desc().Replicas().VoterFullAndNonVoterDescriptors() {
nodesWithFollowers.Add(int(repl.NodeID))
// Check whether the desired timestamp can be closed on this range.
canClose, lai, policy, desc := lh.BumpSideTransportClosed(ctx, now, s.trackedMu.lastClosed)

// Ensure that we're communicating with all of the range's followers. Note
// that we're including this range's followers before deciding below if the
// current message will include this range; we don't want dynamic conditions
// about the activity of this range to dictate the opening and closing of
// connections to the other nodes.
repls := desc.Replicas().Descriptors()
for i := range repls {
nodesWithFollowers.Add(int(repls[i].NodeID))
}

// Check whether the desired timestamp can be closed on this range.
canClose, lai, policy := lh.BumpSideTransportClosed(ctx, now, s.trackedMu.lastClosed)
if !canClose {
// We can't close the desired timestamp. If this range was tracked, we
// need to un-track it.
Expand Down
11 changes: 4 additions & 7 deletions pkg/kv/kvserver/closedts/sidetransport/sender_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,15 +51,12 @@ var _ Replica = &mockReplica{}

func (m *mockReplica) StoreID() roachpb.StoreID { return m.storeID }
func (m *mockReplica) GetRangeID() roachpb.RangeID { return m.rangeID }
func (m *mockReplica) Desc() *roachpb.RangeDescriptor {
m.mu.Lock()
defer m.mu.Unlock()
return &m.mu.desc
}
func (m *mockReplica) BumpSideTransportClosed(
_ context.Context, _ hlc.ClockTimestamp, _ [roachpb.MAX_CLOSED_TIMESTAMP_POLICY]hlc.Timestamp,
) (bool, ctpb.LAI, roachpb.RangeClosedTimestampPolicy) {
return m.canBump, m.lai, m.policy
) (bool, ctpb.LAI, roachpb.RangeClosedTimestampPolicy, *roachpb.RangeDescriptor) {
m.mu.Lock()
defer m.mu.Unlock()
return m.canBump, m.lai, m.policy, &m.mu.desc
}

func (m *mockReplica) removeReplica(nid roachpb.NodeID) {
Expand Down
19 changes: 11 additions & 8 deletions pkg/kv/kvserver/replica_closedts.go
Original file line number Diff line number Diff line change
Expand Up @@ -79,21 +79,24 @@ func (r *Replica) EmitMLAI() {
// This is called by the closed timestamp side-transport. The desired closed timestamp
// is passed as a map from range policy to timestamp; this function looks up the entry
// for this range.
//
// The RangeDescriptor is returned in both the true and false cases.
func (r *Replica) BumpSideTransportClosed(
ctx context.Context,
now hlc.ClockTimestamp,
targetByPolicy [roachpb.MAX_CLOSED_TIMESTAMP_POLICY]hlc.Timestamp,
) (ok bool, _ ctpb.LAI, _ roachpb.RangeClosedTimestampPolicy) {
) (ok bool, _ ctpb.LAI, _ roachpb.RangeClosedTimestampPolicy, _ *roachpb.RangeDescriptor) {
r.mu.Lock()
defer r.mu.Unlock()
desc := r.descRLocked()

// This method can be called even after a Replica is destroyed and removed
// from the Store's replicas map, because unlinkReplicaByRangeIDLocked does
// not synchronize with sidetransport.Sender.publish, which maintains a
// local copy of its leaseholder map. To avoid issues resulting from this,
// we first check if the replica is destroyed.
if _, err := r.isDestroyedRLocked(); err != nil {
return false, 0, 0
return false, 0, 0, desc
}

lai := ctpb.LAI(r.mu.state.LeaseAppliedIndex)
Expand All @@ -104,17 +107,17 @@ func (r *Replica) BumpSideTransportClosed(
// matter.
valid := st.IsValid() || st.State == kvserverpb.LeaseState_UNUSABLE
if !valid || !st.OwnedBy(r.StoreID()) {
return false, 0, 0
return false, 0, 0, desc
}
if st.ClosedTimestampUpperBound().Less(target) {
return false, 0, 0
return false, 0, 0, desc
}

// If the range is merging into its left-hand neighbor, we can't close
// timestamps any more because the joint-range would not be aware of reads
// performed based on this advanced closed timestamp.
if r.mergeInProgressRLocked() {
return false, 0, 0
return false, 0, 0, desc
}

// If there are pending Raft proposals in-flight or committed entries that
Expand All @@ -130,20 +133,20 @@ func (r *Replica) BumpSideTransportClosed(
// proposals and their timestamps are still tracked in proposal buffer's
// tracker, and they'll be considered below.
if len(r.mu.proposals) > 0 || r.mu.applyingEntries {
return false, 0, 0
return false, 0, 0, desc
}

// MaybeForwardClosedLocked checks that there are no evaluating requests
// writing under target.
if !r.mu.proposalBuf.MaybeForwardClosedLocked(ctx, target) {
return false, 0, 0
return false, 0, 0, desc
}

// Update the replica directly since there's no side-transport connection to
// the local node.
r.mu.sideTransportClosedTimestamp = target
r.mu.sideTransportCloseTimestampLAI = lai
return true, lai, policy
return true, lai, policy, desc
}

// closedTimestampTargetRLocked computes the timestamp we'd like to close for
Expand Down
12 changes: 6 additions & 6 deletions pkg/kv/kvserver/replica_closedts_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -259,7 +259,7 @@ func TestBumpSideTransportClosed(t *testing.T) {
var targets [roachpb.MAX_CLOSED_TIMESTAMP_POLICY]hlc.Timestamp
targets[roachpb.LAG_BY_CLUSTER_SETTING] = a.target.Add(-1, 0)
return nil, nil, testutils.SucceedsSoonError(func() error {
ok, _, _ := a.repl.BumpSideTransportClosed(ctx, a.now, targets)
ok, _, _, _ := a.repl.BumpSideTransportClosed(ctx, a.now, targets)
if !ok {
return errors.New("bumping side-transport unexpectedly failed")
}
Expand All @@ -276,7 +276,7 @@ func TestBumpSideTransportClosed(t *testing.T) {
var targets [roachpb.MAX_CLOSED_TIMESTAMP_POLICY]hlc.Timestamp
targets[roachpb.LAG_BY_CLUSTER_SETTING] = a.target
return nil, nil, testutils.SucceedsSoonError(func() error {
ok, _, _ := a.repl.BumpSideTransportClosed(ctx, a.now, targets)
ok, _, _, _ := a.repl.BumpSideTransportClosed(ctx, a.now, targets)
if !ok {
return errors.New("bumping side-transport unexpectedly failed")
}
Expand All @@ -293,7 +293,7 @@ func TestBumpSideTransportClosed(t *testing.T) {
var targets [roachpb.MAX_CLOSED_TIMESTAMP_POLICY]hlc.Timestamp
targets[roachpb.LAG_BY_CLUSTER_SETTING] = a.target.Add(1, 0)
return nil, nil, testutils.SucceedsSoonError(func() error {
ok, _, _ := a.repl.BumpSideTransportClosed(ctx, a.now, targets)
ok, _, _, _ := a.repl.BumpSideTransportClosed(ctx, a.now, targets)
if !ok {
return errors.New("bumping side-transport unexpectedly failed")
}
Expand Down Expand Up @@ -393,14 +393,14 @@ func TestBumpSideTransportClosed(t *testing.T) {
// would be a serious bug.
if exp {
testutils.SucceedsSoon(t, func() error {
ok, _, _ := repl.BumpSideTransportClosed(ctx, now, targets)
ok, _, _, _ := repl.BumpSideTransportClosed(ctx, now, targets)
if !ok {
return errors.New("bumping side-transport unexpectedly failed")
}
return nil
})
} else {
ok, _, _ := repl.BumpSideTransportClosed(ctx, now, targets)
ok, _, _, _ := repl.BumpSideTransportClosed(ctx, now, targets)
require.False(t, ok)
}

Expand Down Expand Up @@ -462,7 +462,7 @@ func BenchmarkBumpSideTransportClosed(b *testing.B) {
targets[roachpb.LAG_BY_CLUSTER_SETTING] = now.ToTimestamp()

// Perform the call.
ok, _, _ := r.BumpSideTransportClosed(ctx, now, targets)
ok, _, _, _ := r.BumpSideTransportClosed(ctx, now, targets)
if !ok {
b.Fatal("BumpSideTransportClosed unexpectedly failed")
}
Expand Down

0 comments on commit c7f9851

Please sign in to comment.