kvserver/closedts: optimize sidetransport closer
The sidetransport sender periodically loops through all the tracked
ranges and checks whether it can advance their closed timestamps. This
patch makes the hot loop a little more efficient by collapsing into a
single call two operations that used to take the replica lock
separately: getting the range's descriptor and checking the bump.
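
As an illustration, the change collapses a two-acquisition pattern into one. The following is a self-contained sketch with stand-in types (replica, rangeDesc), not the real CockroachDB code:

package main

import (
	"fmt"
	"sync"
)

type rangeDesc struct{ replicas []int }

type replica struct {
	mu struct {
		sync.Mutex
		desc     rangeDesc
		closedTS int64
	}
}

// Old shape: the hot loop paid for two lock acquisitions per range,
// one to read the descriptor and one to attempt the bump.
func (r *replica) Desc() rangeDesc {
	r.mu.Lock()
	defer r.mu.Unlock()
	return r.mu.desc
}

func (r *replica) bump(target int64) bool {
	r.mu.Lock()
	defer r.mu.Unlock()
	if target <= r.mu.closedTS {
		return false
	}
	r.mu.closedTS = target
	return true
}

// New shape: a single acquisition returns the descriptor alongside the
// bump result, in both the success and failure cases.
func (r *replica) bumpAndDesc(target int64) (bool, rangeDesc) {
	r.mu.Lock()
	defer r.mu.Unlock()
	d := r.mu.desc
	if target <= r.mu.closedTS {
		return false, d
	}
	r.mu.closedTS = target
	return true, d
}

func main() {
	var r replica
	r.mu.desc = rangeDesc{replicas: []int{1, 2, 3}}
	ok, d := r.bumpAndDesc(10) // one lock acquisition instead of two
	fmt.Println(ok, d.replicas) // true [1 2 3]
}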

Another small change: learners are now considered among the followers
of a range that need closed timestamps communicated to them. There was
no good reason to exclude them, and including them is a bit more
efficient.
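
For illustration, a minimal sketch of why dropping the learner filter also saves work; replicaDescriptor and the type constants below are stand-ins for the roachpb types:

package main

import "fmt"

type replicaType int

const (
	voterFull replicaType = iota
	nonVoter
	learner
)

type replicaDescriptor struct {
	nodeID int
	typ    replicaType
}

func main() {
	replicas := []replicaDescriptor{
		{nodeID: 1, typ: voterFull},
		{nodeID: 2, typ: nonVoter},
		{nodeID: 3, typ: learner},
	}

	// Old: filter out learners, building a fresh slice on every pass
	// through the hot loop.
	var filtered []replicaDescriptor
	for _, r := range replicas {
		if r.typ != learner {
			filtered = append(filtered, r)
		}
	}

	// New: take every descriptor as-is; learners hear about closed
	// timestamps too, and no filtered copy is allocated.
	all := replicas

	fmt.Println(len(filtered), len(all)) // 2 3
}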

Release note: None
Release justification: Improvement to very new functionality.
andreimatei committed Mar 10, 2021
1 parent 934b872 commit 0343413
Showing 4 changed files with 38 additions and 32 deletions.
28 changes: 17 additions & 11 deletions pkg/kv/kvserver/closedts/sidetransport/sender.go
@@ -121,7 +121,6 @@ type Replica interface {
// Accessors.
StoreID() roachpb.StoreID
GetRangeID() roachpb.RangeID
Desc() *roachpb.RangeDescriptor

// BumpSideTransportClosed advances the range's closed timestamp if it can.
// If the closed timestamp is advanced, the function synchronizes with
@@ -135,11 +134,16 @@ type Replica interface {
//
// If the closed timestamp was advanced, the function returns a LAI to be
// attached to the newly closed timestamp.
//
// The desired closed timestamp is passed as a map from range policy to
// timestamp; this function looks up the entry for this range.
//
// The RangeDescriptor is returned in both the true and false cases.
BumpSideTransportClosed(
ctx context.Context,
now hlc.ClockTimestamp,
targetByPolicy [roachpb.MAX_CLOSED_TIMESTAMP_POLICY]hlc.Timestamp,
) (bool, ctpb.LAI, roachpb.RangeClosedTimestampPolicy)
) (bool, ctpb.LAI, roachpb.RangeClosedTimestampPolicy, *roachpb.RangeDescriptor)
}

// NewSender creates a Sender. Run must be called on it afterwards to get it to
@@ -332,17 +336,19 @@ func (s *Sender) publish(ctx context.Context) hlc.ClockTimestamp {
lhRangeID := lh.GetRangeID()
lastMsg, tracked := s.trackedMu.tracked[lhRangeID]

// Make sure that we're communicating with all of the range's followers.
// Note that we're including this range's followers before deciding below if
// this message will include this range. This is because we don't want
// dynamic conditions about the activity of this range to dictate the
// opening and closing of connections to the other nodes.
for _, repl := range lh.Desc().Replicas().VoterFullAndNonVoterDescriptors() {
nodesWithFollowers.Add(int(repl.NodeID))
// Check whether the desired timestamp can be closed on this range.
canClose, lai, policy, desc := lh.BumpSideTransportClosed(ctx, now, s.trackedMu.lastClosed)

// Ensure that we're communicating with all of the range's followers. Note
// that we're including this range's followers before deciding below if the
// current message will include this range; we don't want dynamic conditions
// about the activity of this range to dictate the opening and closing of
// connections to the other nodes.
repls := desc.Replicas().Descriptors()
for i := range repls {
nodesWithFollowers.Add(int(repls[i].NodeID))
}

// Check whether the desired timestamp can be closed on this range.
canClose, lai, policy := lh.BumpSideTransportClosed(ctx, now, s.trackedMu.lastClosed)
if !canClose {
// We can't close the desired timestamp. If this range was tracked, we
// need to un-track it.
11 changes: 4 additions & 7 deletions pkg/kv/kvserver/closedts/sidetransport/sender_test.go
@@ -51,15 +51,12 @@ var _ Replica = &mockReplica{}

func (m *mockReplica) StoreID() roachpb.StoreID { return m.storeID }
func (m *mockReplica) GetRangeID() roachpb.RangeID { return m.rangeID }
func (m *mockReplica) Desc() *roachpb.RangeDescriptor {
m.mu.Lock()
defer m.mu.Unlock()
return &m.mu.desc
}
func (m *mockReplica) BumpSideTransportClosed(
_ context.Context, _ hlc.ClockTimestamp, _ [roachpb.MAX_CLOSED_TIMESTAMP_POLICY]hlc.Timestamp,
) (bool, ctpb.LAI, roachpb.RangeClosedTimestampPolicy) {
return m.canBump, m.lai, m.policy
) (bool, ctpb.LAI, roachpb.RangeClosedTimestampPolicy, *roachpb.RangeDescriptor) {
m.mu.Lock()
defer m.mu.Unlock()
return m.canBump, m.lai, m.policy, &m.mu.desc
}

func (m *mockReplica) removeReplica(nid roachpb.NodeID) {
19 changes: 11 additions & 8 deletions pkg/kv/kvserver/replica_closedts.go
@@ -79,21 +79,24 @@ func (r *Replica) EmitMLAI() {
// This is called by the closed timestamp side-transport. The desired closed timestamp
// is passed as a map from range policy to timestamp; this function looks up the entry
// for this range.
//
// The RangeDescriptor is returned in both the true and false cases.
func (r *Replica) BumpSideTransportClosed(
ctx context.Context,
now hlc.ClockTimestamp,
targetByPolicy [roachpb.MAX_CLOSED_TIMESTAMP_POLICY]hlc.Timestamp,
) (ok bool, _ ctpb.LAI, _ roachpb.RangeClosedTimestampPolicy) {
) (ok bool, _ ctpb.LAI, _ roachpb.RangeClosedTimestampPolicy, _ *roachpb.RangeDescriptor) {
r.mu.Lock()
defer r.mu.Unlock()
desc := r.descRLocked()

// This method can be called even after a Replica is destroyed and removed
// from the Store's replicas map, because unlinkReplicaByRangeIDLocked does
// not synchronize with sidetransport.Sender.publish, which maintains a
// local copy of its leaseholder map. To avoid issues resulting from this,
// we first check if the replica is destroyed.
if _, err := r.isDestroyedRLocked(); err != nil {
return false, 0, 0
return false, 0, 0, desc
}

lai := ctpb.LAI(r.mu.state.LeaseAppliedIndex)
@@ -104,17 +107,17 @@ func (r *Replica) BumpSideTransportClosed(
// matter.
valid := st.IsValid() || st.State == kvserverpb.LeaseState_UNUSABLE
if !valid || !st.OwnedBy(r.StoreID()) {
return false, 0, 0
return false, 0, 0, desc
}
if st.ClosedTimestampUpperBound().Less(target) {
return false, 0, 0
return false, 0, 0, desc
}

// If the range is merging into its left-hand neighbor, we can't close
// timestamps any more because the joint-range would not be aware of reads
// performed based on this advanced closed timestamp.
if r.mergeInProgressRLocked() {
return false, 0, 0
return false, 0, 0, desc
}

// If there are pending Raft proposals in-flight or committed entries that
@@ -130,20 +133,20 @@
// proposals and their timestamps are still tracked in proposal buffer's
// tracker, and they'll be considered below.
if len(r.mu.proposals) > 0 || r.mu.applyingEntries {
return false, 0, 0
return false, 0, 0, desc
}

// MaybeForwardClosedLocked checks that there are no evaluating requests
// writing under target.
if !r.mu.proposalBuf.MaybeForwardClosedLocked(ctx, target) {
return false, 0, 0
return false, 0, 0, desc
}

// Update the replica directly since there's no side-transport connection to
// the local node.
r.mu.sideTransportClosedTimestamp = target
r.mu.sideTransportCloseTimestampLAI = lai
return true, lai, policy
return true, lai, policy, desc
}

// closedTimestampTargetRLocked computes the timestamp we'd like to close for
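
Condensed, BumpSideTransportClosed is now a chain of guards that all hand the descriptor back, so the caller can keep its follower set up to date whether or not the bump succeeded. A simplified, self-contained sketch of that shape (the fields below are stand-ins for the real replica state):

package main

import "fmt"

type rangeDesc struct{ rangeID int }

type replica struct {
	destroyed    bool
	leaseValid   bool
	merging      bool
	pendingProps int
	desc         rangeDesc
	closedTS     int64
}

// bumpClosed mirrors the guard structure above: every early return
// still includes the descriptor.
func (r *replica) bumpClosed(target int64) (bool, rangeDesc) {
	d := r.desc
	switch {
	case r.destroyed, !r.leaseValid, r.merging, r.pendingProps > 0:
		return false, d
	case target <= r.closedTS:
		return false, d
	}
	r.closedTS = target
	return true, d
}

func main() {
	r := &replica{leaseValid: true, desc: rangeDesc{rangeID: 7}}
	ok, d := r.bumpClosed(5)
	fmt.Println(ok, d.rangeID) // true 7
}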
12 changes: 6 additions & 6 deletions pkg/kv/kvserver/replica_closedts_test.go
@@ -259,7 +259,7 @@ func TestBumpSideTransportClosed(t *testing.T) {
var targets [roachpb.MAX_CLOSED_TIMESTAMP_POLICY]hlc.Timestamp
targets[roachpb.LAG_BY_CLUSTER_SETTING] = a.target.Add(-1, 0)
return nil, nil, testutils.SucceedsSoonError(func() error {
ok, _, _ := a.repl.BumpSideTransportClosed(ctx, a.now, targets)
ok, _, _, _ := a.repl.BumpSideTransportClosed(ctx, a.now, targets)
if !ok {
return errors.New("bumping side-transport unexpectedly failed")
}
@@ -276,7 +276,7 @@ func TestBumpSideTransportClosed(t *testing.T) {
var targets [roachpb.MAX_CLOSED_TIMESTAMP_POLICY]hlc.Timestamp
targets[roachpb.LAG_BY_CLUSTER_SETTING] = a.target
return nil, nil, testutils.SucceedsSoonError(func() error {
ok, _, _ := a.repl.BumpSideTransportClosed(ctx, a.now, targets)
ok, _, _, _ := a.repl.BumpSideTransportClosed(ctx, a.now, targets)
if !ok {
return errors.New("bumping side-transport unexpectedly failed")
}
@@ -293,7 +293,7 @@ func TestBumpSideTransportClosed(t *testing.T) {
var targets [roachpb.MAX_CLOSED_TIMESTAMP_POLICY]hlc.Timestamp
targets[roachpb.LAG_BY_CLUSTER_SETTING] = a.target.Add(1, 0)
return nil, nil, testutils.SucceedsSoonError(func() error {
ok, _, _ := a.repl.BumpSideTransportClosed(ctx, a.now, targets)
ok, _, _, _ := a.repl.BumpSideTransportClosed(ctx, a.now, targets)
if !ok {
return errors.New("bumping side-transport unexpectedly failed")
}
@@ -393,14 +393,14 @@ func TestBumpSideTransportClosed(t *testing.T) {
// would be a serious bug.
if exp {
testutils.SucceedsSoon(t, func() error {
ok, _, _ := repl.BumpSideTransportClosed(ctx, now, targets)
ok, _, _, _ := repl.BumpSideTransportClosed(ctx, now, targets)
if !ok {
return errors.New("bumping side-transport unexpectedly failed")
}
return nil
})
} else {
ok, _, _ := repl.BumpSideTransportClosed(ctx, now, targets)
ok, _, _, _ := repl.BumpSideTransportClosed(ctx, now, targets)
require.False(t, ok)
}

@@ -462,7 +462,7 @@ func BenchmarkBumpSideTransportClosed(b *testing.B) {
targets[roachpb.LAG_BY_CLUSTER_SETTING] = now.ToTimestamp()

// Perform the call.
ok, _, _ := r.BumpSideTransportClosed(ctx, now, targets)
ok, _, _, _ := r.BumpSideTransportClosed(ctx, now, targets)
if !ok {
b.Fatal("BumpSideTransportClosed unexpectedly failed")
}
