From 2820d6afff8017f1fa01d2e939328dd7e1535da0 Mon Sep 17 00:00:00 2001 From: Erik Grinaker Date: Sat, 22 Jul 2023 12:46:45 +0000 Subject: [PATCH 01/28] kvserver: remove stale mayCampaignOnWake comment The comment is about a parameter that no longer exists. Epic: none Release note: None --- pkg/kv/kvserver/replica_proposal_buf.go | 1 - 1 file changed, 1 deletion(-) diff --git a/pkg/kv/kvserver/replica_proposal_buf.go b/pkg/kv/kvserver/replica_proposal_buf.go index 832f18d77531..1a19cdfbbf40 100644 --- a/pkg/kv/kvserver/replica_proposal_buf.go +++ b/pkg/kv/kvserver/replica_proposal_buf.go @@ -1307,7 +1307,6 @@ func (rp *replicaProposer) closedTimestampTarget() hlc.Timestamp { } func (rp *replicaProposer) withGroupLocked(fn func(raftGroup proposerRaft) error) error { - // Pass true for mayCampaignOnWake because we're about to propose a command. return (*Replica)(rp).withRaftGroupLocked(func(raftGroup *raft.RawNode) (bool, error) { // We're proposing a command here so there is no need to wake the leader // if we were quiesced. However, we should make sure we are unquiesced. From bfc7f2e845c5024393d371312f87176acc63135f Mon Sep 17 00:00:00 2001 From: Erik Grinaker Date: Sat, 22 Jul 2023 14:01:52 +0000 Subject: [PATCH 02/28] kvserver: revamp shouldCampaign/Forget tests Epic: none Release note: None --- pkg/kv/kvserver/replica_test.go | 470 ++++++++++++++++++-------------- 1 file changed, 260 insertions(+), 210 deletions(-) diff --git a/pkg/kv/kvserver/replica_test.go b/pkg/kv/kvserver/replica_test.go index b9d03fd4f86e..bd40d931d161 100644 --- a/pkg/kv/kvserver/replica_test.go +++ b/pkg/kv/kvserver/replica_test.go @@ -11337,137 +11337,102 @@ func TestReplicaShouldCampaignOnWake(t *testing.T) { defer leaktest.AfterTest(t)() defer log.Scope(t).Close(t) - const storeID = roachpb.StoreID(1) - - desc := roachpb.RangeDescriptor{ - RangeID: 1, - StartKey: roachpb.RKeyMin, - EndKey: roachpb.RKeyMax, - InternalReplicas: []roachpb.ReplicaDescriptor{ - { - ReplicaID: 1, - NodeID: 1, - StoreID: 1, - }, - { - ReplicaID: 2, - NodeID: 2, - StoreID: 2, - }, - { - ReplicaID: 3, - NodeID: 3, - StoreID: 3, - }, - }, - NextReplicaID: 4, - } - livenessMap := livenesspb.IsLiveMap{ - 1: livenesspb.IsLiveMapEntry{IsLive: true}, - 2: livenesspb.IsLiveMapEntry{IsLive: false}, - 4: livenesspb.IsLiveMapEntry{IsLive: false}, - } - - myLease := roachpb.Lease{ - Replica: roachpb.ReplicaDescriptor{ - StoreID: storeID, - }, - } - otherLease := roachpb.Lease{ - Replica: roachpb.ReplicaDescriptor{ - StoreID: roachpb.StoreID(2), - }, + type params struct { + leaseStatus kvserverpb.LeaseStatus + storeID roachpb.StoreID + raftStatus raft.BasicStatus + livenessMap livenesspb.IsLiveMap + desc *roachpb.RangeDescriptor + requiresExpirationLease bool } - followerWithoutLeader := raft.BasicStatus{ - SoftState: raft.SoftState{ - RaftState: raft.StateFollower, - Lead: 0, - }, - } - followerWithLeader := raft.BasicStatus{ - SoftState: raft.SoftState{ - RaftState: raft.StateFollower, - Lead: 1, - }, - } - candidate := raft.BasicStatus{ - SoftState: raft.SoftState{ - RaftState: raft.StateCandidate, - Lead: 0, - }, - } - leader := raft.BasicStatus{ - SoftState: raft.SoftState{ - RaftState: raft.StateLeader, - Lead: 1, + // Set up a base state that we can vary, representing this node n1 being a + // follower of n2, but n2 is dead and does not hold a valid lease. We should + // campaign in this state. 
+ base := params{ + storeID: 1, + desc: &roachpb.RangeDescriptor{ + RangeID: 1, + StartKey: roachpb.RKeyMin, + EndKey: roachpb.RKeyMax, + InternalReplicas: []roachpb.ReplicaDescriptor{ + {NodeID: 1, StoreID: 1, ReplicaID: 1}, + {NodeID: 2, StoreID: 2, ReplicaID: 2}, + {NodeID: 3, StoreID: 3, ReplicaID: 3}, + }, + NextReplicaID: 4, }, - } - followerDeadLeader := raft.BasicStatus{ - SoftState: raft.SoftState{ - RaftState: raft.StateFollower, - Lead: 2, + leaseStatus: kvserverpb.LeaseStatus{ + Lease: roachpb.Lease{Replica: roachpb.ReplicaDescriptor{StoreID: 2}}, + State: kvserverpb.LeaseState_EXPIRED, }, - } - candidateDeadLeader := raft.BasicStatus{ - SoftState: raft.SoftState{ - RaftState: raft.StateCandidate, - Lead: 2, - }, - } - followerMissingLiveness := raft.BasicStatus{ - SoftState: raft.SoftState{ - RaftState: raft.StateFollower, - Lead: 3, + raftStatus: raft.BasicStatus{ + SoftState: raft.SoftState{ + RaftState: raft.StateFollower, + Lead: 2, + }, }, - } - followerMissingDesc := raft.BasicStatus{ - SoftState: raft.SoftState{ - RaftState: raft.StateFollower, - Lead: 4, + livenessMap: livenesspb.IsLiveMap{ + 1: livenesspb.IsLiveMapEntry{IsLive: true}, + 2: livenesspb.IsLiveMapEntry{IsLive: false}, + 3: livenesspb.IsLiveMapEntry{IsLive: false}, }, } - tests := []struct { - leaseStatus kvserverpb.LeaseStatus - raftStatus raft.BasicStatus - livenessMap livenesspb.IsLiveMap - desc *roachpb.RangeDescriptor - requiresExpirationLease bool - exp bool + testcases := map[string]struct { + expect bool + modify func(*params) }{ - {kvserverpb.LeaseStatus{State: kvserverpb.LeaseState_VALID, Lease: myLease}, followerWithoutLeader, livenessMap, &desc, false, true}, - {kvserverpb.LeaseStatus{State: kvserverpb.LeaseState_VALID, Lease: otherLease}, followerWithoutLeader, livenessMap, &desc, false, false}, - {kvserverpb.LeaseStatus{State: kvserverpb.LeaseState_VALID, Lease: myLease}, followerWithLeader, livenessMap, &desc, false, false}, - {kvserverpb.LeaseStatus{State: kvserverpb.LeaseState_VALID, Lease: otherLease}, followerWithLeader, livenessMap, &desc, false, false}, - {kvserverpb.LeaseStatus{State: kvserverpb.LeaseState_VALID, Lease: myLease}, candidate, livenessMap, &desc, false, false}, - {kvserverpb.LeaseStatus{State: kvserverpb.LeaseState_VALID, Lease: otherLease}, candidate, livenessMap, &desc, false, false}, - {kvserverpb.LeaseStatus{State: kvserverpb.LeaseState_VALID, Lease: myLease}, leader, livenessMap, &desc, false, false}, - {kvserverpb.LeaseStatus{State: kvserverpb.LeaseState_VALID, Lease: otherLease}, leader, livenessMap, &desc, false, false}, - - {kvserverpb.LeaseStatus{State: kvserverpb.LeaseState_EXPIRED, Lease: myLease}, followerWithoutLeader, livenessMap, &desc, false, true}, - {kvserverpb.LeaseStatus{State: kvserverpb.LeaseState_EXPIRED, Lease: otherLease}, followerWithoutLeader, livenessMap, &desc, false, true}, - {kvserverpb.LeaseStatus{State: kvserverpb.LeaseState_EXPIRED, Lease: myLease}, followerWithoutLeader, livenessMap, &desc, true, true}, - {kvserverpb.LeaseStatus{State: kvserverpb.LeaseState_EXPIRED, Lease: otherLease}, followerWithoutLeader, livenessMap, &desc, true, true}, - {kvserverpb.LeaseStatus{State: kvserverpb.LeaseState_EXPIRED, Lease: myLease}, followerWithLeader, livenessMap, &desc, false, false}, - {kvserverpb.LeaseStatus{State: kvserverpb.LeaseState_EXPIRED, Lease: otherLease}, followerWithLeader, livenessMap, &desc, false, false}, - {kvserverpb.LeaseStatus{State: kvserverpb.LeaseState_EXPIRED, Lease: myLease}, candidate, livenessMap, &desc, false, false}, 
- {kvserverpb.LeaseStatus{State: kvserverpb.LeaseState_EXPIRED, Lease: otherLease}, candidate, livenessMap, &desc, false, false}, - {kvserverpb.LeaseStatus{State: kvserverpb.LeaseState_EXPIRED, Lease: myLease}, leader, livenessMap, &desc, false, false}, - {kvserverpb.LeaseStatus{State: kvserverpb.LeaseState_EXPIRED, Lease: otherLease}, leader, livenessMap, &desc, false, false}, - - {kvserverpb.LeaseStatus{State: kvserverpb.LeaseState_EXPIRED, Lease: otherLease}, followerDeadLeader, livenessMap, &desc, false, true}, - {kvserverpb.LeaseStatus{State: kvserverpb.LeaseState_EXPIRED, Lease: otherLease}, followerDeadLeader, livenessMap, &desc, true, false}, - {kvserverpb.LeaseStatus{State: kvserverpb.LeaseState_EXPIRED, Lease: otherLease}, followerMissingLiveness, livenessMap, &desc, false, false}, - {kvserverpb.LeaseStatus{State: kvserverpb.LeaseState_EXPIRED, Lease: otherLease}, followerMissingDesc, livenessMap, &desc, false, false}, - {kvserverpb.LeaseStatus{State: kvserverpb.LeaseState_EXPIRED, Lease: otherLease}, candidateDeadLeader, livenessMap, &desc, false, false}, - } - - for i, test := range tests { - v := shouldCampaignOnWake(test.leaseStatus, storeID, test.raftStatus, test.livenessMap, test.desc, test.requiresExpirationLease) - if v != test.exp { - t.Errorf("%d: expected %v but got %v", i, test.exp, v) - } + "dead leader without lease": {true, func(p *params) {}}, + "valid remote lease": {false, func(p *params) { + p.leaseStatus.State = kvserverpb.LeaseState_VALID + }}, + "valid local lease": {true, func(p *params) { + p.leaseStatus.State = kvserverpb.LeaseState_VALID + p.leaseStatus.Lease.Replica.StoreID = 1 + }}, + "pre-candidate": {false, func(p *params) { + p.raftStatus.SoftState.RaftState = raft.StatePreCandidate + p.raftStatus.Lead = raft.None + }}, + "candidate": {false, func(p *params) { + p.raftStatus.SoftState.RaftState = raft.StateCandidate + p.raftStatus.Lead = raft.None + }}, + "leader": {false, func(p *params) { + p.raftStatus.SoftState.RaftState = raft.StateLeader + p.raftStatus.Lead = 1 + }}, + "unknown leader": {true, func(p *params) { + p.raftStatus.Lead = raft.None + }}, + "requires expiration lease": {false, func(p *params) { + p.requiresExpirationLease = true + }}, + "leader not in desc": {false, func(p *params) { + p.raftStatus.Lead = 4 + }}, + "leader not in liveness": {false, func(p *params) { + delete(p.livenessMap, 2) + }}, + "leader is live": {false, func(p *params) { + l := p.livenessMap[2] + l.IsLive = true + p.livenessMap[2] = l + }}, + } + + for name, tc := range testcases { + t.Run(name, func(t *testing.T) { + p := base + p.livenessMap = livenesspb.IsLiveMap{} + for k, v := range base.livenessMap { + p.livenessMap[k] = v + } + tc.modify(&p) + require.Equal(t, tc.expect, shouldCampaignOnWake(p.leaseStatus, p.storeID, p.raftStatus, + p.livenessMap, p.desc, p.requiresExpirationLease)) + }) } } @@ -11475,109 +11440,194 @@ func TestReplicaShouldCampaignOnLeaseRequestRedirect(t *testing.T) { defer leaktest.AfterTest(t)() defer log.Scope(t).Close(t) - desc := roachpb.RangeDescriptor{ - RangeID: 1, - StartKey: roachpb.RKeyMin, - EndKey: roachpb.RKeyMax, - InternalReplicas: []roachpb.ReplicaDescriptor{ - { - ReplicaID: 1, - NodeID: 1, - StoreID: 1, - }, - { - ReplicaID: 2, - NodeID: 2, - StoreID: 2, - }, - { - ReplicaID: 3, - NodeID: 3, - StoreID: 3, - }, - }, - NextReplicaID: 4, + type params struct { + raftStatus raft.BasicStatus + livenessMap livenesspb.IsLiveMap + desc *roachpb.RangeDescriptor + shouldUseExpirationLease bool + now hlc.Timestamp } - now := 
hlc.Timestamp{WallTime: 100} - livenessMap := livenesspb.IsLiveMap{ - 1: livenesspb.IsLiveMapEntry{ - IsLive: true, - Liveness: livenesspb.Liveness{Epoch: 1, Expiration: now.Add(1, 0).ToLegacyTimestamp()}, + // Set up a base state that we can vary, representing this node n1 being a + // follower of n2, but n2 is dead. We should campaign in this state. + base := params{ + desc: &roachpb.RangeDescriptor{ + RangeID: 1, + StartKey: roachpb.RKeyMin, + EndKey: roachpb.RKeyMax, + InternalReplicas: []roachpb.ReplicaDescriptor{ + {NodeID: 1, StoreID: 1, ReplicaID: 1}, + {NodeID: 2, StoreID: 2, ReplicaID: 2}, + {NodeID: 3, StoreID: 3, ReplicaID: 3}, + }, + NextReplicaID: 4, }, - 2: livenesspb.IsLiveMapEntry{ - // NOTE: we purposefully set IsLive to true in disagreement with the - // Liveness expiration to ensure that we're only looking at node liveness - // in shouldCampaignOnLeaseRequestRedirect and not at whether this node is - // reachable from the local node. - IsLive: true, - Liveness: livenesspb.Liveness{Epoch: 1, Expiration: now.Add(-1, 0).ToLegacyTimestamp()}, + raftStatus: raft.BasicStatus{ + SoftState: raft.SoftState{ + RaftState: raft.StateFollower, + Lead: 2, + }, }, - } - - followerWithoutLeader := raft.BasicStatus{ - SoftState: raft.SoftState{ - RaftState: raft.StateFollower, - Lead: 0, + livenessMap: livenesspb.IsLiveMap{ + 1: livenesspb.IsLiveMapEntry{IsLive: true}, + 2: livenesspb.IsLiveMapEntry{IsLive: false}, + 3: livenesspb.IsLiveMapEntry{IsLive: false}, }, + now: hlc.Timestamp{Logical: 10}, } - followerWithLeader := raft.BasicStatus{ - SoftState: raft.SoftState{ - RaftState: raft.StateFollower, - Lead: 1, - }, + + testcases := map[string]struct { + expect bool + modify func(*params) + }{ + "dead leader": {true, func(p *params) {}}, + "pre-candidate": {false, func(p *params) { + p.raftStatus.SoftState.RaftState = raft.StatePreCandidate + p.raftStatus.Lead = raft.None + }}, + "candidate": {false, func(p *params) { + p.raftStatus.SoftState.RaftState = raft.StateCandidate + p.raftStatus.Lead = raft.None + }}, + "leader": {false, func(p *params) { + p.raftStatus.SoftState.RaftState = raft.StateLeader + p.raftStatus.Lead = 1 + }}, + "unknown leader": {true, func(p *params) { + p.raftStatus.Lead = raft.None + }}, + "should use expiration lease": {false, func(p *params) { + p.shouldUseExpirationLease = true + }}, + "leader not in desc": {false, func(p *params) { + p.raftStatus.Lead = 4 + }}, + "leader not in liveness": {false, func(p *params) { + delete(p.livenessMap, 2) + }}, + "leader is live": {false, func(p *params) { + p.livenessMap[2] = livenesspb.IsLiveMapEntry{ + IsLive: true, + Liveness: livenesspb.Liveness{ + NodeID: 2, + Expiration: p.now.Add(0, 1).ToLegacyTimestamp(), + }, + } + }}, + "leader is live according to expiration": {false, func(p *params) { + p.livenessMap[2] = livenesspb.IsLiveMapEntry{ + IsLive: false, + Liveness: livenesspb.Liveness{ + NodeID: 2, + Expiration: p.now.Add(0, 1).ToLegacyTimestamp(), + }, + } + }}, + "leader is dead according to expiration": {true, func(p *params) { + p.livenessMap[2] = livenesspb.IsLiveMapEntry{ + IsLive: true, + Liveness: livenesspb.Liveness{ + NodeID: 2, + Expiration: p.now.Add(0, -1).ToLegacyTimestamp(), + }, + } + }}, } - candidate := raft.BasicStatus{ - SoftState: raft.SoftState{ - RaftState: raft.StateCandidate, - Lead: 0, - }, + + for name, tc := range testcases { + t.Run(name, func(t *testing.T) { + p := base + p.livenessMap = livenesspb.IsLiveMap{} + for k, v := range base.livenessMap { + p.livenessMap[k] = v + } + 
tc.modify(&p) + require.Equal(t, tc.expect, shouldCampaignOnLeaseRequestRedirect( + p.raftStatus, p.livenessMap, p.desc, p.shouldUseExpirationLease, p.now)) + }) } - leader := raft.BasicStatus{ - SoftState: raft.SoftState{ - RaftState: raft.StateLeader, - Lead: 1, - }, +} + +func TestReplicaShouldForgetLeaderOnVoteRequest(t *testing.T) { + defer leaktest.AfterTest(t)() + defer log.Scope(t).Close(t) + + type params struct { + raftStatus raft.BasicStatus + livenessMap livenesspb.IsLiveMap + desc *roachpb.RangeDescriptor } - followerDeadLeader := raft.BasicStatus{ - SoftState: raft.SoftState{ - RaftState: raft.StateFollower, - Lead: 2, + + // Set up a base state that we can vary, representing this node n1 being a + // follower of n2, but n2 is dead. We should forget the leader in this state. + base := params{ + desc: &roachpb.RangeDescriptor{ + RangeID: 1, + StartKey: roachpb.RKeyMin, + EndKey: roachpb.RKeyMax, + InternalReplicas: []roachpb.ReplicaDescriptor{ + {NodeID: 1, StoreID: 1, ReplicaID: 1}, + {NodeID: 2, StoreID: 2, ReplicaID: 2}, + {NodeID: 3, StoreID: 3, ReplicaID: 3}, + }, + NextReplicaID: 4, }, - } - followerMissingLiveness := raft.BasicStatus{ - SoftState: raft.SoftState{ - RaftState: raft.StateFollower, - Lead: 3, + raftStatus: raft.BasicStatus{ + SoftState: raft.SoftState{ + RaftState: raft.StateFollower, + Lead: 2, + }, }, - } - followerMissingDesc := raft.BasicStatus{ - SoftState: raft.SoftState{ - RaftState: raft.StateFollower, - Lead: 4, + livenessMap: livenesspb.IsLiveMap{ + 1: livenesspb.IsLiveMapEntry{IsLive: true}, + 2: livenesspb.IsLiveMapEntry{IsLive: false}, + 3: livenesspb.IsLiveMapEntry{IsLive: false}, }, } - tests := []struct { - name string - raftStatus raft.BasicStatus - requiresExpirationLease bool - exp bool + testcases := map[string]struct { + expect bool + modify func(*params) }{ - {"candidate", candidate, false, false}, - {"leader", leader, false, false}, - {"follower without leader", followerWithoutLeader, false, true}, - {"follower unknown leader", followerMissingDesc, false, false}, - {"follower expiration-based lease", followerDeadLeader, true, false}, - {"follower unknown liveness leader", followerMissingLiveness, false, false}, - {"follower live leader", followerWithLeader, false, false}, - {"follower dead leader", followerDeadLeader, false, true}, - } - - for _, tc := range tests { - t.Run(tc.name, func(t *testing.T) { - v := shouldCampaignOnLeaseRequestRedirect(tc.raftStatus, livenessMap, &desc, tc.requiresExpirationLease, now) - require.Equal(t, tc.exp, v) + "dead leader": {true, func(p *params) {}}, + "pre-candidate": {false, func(p *params) { + p.raftStatus.SoftState.RaftState = raft.StatePreCandidate + p.raftStatus.Lead = raft.None + }}, + "candidate": {false, func(p *params) { + p.raftStatus.SoftState.RaftState = raft.StateCandidate + p.raftStatus.Lead = raft.None + }}, + "leader": {false, func(p *params) { + p.raftStatus.SoftState.RaftState = raft.StateLeader + p.raftStatus.Lead = 1 + }}, + "unknown leader": {false, func(p *params) { + p.raftStatus.Lead = raft.None + }}, + "leader not in desc": {true, func(p *params) { + p.raftStatus.Lead = 4 + }}, + "leader not in liveness": {true, func(p *params) { + delete(p.livenessMap, 2) + }}, + "leader is live": {false, func(p *params) { + p.livenessMap[2] = livenesspb.IsLiveMapEntry{ + IsLive: true, + } + }}, + } + + for name, tc := range testcases { + t.Run(name, func(t *testing.T) { + p := base + p.livenessMap = livenesspb.IsLiveMap{} + for k, v := range base.livenessMap { + p.livenessMap[k] = v + } 
+ tc.modify(&p) + require.Equal(t, tc.expect, shouldForgetLeaderOnVoteRequest( + p.raftStatus, p.livenessMap, p.desc)) }) } } From b74d3116944c2ad3ac777c88cc00a840e6bc0943 Mon Sep 17 00:00:00 2001 From: Erik Grinaker Date: Sat, 22 Jul 2023 14:35:57 +0000 Subject: [PATCH 03/28] kvserver: ignore RPC conn in `shouldCampaignOnWake` Previously, `shouldCampaignOnWake()` used `IsLiveMapEntry.IsLive` to determine whether the leader was dead. However, this not only depends on the node's liveness, but also its RPC connectivity. This can prevent an unquiescing replica from acquiring Raft leadership if the leader is still alive but unable to heartbeat liveness, and the leader will be unable to acquire epoch leases in this case. This patch ignores the RPC connection state when deciding whether to campaign, using only on the liveness state. Epic: none Release note: None --- pkg/kv/kvserver/client_raft_test.go | 3 --- pkg/kv/kvserver/replica_raft.go | 14 ++++++++++--- pkg/kv/kvserver/replica_test.go | 31 +++++++++++++++++++++++++---- 3 files changed, 38 insertions(+), 10 deletions(-) diff --git a/pkg/kv/kvserver/client_raft_test.go b/pkg/kv/kvserver/client_raft_test.go index bf8f4ca4b6c9..db4b08b5e434 100644 --- a/pkg/kv/kvserver/client_raft_test.go +++ b/pkg/kv/kvserver/client_raft_test.go @@ -6771,9 +6771,6 @@ func TestRaftPreVoteUnquiesceDeadLeader(t *testing.T) { Server: &server.TestingKnobs{ WallClock: manualClock, }, - Store: &kvserver.StoreTestingKnobs{ - DisableLivenessMapConnHealth: true, // to mark n1 as not live - }, }, }, }) diff --git a/pkg/kv/kvserver/replica_raft.go b/pkg/kv/kvserver/replica_raft.go index dceb3024973b..e4a1008986a5 100644 --- a/pkg/kv/kvserver/replica_raft.go +++ b/pkg/kv/kvserver/replica_raft.go @@ -2042,6 +2042,7 @@ func shouldCampaignOnWake( livenessMap livenesspb.IsLiveMap, desc *roachpb.RangeDescriptor, requiresExpirationLease bool, + now hlc.Timestamp, ) bool { // When waking up a range, campaign unless we know that another // node holds a valid lease (this is most important after a split, @@ -2081,7 +2082,12 @@ func shouldCampaignOnWake( if !ok { return false } - return !livenessEntry.IsLive + // NB: we intentionally do not look at the IsLiveMapEntry.IsLive field, which + // accounts for whether the leader is reachable from this node (see + // Store.updateLivenessMap). We only care whether the leader is currently live + // according to node liveness because this determines whether it will be able + // to hold an epoch-based lease. 
+ return !livenessEntry.Liveness.IsLive(now) } // maybeCampaignOnWakeLocked is called when the replica wakes from a quiesced @@ -2108,10 +2114,12 @@ func (r *Replica) maybeCampaignOnWakeLocked(ctx context.Context) { return } - leaseStatus := r.leaseStatusAtRLocked(ctx, r.store.Clock().NowAsClockTimestamp()) + now := r.store.Clock().NowAsClockTimestamp() + leaseStatus := r.leaseStatusAtRLocked(ctx, now) raftStatus := r.mu.internalRaftGroup.BasicStatus() livenessMap, _ := r.store.livenessMap.Load().(livenesspb.IsLiveMap) - if shouldCampaignOnWake(leaseStatus, r.store.StoreID(), raftStatus, livenessMap, r.descRLocked(), r.requiresExpirationLeaseRLocked()) { + if shouldCampaignOnWake(leaseStatus, r.store.StoreID(), raftStatus, livenessMap, r.descRLocked(), + r.requiresExpirationLeaseRLocked(), now.ToTimestamp()) { r.campaignLocked(ctx) } } diff --git a/pkg/kv/kvserver/replica_test.go b/pkg/kv/kvserver/replica_test.go index bd40d931d161..26f6cae19bb7 100644 --- a/pkg/kv/kvserver/replica_test.go +++ b/pkg/kv/kvserver/replica_test.go @@ -11344,6 +11344,7 @@ func TestReplicaShouldCampaignOnWake(t *testing.T) { livenessMap livenesspb.IsLiveMap desc *roachpb.RangeDescriptor requiresExpirationLease bool + now hlc.Timestamp } // Set up a base state that we can vary, representing this node n1 being a @@ -11416,9 +11417,31 @@ func TestReplicaShouldCampaignOnWake(t *testing.T) { delete(p.livenessMap, 2) }}, "leader is live": {false, func(p *params) { - l := p.livenessMap[2] - l.IsLive = true - p.livenessMap[2] = l + p.livenessMap[2] = livenesspb.IsLiveMapEntry{ + IsLive: true, + Liveness: livenesspb.Liveness{ + NodeID: 2, + Expiration: p.now.Add(0, 1).ToLegacyTimestamp(), + }, + } + }}, + "leader is live according to expiration": {false, func(p *params) { + p.livenessMap[2] = livenesspb.IsLiveMapEntry{ + IsLive: false, + Liveness: livenesspb.Liveness{ + NodeID: 2, + Expiration: p.now.Add(0, 1).ToLegacyTimestamp(), + }, + } + }}, + "leader is dead according to expiration": {true, func(p *params) { + p.livenessMap[2] = livenesspb.IsLiveMapEntry{ + IsLive: true, + Liveness: livenesspb.Liveness{ + NodeID: 2, + Expiration: p.now.Add(0, -1).ToLegacyTimestamp(), + }, + } }}, } @@ -11431,7 +11454,7 @@ func TestReplicaShouldCampaignOnWake(t *testing.T) { } tc.modify(&p) require.Equal(t, tc.expect, shouldCampaignOnWake(p.leaseStatus, p.storeID, p.raftStatus, - p.livenessMap, p.desc, p.requiresExpirationLease)) + p.livenessMap, p.desc, p.requiresExpirationLease, p.now)) }) } } From cdde5f4fe608c977bcfb921633ff13f19d06fb9c Mon Sep 17 00:00:00 2001 From: Erik Grinaker Date: Sat, 22 Jul 2023 14:45:49 +0000 Subject: [PATCH 04/28] kvserver: ignore RPC conn in `shouldForgetLeaderOnVoteRequest` Previously, `shouldForgetLeaderOnVoteRequest()` used `IsLiveMapEntry.IsLive` to determine whether the leader was dead. However, this not only depends on the node's liveness, but also its RPC connectivity. This can prevent granting votes to a new leader that may be attempting to acquire a epoch lease (which the current leader can't). This patch ignores the RPC connection state when deciding whether to campaign, using only on the liveness state. 
Epic: none Release note: None --- pkg/kv/kvserver/replica_raft.go | 16 +++++++++++++--- pkg/kv/kvserver/replica_test.go | 26 +++++++++++++++++++++++++- 2 files changed, 38 insertions(+), 4 deletions(-) diff --git a/pkg/kv/kvserver/replica_raft.go b/pkg/kv/kvserver/replica_raft.go index e4a1008986a5..82d43d97d8dc 100644 --- a/pkg/kv/kvserver/replica_raft.go +++ b/pkg/kv/kvserver/replica_raft.go @@ -2152,13 +2152,17 @@ func (r *Replica) maybeCampaignOnWakeLocked(ctx context.Context) { func (r *Replica) maybeForgetLeaderOnVoteRequestLocked() { raftStatus := r.mu.internalRaftGroup.BasicStatus() livenessMap, _ := r.store.livenessMap.Load().(livenesspb.IsLiveMap) - if shouldForgetLeaderOnVoteRequest(raftStatus, livenessMap, r.descRLocked()) { + now := r.store.Clock().Now() + if shouldForgetLeaderOnVoteRequest(raftStatus, livenessMap, r.descRLocked(), now) { r.forgetLeaderLocked(r.AnnotateCtx(context.TODO())) } } func shouldForgetLeaderOnVoteRequest( - raftStatus raft.BasicStatus, livenessMap livenesspb.IsLiveMap, desc *roachpb.RangeDescriptor, + raftStatus raft.BasicStatus, + livenessMap livenesspb.IsLiveMap, + desc *roachpb.RangeDescriptor, + now hlc.Timestamp, ) bool { // If we're not a follower with a leader, there's noone to forget. if raftStatus.RaftState != raft.StateFollower || raftStatus.Lead == raft.None { @@ -2179,7 +2183,13 @@ func shouldForgetLeaderOnVoteRequest( } // Forget the leader if it's no longer live. - return !livenessEntry.IsLive + // + // NB: we intentionally do not look at the IsLiveMapEntry.IsLive field, which + // accounts for whether the leader is reachable from this node (see + // Store.updateLivenessMap). We only care whether the leader is currently live + // according to node liveness because this determines whether it will be able + // to hold an epoch-based lease. 
+ return !livenessEntry.Liveness.IsLive(now) } // shouldCampaignOnLeaseRequestRedirect returns whether a replica that is diff --git a/pkg/kv/kvserver/replica_test.go b/pkg/kv/kvserver/replica_test.go index 26f6cae19bb7..da10bf53e20c 100644 --- a/pkg/kv/kvserver/replica_test.go +++ b/pkg/kv/kvserver/replica_test.go @@ -11579,6 +11579,7 @@ func TestReplicaShouldForgetLeaderOnVoteRequest(t *testing.T) { raftStatus raft.BasicStatus livenessMap livenesspb.IsLiveMap desc *roachpb.RangeDescriptor + now hlc.Timestamp } // Set up a base state that we can vary, representing this node n1 being a @@ -11606,6 +11607,7 @@ func TestReplicaShouldForgetLeaderOnVoteRequest(t *testing.T) { 2: livenesspb.IsLiveMapEntry{IsLive: false}, 3: livenesspb.IsLiveMapEntry{IsLive: false}, }, + now: hlc.Timestamp{Logical: 10}, } testcases := map[string]struct { @@ -11637,6 +11639,28 @@ func TestReplicaShouldForgetLeaderOnVoteRequest(t *testing.T) { "leader is live": {false, func(p *params) { p.livenessMap[2] = livenesspb.IsLiveMapEntry{ IsLive: true, + Liveness: livenesspb.Liveness{ + NodeID: 2, + Expiration: p.now.Add(0, 1).ToLegacyTimestamp(), + }, + } + }}, + "leader is live according to expiration": {false, func(p *params) { + p.livenessMap[2] = livenesspb.IsLiveMapEntry{ + IsLive: false, + Liveness: livenesspb.Liveness{ + NodeID: 2, + Expiration: p.now.Add(0, 1).ToLegacyTimestamp(), + }, + } + }}, + "leader is dead according to expiration": {true, func(p *params) { + p.livenessMap[2] = livenesspb.IsLiveMapEntry{ + IsLive: true, + Liveness: livenesspb.Liveness{ + NodeID: 2, + Expiration: p.now.Add(0, -1).ToLegacyTimestamp(), + }, } }}, } @@ -11650,7 +11674,7 @@ func TestReplicaShouldForgetLeaderOnVoteRequest(t *testing.T) { } tc.modify(&p) require.Equal(t, tc.expect, shouldForgetLeaderOnVoteRequest( - p.raftStatus, p.livenessMap, p.desc)) + p.raftStatus, p.livenessMap, p.desc, p.now)) }) } } From 02e999d31f6a13b1babd0224c23cf48a5cb87845 Mon Sep 17 00:00:00 2001 From: Erik Grinaker Date: Sat, 22 Jul 2023 14:40:52 +0000 Subject: [PATCH 05/28] kvserver: remove `StoreTestingKnobs.DisableLivenessMapConnHealth` Epic: none Release note: None --- pkg/kv/kvserver/store_raft.go | 3 +-- pkg/kv/kvserver/testing_knobs.go | 4 ---- 2 files changed, 1 insertion(+), 6 deletions(-) diff --git a/pkg/kv/kvserver/store_raft.go b/pkg/kv/kvserver/store_raft.go index 0c347ba7e020..90d09b0dddc6 100644 --- a/pkg/kv/kvserver/store_raft.go +++ b/pkg/kv/kvserver/store_raft.go @@ -859,8 +859,7 @@ func (s *Store) updateLivenessMap() { // will continually probe the connection. The check can also have false // positives if the node goes down after populating the map, but that // matters even less. - entry.IsLive = !s.TestingKnobs().DisableLivenessMapConnHealth && - (s.cfg.NodeDialer.ConnHealth(nodeID, rpc.SystemClass) == nil) + entry.IsLive = s.cfg.NodeDialer.ConnHealth(nodeID, rpc.SystemClass) == nil nextMap[nodeID] = entry } s.livenessMap.Store(nextMap) diff --git a/pkg/kv/kvserver/testing_knobs.go b/pkg/kv/kvserver/testing_knobs.go index b1c25d3efcb9..63b2423fa341 100644 --- a/pkg/kv/kvserver/testing_knobs.go +++ b/pkg/kv/kvserver/testing_knobs.go @@ -251,10 +251,6 @@ type StoreTestingKnobs struct { RefreshReasonTicksPeriod int // DisableProcessRaft disables the process raft loop. DisableProcessRaft func(roachpb.StoreID) bool - // DisableLivenessMapConnHealth disables the ConnHealth check in - // updateIsLiveMap, which is useful in tests where we manipulate the node's - // liveness record but still keep the connection alive. 
- DisableLivenessMapConnHealth bool // DisableLastProcessedCheck disables checking on replica queue last processed times. DisableLastProcessedCheck bool // ReplicateQueueAcceptsUnsplit allows the replication queue to From a6babc04972e2829e89da4999c810bff881510c0 Mon Sep 17 00:00:00 2001 From: Tobias Grieger Date: Mon, 24 Jul 2023 14:59:05 +0200 Subject: [PATCH 06/28] kvserver: mute cross-region/zone errors more They're too verbose and the metrics will already make it clear when things aren't set up properly. --- pkg/kv/kvclient/kvcoord/dist_sender.go | 8 ++++---- pkg/kv/kvserver/store_snapshot.go | 4 ++-- pkg/server/node.go | 4 ++-- 3 files changed, 8 insertions(+), 8 deletions(-) diff --git a/pkg/kv/kvclient/kvcoord/dist_sender.go b/pkg/kv/kvclient/kvcoord/dist_sender.go index 053c14636d0d..abedd1e2bbe3 100644 --- a/pkg/kv/kvclient/kvcoord/dist_sender.go +++ b/pkg/kv/kvclient/kvcoord/dist_sender.go @@ -2537,21 +2537,21 @@ func (ds *DistSender) getLocalityComparison( ) roachpb.LocalityComparisonType { gatewayNodeDesc, err := ds.nodeDescs.GetNodeDescriptor(fromNodeID) if err != nil { - log.VEventf(ctx, 2, "failed to perform look up for node descriptor %+v", err) + log.VEventf(ctx, 5, "failed to perform look up for node descriptor %v", err) return roachpb.LocalityComparisonType_UNDEFINED } destinationNodeDesc, err := ds.nodeDescs.GetNodeDescriptor(toNodeID) if err != nil { - log.VEventf(ctx, 2, "failed to perform look up for node descriptor %+v", err) + log.VEventf(ctx, 5, "failed to perform look up for node descriptor %v", err) return roachpb.LocalityComparisonType_UNDEFINED } comparisonResult, regionErr, zoneErr := gatewayNodeDesc.Locality.CompareWithLocality(destinationNodeDesc.Locality) if regionErr != nil { - log.VEventf(ctx, 2, "unable to determine if the given nodes are cross region %+v", regionErr) + log.VEventf(ctx, 5, "unable to determine if the given nodes are cross region %v", regionErr) } if zoneErr != nil { - log.VEventf(ctx, 2, "unable to determine if the given nodes are cross zone %+v", zoneErr) + log.VEventf(ctx, 5, "unable to determine if the given nodes are cross zone %v", zoneErr) } return comparisonResult } diff --git a/pkg/kv/kvserver/store_snapshot.go b/pkg/kv/kvserver/store_snapshot.go index 2c4f268cd04a..3c52af6b4b51 100644 --- a/pkg/kv/kvserver/store_snapshot.go +++ b/pkg/kv/kvserver/store_snapshot.go @@ -990,10 +990,10 @@ func (s *Store) getLocalityComparison( secLocality := s.cfg.StorePool.GetNodeLocality(toNodeID) comparisonResult, regionErr, zoneErr := firstLocality.CompareWithLocality(secLocality) if regionErr != nil { - log.VEventf(ctx, 2, "unable to determine if the given nodes are cross region %+v", regionErr) + log.VEventf(ctx, 5, "unable to determine if the given nodes are cross region %v", regionErr) } if zoneErr != nil { - log.VEventf(ctx, 2, "unable to determine if the given nodes are cross zone %+v", zoneErr) + log.VEventf(ctx, 5, "unable to determine if the given nodes are cross zone %v", zoneErr) } return comparisonResult } diff --git a/pkg/server/node.go b/pkg/server/node.go index d391554e63e2..0e9c3722b347 100644 --- a/pkg/server/node.go +++ b/pkg/server/node.go @@ -1366,10 +1366,10 @@ func (n *Node) getLocalityComparison( comparisonResult, regionErr, zoneErr := n.Descriptor.Locality.CompareWithLocality(gatewayNodeDesc.Locality) if regionErr != nil { - log.VEventf(ctx, 2, "unable to determine if the given nodes are cross region %+v", regionErr) + log.VEventf(ctx, 5, "unable to determine if the given nodes are cross region %v", regionErr) } if 
zoneErr != nil { - log.VEventf(ctx, 2, "unable to determine if the given nodes are cross zone %+v", zoneErr) + log.VEventf(ctx, 5, "unable to determine if the given nodes are cross zone %v", zoneErr) } return comparisonResult From 29a28710516aa7d06308457e22d2e36743c1ec3c Mon Sep 17 00:00:00 2001 From: Tobias Grieger Date: Mon, 24 Jul 2023 14:58:15 +0200 Subject: [PATCH 07/28] kvserver: throttle logging on failed self-heartbeat This can be triggered rapidly because each replica might call this as it tries and fails to acquire a lease. --- pkg/kv/kvserver/replica_range_lease.go | 7 ++++--- 1 file changed, 4 insertions(+), 3 deletions(-) diff --git a/pkg/kv/kvserver/replica_range_lease.go b/pkg/kv/kvserver/replica_range_lease.go index 5ad110c15a2c..2d42a62d289e 100644 --- a/pkg/kv/kvserver/replica_range_lease.go +++ b/pkg/kv/kvserver/replica_range_lease.go @@ -445,6 +445,8 @@ func (p *pendingLeaseRequest) requestLeaseAsync( return nil } +var logFailedHeartbeatOwnLiveness = log.Every(10 * time.Second) + // requestLease sends a synchronous transfer lease or lease request to the // specified replica. It is only meant to be called from requestLeaseAsync, // since it does not coordinate with other in-flight lease requests. @@ -465,9 +467,8 @@ func (p *pendingLeaseRequest) requestLease( if status.Lease.Type() == roachpb.LeaseEpoch && status.State == kvserverpb.LeaseState_EXPIRED { var err error // If this replica is previous & next lease holder, manually heartbeat to become live. - if status.OwnedBy(nextLeaseHolder.StoreID) && - p.repl.store.StoreID() == nextLeaseHolder.StoreID { - if err = p.repl.store.cfg.NodeLiveness.Heartbeat(ctx, status.Liveness); err != nil { + if status.OwnedBy(nextLeaseHolder.StoreID) && p.repl.store.StoreID() == nextLeaseHolder.StoreID { + if err = p.repl.store.cfg.NodeLiveness.Heartbeat(ctx, status.Liveness); err != nil && logFailedHeartbeatOwnLiveness.ShouldLog() { log.Errorf(ctx, "failed to heartbeat own liveness record: %s", err) } } else if status.Liveness.Epoch == status.Lease.Epoch { From 51ee6855e68a5f88fe42ca0bacc5563c5512bab9 Mon Sep 17 00:00:00 2001 From: Tobias Grieger Date: Mon, 24 Jul 2023 14:57:37 +0200 Subject: [PATCH 08/28] kvserver: log when campaigning on rejected lease --- pkg/kv/kvserver/replica_proposal_buf.go | 10 +++++++++- 1 file changed, 9 insertions(+), 1 deletion(-) diff --git a/pkg/kv/kvserver/replica_proposal_buf.go b/pkg/kv/kvserver/replica_proposal_buf.go index 832f18d77531..5964a542bdad 100644 --- a/pkg/kv/kvserver/replica_proposal_buf.go +++ b/pkg/kv/kvserver/replica_proposal_buf.go @@ -15,6 +15,7 @@ import ( "fmt" "sync" "sync/atomic" + "time" "github.com/cockroachdb/cockroach/pkg/kv/kvpb" "github.com/cockroachdb/cockroach/pkg/kv/kvserver/closedts/tracker" @@ -620,6 +621,8 @@ func (b *propBuf) FlushLockedWithRaftGroup( return used, propErr } +var logCampaignOnRejectLease = log.Every(10 * time.Second) + // maybeRejectUnsafeProposalLocked conditionally rejects proposals that are // deemed unsafe, given the current state of the raft group. 
Requests that may // be deemed unsafe and rejected at this level are those whose safety has some @@ -702,7 +705,12 @@ func (b *propBuf) maybeRejectUnsafeProposalLocked( li.leader) b.p.rejectProposalWithRedirectLocked(ctx, p, li.leader) if b.p.shouldCampaignOnRedirect(raftGroup) { - log.VEventf(ctx, 2, "campaigning because Raft leader not live in node liveness map") + const format = "campaigning because Raft leader (id=%d) not live in node liveness map" + if logCampaignOnRejectLease.ShouldLog() { + log.Infof(ctx, format, li.leader) + } else { + log.VEventf(ctx, 2, format, li.leader) + } b.p.campaignLocked(ctx) } return true From c884561fec30c242c909f3f78e9cc0d3e9c0c5e2 Mon Sep 17 00:00:00 2001 From: Tobias Grieger Date: Mon, 24 Jul 2023 13:22:32 +0200 Subject: [PATCH 09/28] kvserver: improve test logging --- pkg/kv/kvserver/client_raft_test.go | 7 ++++--- 1 file changed, 4 insertions(+), 3 deletions(-) diff --git a/pkg/kv/kvserver/client_raft_test.go b/pkg/kv/kvserver/client_raft_test.go index bf8f4ca4b6c9..907cf44f5b68 100644 --- a/pkg/kv/kvserver/client_raft_test.go +++ b/pkg/kv/kvserver/client_raft_test.go @@ -1435,17 +1435,18 @@ func TestRequestsOnFollowerWithNonLiveLeaseholder(t *testing.T) { atomic.StoreInt32(&installPartition, 1) // Wait until the lease expires. - log.Infof(ctx, "test: waiting for lease expiration") + log.Infof(ctx, "test: waiting for lease expiration on r%d", store0Repl.RangeID) testutils.SucceedsSoon(t, func() error { leaseStatus = store0Repl.CurrentLeaseStatus(ctx) if leaseStatus.IsValid() { - return errors.New("lease still valid") + return errors.Errorf("lease still valid: %+v", leaseStatus) } return nil }) log.Infof(ctx, "test: lease expired") { + tBegin := timeutil.Now() // Increment the initial value again, which requires range availability. To // get there, the request will need to trigger a lease request on a follower // replica, which will call a Raft election, acquire Raft leadership, then @@ -1454,7 +1455,7 @@ func TestRequestsOnFollowerWithNonLiveLeaseholder(t *testing.T) { require.NoError(t, err) log.Infof(ctx, "test: waiting for new lease...") tc.WaitForValues(t, key, []int64{2, 2, 2, 0}) - log.Infof(ctx, "test: waiting for new lease... done") + log.Infof(ctx, "test: waiting for new lease... done [%.2fs]", timeutil.Since(tBegin).Seconds()) } // Store 0 no longer holds the lease. From d22a918d6af8319116ab03e0a052321a2691619e Mon Sep 17 00:00:00 2001 From: Tobias Grieger Date: Mon, 24 Jul 2023 14:44:56 +0200 Subject: [PATCH 10/28] kvserver: fix a log line in a test The `Inc` is the blocking part, so log before. --- pkg/kv/kvserver/client_raft_test.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pkg/kv/kvserver/client_raft_test.go b/pkg/kv/kvserver/client_raft_test.go index 907cf44f5b68..90bb31dcb8cc 100644 --- a/pkg/kv/kvserver/client_raft_test.go +++ b/pkg/kv/kvserver/client_raft_test.go @@ -1451,9 +1451,9 @@ func TestRequestsOnFollowerWithNonLiveLeaseholder(t *testing.T) { // get there, the request will need to trigger a lease request on a follower // replica, which will call a Raft election, acquire Raft leadership, then // acquire the range lease. + log.Infof(ctx, "test: waiting for new lease...") _, err := tc.Server(0).DB().Inc(ctx, key, 1) require.NoError(t, err) - log.Infof(ctx, "test: waiting for new lease...") tc.WaitForValues(t, key, []int64{2, 2, 2, 0}) log.Infof(ctx, "test: waiting for new lease... 
done [%.2fs]", timeutil.Since(tBegin).Seconds()) } From 04d368269edbef0f1d9184395e6cbfb98f8b1ab2 Mon Sep 17 00:00:00 2001 From: Tobias Grieger Date: Mon, 24 Jul 2023 14:32:50 +0200 Subject: [PATCH 11/28] kvserver: use hybrid manual clock in TestRequestsOnFollowerWithNonLiveLeaseholder The test previously relied on aggressive liveness heartbeat expirations to avoid running for too long. As a result, it was flaky since liveness wasn't reliably pinned in the way the test wanted. The hybrid manual clock allows time to jump forward at an opportune moment. Use it here to avoid running with a tight lease interval. On my gceworker, previously flaked within a few minutes. As of this commit, I ran it for double-digit minutes without issue. Fixes #107200. Epic: None Release note: None --- pkg/kv/kvserver/client_raft_test.go | 18 ++++++++++++------ 1 file changed, 12 insertions(+), 6 deletions(-) diff --git a/pkg/kv/kvserver/client_raft_test.go b/pkg/kv/kvserver/client_raft_test.go index 90bb31dcb8cc..fc129d3860e0 100644 --- a/pkg/kv/kvserver/client_raft_test.go +++ b/pkg/kv/kvserver/client_raft_test.go @@ -1350,6 +1350,7 @@ func TestRequestsOnFollowerWithNonLiveLeaseholder(t *testing.T) { st := cluster.MakeTestingClusterSettings() kvserver.ExpirationLeasesOnly.Override(ctx, &st.SV, false) // override metamorphism + manualClock := hlc.NewHybridManualClock() clusterArgs := base.TestClusterArgs{ ReplicationMode: base.ReplicationManual, ServerArgs: base.TestServerArgs{ @@ -1361,12 +1362,8 @@ func TestRequestsOnFollowerWithNonLiveLeaseholder(t *testing.T) { RaftEnableCheckQuorum: true, }, Knobs: base.TestingKnobs{ - NodeLiveness: kvserver.NodeLivenessTestingKnobs{ - // This test waits for an epoch-based lease to expire, so we're - // setting the liveness duration as low as possible while still - // keeping the test stable. - LivenessDuration: 3000 * time.Millisecond, - RenewalDuration: 1500 * time.Millisecond, + Server: &server.TestingKnobs{ + WallClock: manualClock, }, Store: &kvserver.StoreTestingKnobs{ // We eliminate clock offsets in order to eliminate the stasis period @@ -1437,7 +1434,16 @@ func TestRequestsOnFollowerWithNonLiveLeaseholder(t *testing.T) { // Wait until the lease expires. log.Infof(ctx, "test: waiting for lease expiration on r%d", store0Repl.RangeID) testutils.SucceedsSoon(t, func() error { + dur, _ := store0.GetStoreConfig().NodeLivenessDurations() + manualClock.Increment(dur.Nanoseconds()) leaseStatus = store0Repl.CurrentLeaseStatus(ctx) + // If we failed to pin the lease, it likely won't ever expire due to the particular + // partition we've set up. Bail early instead of wasting 45s. + require.True(t, leaseStatus.Lease.OwnedBy(store0.StoreID()), "failed to pin lease") + + // Lease is on s1, and it should be invalid now. The only reason there's a + // retry loop is that there could be a race where we bump the clock while a + // heartbeat is inflight (and which picks up the new expiration). if leaseStatus.IsValid() { return errors.Errorf("lease still valid: %+v", leaseStatus) } From 7c2b6eac36867cf29cc976b0f708edefaf3cc516 Mon Sep 17 00:00:00 2001 From: Tobias Grieger Date: Mon, 24 Jul 2023 17:53:51 +0200 Subject: [PATCH 12/28] kvserver: avoid implicit nils in TestReplicaLeaseCounters The two removed fields are nil. This made a test failure during the refactors in this PR more annoying. If we're going to set up a half-inited NodeLiveness, let's at least be honest about it. 
--- pkg/kv/kvserver/replica_test.go | 2 -- 1 file changed, 2 deletions(-) diff --git a/pkg/kv/kvserver/replica_test.go b/pkg/kv/kvserver/replica_test.go index 7868d266a823..efe37c3c5267 100644 --- a/pkg/kv/kvserver/replica_test.go +++ b/pkg/kv/kvserver/replica_test.go @@ -1060,9 +1060,7 @@ func TestReplicaLeaseCounters(t *testing.T) { AmbientCtx: log.AmbientContext{}, Stopper: stopper, Settings: cfg.Settings, - Gossip: cfg.Gossip, Clock: cfg.Clock, - DB: cfg.DB, LivenessThreshold: nlActive, RenewalDuration: nlRenewal, Engines: []storage.Engine{}, From 708435a053cbbe16a70cba65f7a0f0ce92c87d8a Mon Sep 17 00:00:00 2001 From: Tobias Grieger Date: Mon, 24 Jul 2023 10:58:06 +0200 Subject: [PATCH 13/28] gossip: add GetNodeID accessor --- pkg/gossip/gossip.go | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/pkg/gossip/gossip.go b/pkg/gossip/gossip.go index a59db6f1e643..480410606ecb 100644 --- a/pkg/gossip/gossip.go +++ b/pkg/gossip/gossip.go @@ -353,6 +353,11 @@ func (g *Gossip) AssertNotStarted(ctx context.Context) { } } +// GetNodeID gets the NodeID. +func (g *Gossip) GetNodeID() roachpb.NodeID { + return g.NodeID.Get() +} + // GetNodeMetrics returns the gossip node metrics. func (g *Gossip) GetNodeMetrics() *Metrics { return g.server.GetNodeMetrics() From 7d32c4fd458f824046856cde1d3c3fbacfb487f8 Mon Sep 17 00:00:00 2001 From: Tobias Grieger Date: Mon, 24 Jul 2023 11:03:58 +0200 Subject: [PATCH 14/28] liveness: add and adopt Gossip interface This will help with testing. --- pkg/kv/kvserver/liveness/cache.go | 12 +++++++++--- pkg/kv/kvserver/liveness/liveness.go | 2 +- 2 files changed, 10 insertions(+), 4 deletions(-) diff --git a/pkg/kv/kvserver/liveness/cache.go b/pkg/kv/kvserver/liveness/cache.go index 9260fe29aefe..b7cce1e1392e 100644 --- a/pkg/kv/kvserver/liveness/cache.go +++ b/pkg/kv/kvserver/liveness/cache.go @@ -28,6 +28,12 @@ type UpdateInfo struct { lastUnavailableTime hlc.Timestamp } +// Gossip is the subset of *gossip.Gossip used by liveness. +type Gossip interface { + RegisterCallback(pattern string, method gossip.Callback, opts ...gossip.CallbackOption) func() + GetNodeID() roachpb.NodeID +} + // cache stores updates to both Liveness records and the store descriptor map. // It doesn't store the entire StoreDescriptor, only the time when it is // updated. The StoreDescriptor is sent directly from nodes so doesn't require @@ -38,7 +44,7 @@ type UpdateInfo struct { // change to take this into account. Only epoch leases will use the liveness // timestamp directly. type cache struct { - gossip *gossip.Gossip + gossip Gossip clock *hlc.Clock notifyLivenessChanged func(old, new livenesspb.Liveness) mu struct { @@ -54,7 +60,7 @@ type cache struct { } func newCache( - g *gossip.Gossip, clock *hlc.Clock, cbFn func(livenesspb.Liveness, livenesspb.Liveness), + g Gossip, clock *hlc.Clock, cbFn func(livenesspb.Liveness, livenesspb.Liveness), ) *cache { c := cache{} c.gossip = g @@ -86,7 +92,7 @@ func newCache( // selfID returns the ID for this node according to Gossip. This will be 0 // until the node has joined the cluster. 
func (c *cache) selfID() roachpb.NodeID { - return c.gossip.NodeID.Get() + return c.gossip.GetNodeID() } // livenessGossipUpdate is the gossip callback used to keep the diff --git a/pkg/kv/kvserver/liveness/liveness.go b/pkg/kv/kvserver/liveness/liveness.go index a30e65ae0078..ae16bcb979e3 100644 --- a/pkg/kv/kvserver/liveness/liveness.go +++ b/pkg/kv/kvserver/liveness/liveness.go @@ -316,7 +316,7 @@ type NodeLivenessOptions struct { AmbientCtx log.AmbientContext Stopper *stop.Stopper Settings *cluster.Settings - Gossip *gossip.Gossip + Gossip Gossip Clock *hlc.Clock DB *kv.DB LivenessThreshold time.Duration From 31b9ec423783ca849a7ea18992b06fa8dfb09bfb Mon Sep 17 00:00:00 2001 From: Tobias Grieger Date: Mon, 24 Jul 2023 11:13:29 +0200 Subject: [PATCH 15/28] liveness: rename storage -> storageImpl We'll give this a proper interface soon. --- pkg/kv/kvserver/liveness/liveness.go | 4 ++-- pkg/kv/kvserver/liveness/storage.go | 12 ++++++------ 2 files changed, 8 insertions(+), 8 deletions(-) diff --git a/pkg/kv/kvserver/liveness/liveness.go b/pkg/kv/kvserver/liveness/liveness.go index ae16bcb979e3..84c14347fabc 100644 --- a/pkg/kv/kvserver/liveness/liveness.go +++ b/pkg/kv/kvserver/liveness/liveness.go @@ -262,7 +262,7 @@ type NodeLiveness struct { ambientCtx log.AmbientContext stopper *stop.Stopper clock *hlc.Clock - storage storage + storage storageImpl livenessThreshold time.Duration cache *cache renewalDuration time.Duration @@ -342,7 +342,7 @@ func NewNodeLiveness(opts NodeLivenessOptions) *NodeLiveness { ambientCtx: opts.AmbientCtx, stopper: opts.Stopper, clock: opts.Clock, - storage: storage{db: opts.DB}, + storage: storageImpl{db: opts.DB}, livenessThreshold: opts.LivenessThreshold, renewalDuration: opts.RenewalDuration, selfSem: make(chan struct{}, 1), diff --git a/pkg/kv/kvserver/liveness/storage.go b/pkg/kv/kvserver/liveness/storage.go index 63cf91f4b0b7..12d485b98dec 100644 --- a/pkg/kv/kvserver/liveness/storage.go +++ b/pkg/kv/kvserver/liveness/storage.go @@ -24,9 +24,9 @@ import ( "github.com/cockroachdb/errors" ) -// storage is the subset of liveness that deals with reading and writing the +// storageImpl is the subset of liveness that deals with reading and writing the // liveness records to kv. All calls to modify liveness are centrialized here. -type storage struct { +type storageImpl struct { db *kv.DB } @@ -51,7 +51,7 @@ type livenessUpdate struct { // typically unnecessary. For updating liveness, it is still not necessary to // call get, since the handleCondFailed after a CPut will notify you of the // previous data. -func (ls storage) get(ctx context.Context, nodeID roachpb.NodeID) (Record, error) { +func (ls storageImpl) get(ctx context.Context, nodeID roachpb.NodeID) (Record, error) { var oldLiveness livenesspb.Liveness record, err := ls.db.Get(ctx, keys.NodeLivenessKey(nodeID)) if err != nil { @@ -75,7 +75,7 @@ func (ls storage) get(ctx context.Context, nodeID roachpb.NodeID) (Record, error // handleCondFailed func is called with the current data stored for this node. // This method does not retry, but normally the caller will retry using the // returned value on a condition failure. -func (ls storage) update( +func (ls storageImpl) update( ctx context.Context, update livenessUpdate, handleCondFailed func(actual Record) error, ) (Record, error) { var v *roachpb.Value @@ -132,7 +132,7 @@ func (ls storage) update( // // NB: An existing liveness record is not overwritten by this method, we return // an error instead. 
-func (ls storage) create(ctx context.Context, nodeID roachpb.NodeID) error { +func (ls storageImpl) create(ctx context.Context, nodeID roachpb.NodeID) error { for r := retry.StartWithCtx(ctx, base.DefaultRetryOptions()); r.Next(); { // We start off at epoch=0, entrusting the initial heartbeat to increment it // to epoch=1 to signal the very first time the node is up and running. @@ -180,7 +180,7 @@ func (ls storage) create(ctx context.Context, nodeID roachpb.NodeID) error { } // scan will iterate over the KV liveness names and generate liveness records from them. -func (ls storage) scan(ctx context.Context) ([]Record, error) { +func (ls storageImpl) scan(ctx context.Context) ([]Record, error) { kvs, err := ls.db.Scan(ctx, keys.NodeLivenessPrefix, keys.NodeLivenessKeyMax, 0) if err != nil { return nil, errors.Wrap(err, "unable to get liveness") From beb2c9badc102d84e41ccee25aca4b5eef30de56 Mon Sep 17 00:00:00 2001 From: Tobias Grieger Date: Mon, 24 Jul 2023 11:16:19 +0200 Subject: [PATCH 16/28] liveness: export storage methods So that it can implement a public interface. --- pkg/kv/kvserver/liveness/liveness.go | 8 ++++---- pkg/kv/kvserver/liveness/storage.go | 16 ++++++++-------- 2 files changed, 12 insertions(+), 12 deletions(-) diff --git a/pkg/kv/kvserver/liveness/liveness.go b/pkg/kv/kvserver/liveness/liveness.go index 84c14347fabc..0754804f6c80 100644 --- a/pkg/kv/kvserver/liveness/liveness.go +++ b/pkg/kv/kvserver/liveness/liveness.go @@ -580,7 +580,7 @@ func (nl *NodeLiveness) cacheUpdated(old livenesspb.Liveness, new livenesspb.Liv // NB: An existing liveness record is not overwritten by this method, we return // an error instead. func (nl *NodeLiveness) CreateLivenessRecord(ctx context.Context, nodeID roachpb.NodeID) error { - return nl.storage.create(ctx, nodeID) + return nl.storage.Create(ctx, nodeID) } func (nl *NodeLiveness) setMembershipStatusInternal( @@ -923,7 +923,7 @@ func (nl *NodeLiveness) ScanNodeVitalityFromCache() livenesspb.NodeVitalityMap { func (nl *NodeLiveness) ScanNodeVitalityFromKV( ctx context.Context, ) (livenesspb.NodeVitalityMap, error) { - records, err := nl.storage.scan(ctx) + records, err := nl.storage.Scan(ctx) if err != nil { return nil, err } @@ -993,7 +993,7 @@ func (nl *NodeLiveness) GetLiveness(nodeID roachpb.NodeID) (_ Record, ok bool) { func (nl *NodeLiveness) getLivenessRecordFromKV( ctx context.Context, nodeID roachpb.NodeID, ) (Record, error) { - livenessRec, err := nl.storage.get(ctx, nodeID) + livenessRec, err := nl.storage.Get(ctx, nodeID) if err == nil { // Update our cache with the liveness record we just found. nl.cache.maybeUpdate(ctx, livenessRec) @@ -1182,7 +1182,7 @@ func (nl *NodeLiveness) updateLivenessAttempt( } update.oldRaw = l.raw } - return nl.storage.update(ctx, update, handleCondFailed) + return nl.storage.Update(ctx, update, handleCondFailed) } // numLiveNodes is used to populate a metric that tracks the number of live diff --git a/pkg/kv/kvserver/liveness/storage.go b/pkg/kv/kvserver/liveness/storage.go index 12d485b98dec..15b64fcb7d88 100644 --- a/pkg/kv/kvserver/liveness/storage.go +++ b/pkg/kv/kvserver/liveness/storage.go @@ -42,7 +42,7 @@ type livenessUpdate struct { oldRaw []byte } -// get returns a slice containing the liveness record of all nodes that have +// Get returns a slice containing the liveness record of all nodes that have // ever been a part of the cluster. The records are read from the KV layer in a // KV transaction. // @@ -51,7 +51,7 @@ type livenessUpdate struct { // typically unnecessary. 
For updating liveness, it is still not necessary to // call get, since the handleCondFailed after a CPut will notify you of the // previous data. -func (ls storageImpl) get(ctx context.Context, nodeID roachpb.NodeID) (Record, error) { +func (ls storageImpl) Get(ctx context.Context, nodeID roachpb.NodeID) (Record, error) { var oldLiveness livenesspb.Liveness record, err := ls.db.Get(ctx, keys.NodeLivenessKey(nodeID)) if err != nil { @@ -70,12 +70,12 @@ func (ls storageImpl) get(ctx context.Context, nodeID roachpb.NodeID) (Record, e }, nil } -// update will attempt to update the liveness record using a CPut with the +// Update will attempt to update the liveness record using a CPut with the // oldRaw from the livenessUpdate. If the oldRaw does not match, the // handleCondFailed func is called with the current data stored for this node. // This method does not retry, but normally the caller will retry using the // returned value on a condition failure. -func (ls storageImpl) update( +func (ls storageImpl) Update( ctx context.Context, update livenessUpdate, handleCondFailed func(actual Record) error, ) (Record, error) { var v *roachpb.Value @@ -126,13 +126,13 @@ func (ls storageImpl) update( return Record{Liveness: update.newLiveness, raw: v.TagAndDataBytes()}, nil } -// create creates a liveness record for the node specified by the +// Create creates a liveness record for the node specified by the // given node ID. This is typically used when adding a new node to a running // cluster, or when bootstrapping a cluster through a given node. // // NB: An existing liveness record is not overwritten by this method, we return // an error instead. -func (ls storageImpl) create(ctx context.Context, nodeID roachpb.NodeID) error { +func (ls storageImpl) Create(ctx context.Context, nodeID roachpb.NodeID) error { for r := retry.StartWithCtx(ctx, base.DefaultRetryOptions()); r.Next(); { // We start off at epoch=0, entrusting the initial heartbeat to increment it // to epoch=1 to signal the very first time the node is up and running. @@ -179,8 +179,8 @@ func (ls storageImpl) create(ctx context.Context, nodeID roachpb.NodeID) error { return errors.AssertionFailedf("unexpected problem while creating liveness record for node %d", nodeID) } -// scan will iterate over the KV liveness names and generate liveness records from them. -func (ls storageImpl) scan(ctx context.Context) ([]Record, error) { +// Scan will iterate over the KV liveness names and generate liveness records from them. +func (ls storageImpl) Scan(ctx context.Context) ([]Record, error) { kvs, err := ls.db.Scan(ctx, keys.NodeLivenessPrefix, keys.NodeLivenessKeyMax, 0) if err != nil { return nil, errors.Wrap(err, "unable to get liveness") From ea7cc945254062498a6858553a5828810a828501 Mon Sep 17 00:00:00 2001 From: Tobias Grieger Date: Mon, 24 Jul 2023 11:18:30 +0200 Subject: [PATCH 17/28] liveness: switch storage to ptr receivers --- pkg/kv/kvserver/liveness/storage.go | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/pkg/kv/kvserver/liveness/storage.go b/pkg/kv/kvserver/liveness/storage.go index 15b64fcb7d88..86fc9fe7feef 100644 --- a/pkg/kv/kvserver/liveness/storage.go +++ b/pkg/kv/kvserver/liveness/storage.go @@ -51,7 +51,7 @@ type livenessUpdate struct { // typically unnecessary. For updating liveness, it is still not necessary to // call get, since the handleCondFailed after a CPut will notify you of the // previous data. 
-func (ls storageImpl) Get(ctx context.Context, nodeID roachpb.NodeID) (Record, error) { +func (ls *storageImpl) Get(ctx context.Context, nodeID roachpb.NodeID) (Record, error) { var oldLiveness livenesspb.Liveness record, err := ls.db.Get(ctx, keys.NodeLivenessKey(nodeID)) if err != nil { @@ -75,7 +75,7 @@ func (ls storageImpl) Get(ctx context.Context, nodeID roachpb.NodeID) (Record, e // handleCondFailed func is called with the current data stored for this node. // This method does not retry, but normally the caller will retry using the // returned value on a condition failure. -func (ls storageImpl) Update( +func (ls *storageImpl) Update( ctx context.Context, update livenessUpdate, handleCondFailed func(actual Record) error, ) (Record, error) { var v *roachpb.Value @@ -132,7 +132,7 @@ func (ls storageImpl) Update( // // NB: An existing liveness record is not overwritten by this method, we return // an error instead. -func (ls storageImpl) Create(ctx context.Context, nodeID roachpb.NodeID) error { +func (ls *storageImpl) Create(ctx context.Context, nodeID roachpb.NodeID) error { for r := retry.StartWithCtx(ctx, base.DefaultRetryOptions()); r.Next(); { // We start off at epoch=0, entrusting the initial heartbeat to increment it // to epoch=1 to signal the very first time the node is up and running. @@ -180,7 +180,7 @@ func (ls storageImpl) Create(ctx context.Context, nodeID roachpb.NodeID) error { } // Scan will iterate over the KV liveness names and generate liveness records from them. -func (ls storageImpl) Scan(ctx context.Context) ([]Record, error) { +func (ls *storageImpl) Scan(ctx context.Context) ([]Record, error) { kvs, err := ls.db.Scan(ctx, keys.NodeLivenessPrefix, keys.NodeLivenessKeyMax, 0) if err != nil { return nil, errors.Wrap(err, "unable to get liveness") From 52792fce02fde289993f7d230ac1281be7ea18f6 Mon Sep 17 00:00:00 2001 From: Tobias Grieger Date: Mon, 24 Jul 2023 17:41:13 +0200 Subject: [PATCH 18/28] liveness: export LivenessUpdate It's needed to implement the liveness storage (once it exists). --- pkg/kv/kvserver/liveness/liveness.go | 12 ++++++------ pkg/kv/kvserver/liveness/storage.go | 8 ++++---- 2 files changed, 10 insertions(+), 10 deletions(-) diff --git a/pkg/kv/kvserver/liveness/liveness.go b/pkg/kv/kvserver/liveness/liveness.go index 0754804f6c80..3416cf51192c 100644 --- a/pkg/kv/kvserver/liveness/liveness.go +++ b/pkg/kv/kvserver/liveness/liveness.go @@ -514,7 +514,7 @@ func (nl *NodeLiveness) setDrainingInternal( } newLiveness.Draining = drain - update := livenessUpdate{ + update := LivenessUpdate{ oldLiveness: oldLivenessRec.Liveness, newLiveness: newLiveness, oldRaw: oldLivenessRec.raw, @@ -595,7 +595,7 @@ func (nl *NodeLiveness) setMembershipStatusInternal( newLiveness := oldLivenessRec.Liveness newLiveness.Membership = targetStatus - update := livenessUpdate{ + update := LivenessUpdate{ newLiveness: newLiveness, oldLiveness: oldLivenessRec.Liveness, oldRaw: oldLivenessRec.raw, @@ -825,7 +825,7 @@ func (nl *NodeLiveness) heartbeatInternal( return errors.Errorf("proposed liveness update expires earlier than previous record") } - update := livenessUpdate{ + update := LivenessUpdate{ oldLiveness: oldLiveness, newLiveness: newLiveness, } @@ -1042,7 +1042,7 @@ func (nl *NodeLiveness) IncrementEpoch(ctx context.Context, liveness livenesspb. 
return errors.Errorf("cannot increment epoch on live node: %+v", liveness) } - update := livenessUpdate{ + update := LivenessUpdate{ newLiveness: liveness, oldLiveness: liveness, } @@ -1104,7 +1104,7 @@ func (nl *NodeLiveness) RegisterCallback(cb IsLiveCallback) { // This includes the encoded bytes, and it can be used to update the local // cache. func (nl *NodeLiveness) updateLiveness( - ctx context.Context, update livenessUpdate, handleCondFailed func(actual Record) error, + ctx context.Context, update LivenessUpdate, handleCondFailed func(actual Record) error, ) (Record, error) { if err := nl.verifyDiskHealth(ctx); err != nil { return Record{}, err @@ -1164,7 +1164,7 @@ func (nl *NodeLiveness) verifyDiskHealth(ctx context.Context) error { } func (nl *NodeLiveness) updateLivenessAttempt( - ctx context.Context, update livenessUpdate, handleCondFailed func(actual Record) error, + ctx context.Context, update LivenessUpdate, handleCondFailed func(actual Record) error, ) (Record, error) { // If the caller is not manually providing the previous value in // update.oldRaw. we need to read it from our cache. diff --git a/pkg/kv/kvserver/liveness/storage.go b/pkg/kv/kvserver/liveness/storage.go index 86fc9fe7feef..747c4c8eed98 100644 --- a/pkg/kv/kvserver/liveness/storage.go +++ b/pkg/kv/kvserver/liveness/storage.go @@ -30,9 +30,9 @@ type storageImpl struct { db *kv.DB } -// livenessUpdate contains the information for CPutting a new version of a +// LivenessUpdate contains the information for CPutting a new version of a // liveness record. It has both the new and the old version of the proto. -type livenessUpdate struct { +type LivenessUpdate struct { newLiveness livenesspb.Liveness oldLiveness livenesspb.Liveness // oldRaw is the raw value from which `old` was decoded. Used for CPuts as the @@ -71,12 +71,12 @@ func (ls *storageImpl) Get(ctx context.Context, nodeID roachpb.NodeID) (Record, } // Update will attempt to update the liveness record using a CPut with the -// oldRaw from the livenessUpdate. If the oldRaw does not match, the +// oldRaw from the LivenessUpdate. If the oldRaw does not match, the // handleCondFailed func is called with the current data stored for this node. // This method does not retry, but normally the caller will retry using the // returned value on a condition failure. func (ls *storageImpl) Update( - ctx context.Context, update livenessUpdate, handleCondFailed func(actual Record) error, + ctx context.Context, update LivenessUpdate, handleCondFailed func(actual Record) error, ) (Record, error) { var v *roachpb.Value if err := ls.db.Txn(ctx, func(ctx context.Context, txn *kv.Txn) error { From ef8410e5d6b07fb1a3e64b76cbcece68adf49f5a Mon Sep 17 00:00:00 2001 From: Tobias Grieger Date: Mon, 24 Jul 2023 11:20:26 +0200 Subject: [PATCH 19/28] liveness: half-adopt Storage We still want a Storage to be passed into NewNodeLiveness as opposed to a `*kv.DB`, but so far, so good. 
--- pkg/kv/kvserver/liveness/liveness.go | 4 ++-- pkg/kv/kvserver/liveness/storage.go | 16 ++++++++++++++-- 2 files changed, 16 insertions(+), 4 deletions(-) diff --git a/pkg/kv/kvserver/liveness/liveness.go b/pkg/kv/kvserver/liveness/liveness.go index 3416cf51192c..1363bb2243b7 100644 --- a/pkg/kv/kvserver/liveness/liveness.go +++ b/pkg/kv/kvserver/liveness/liveness.go @@ -262,7 +262,7 @@ type NodeLiveness struct { ambientCtx log.AmbientContext stopper *stop.Stopper clock *hlc.Clock - storage storageImpl + storage Storage livenessThreshold time.Duration cache *cache renewalDuration time.Duration @@ -342,7 +342,7 @@ func NewNodeLiveness(opts NodeLivenessOptions) *NodeLiveness { ambientCtx: opts.AmbientCtx, stopper: opts.Stopper, clock: opts.Clock, - storage: storageImpl{db: opts.DB}, + storage: &storageImpl{db: opts.DB}, livenessThreshold: opts.LivenessThreshold, renewalDuration: opts.RenewalDuration, selfSem: make(chan struct{}, 1), diff --git a/pkg/kv/kvserver/liveness/storage.go b/pkg/kv/kvserver/liveness/storage.go index 747c4c8eed98..3485dcfa3d38 100644 --- a/pkg/kv/kvserver/liveness/storage.go +++ b/pkg/kv/kvserver/liveness/storage.go @@ -24,12 +24,24 @@ import ( "github.com/cockroachdb/errors" ) -// storageImpl is the subset of liveness that deals with reading and writing the -// liveness records to kv. All calls to modify liveness are centrialized here. +// Storage is the subset of liveness that deals with reading and writing the +// liveness records to kv. All calls to modify liveness are centralized here. +type Storage interface { + Get(ctx context.Context, nodeID roachpb.NodeID) (Record, error) + Update( + ctx context.Context, update LivenessUpdate, handleCondFailed func(actual Record) error, + ) (Record, error) + Create(ctx context.Context, nodeID roachpb.NodeID) error + Scan(ctx context.Context) ([]Record, error) +} + +// storageImpl implements Storage by storing entries in the replicated KV liveness range. type storageImpl struct { db *kv.DB } +var _ Storage = (*storageImpl)(nil) + // LivenessUpdate contains the information for CPutting a new version of a // liveness record. It has both the new and the old version of the proto. type LivenessUpdate struct { From f865176f8ad4d6c547a96c544420184b54c478d1 Mon Sep 17 00:00:00 2001 From: Tobias Grieger Date: Mon, 24 Jul 2023 11:24:21 +0200 Subject: [PATCH 20/28] liveness: finish adopting Storage Now Liveness is constructed using a `Storage` as opposed to a `*kv.DB`. 
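To make the benefit concrete: with the `Storage` seam in place, a test can hand
`NewNodeLiveness` a stub instead of wiring up a `*kv.DB`. A minimal sketch
(illustrative only, not part of this patch; `stubStorage` is a made-up name,
and a later commit in this series adds a similar `mockStorage`), assuming it
lives in the `liveness` package:

	type stubStorage struct{}

	func (stubStorage) Get(ctx context.Context, nodeID roachpb.NodeID) (Record, error) {
		// Pretend every node has an empty liveness record.
		return Record{Liveness: livenesspb.Liveness{NodeID: nodeID}}, nil
	}

	func (stubStorage) Update(
		ctx context.Context, update LivenessUpdate, handleCondFailed func(actual Record) error,
	) (Record, error) {
		// Accept every update unconditionally; no CPut, no retry loop.
		return Record{Liveness: update.newLiveness}, nil
	}

	func (stubStorage) Create(ctx context.Context, nodeID roachpb.NodeID) error { return nil }

	func (stubStorage) Scan(ctx context.Context) ([]Record, error) { return nil, nil }

	var _ Storage = stubStorage{}

Such a stub could then be injected via `NodeLivenessOptions{Storage: stubStorage{}, ...}`.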
--- pkg/kv/kvserver/liveness/liveness.go | 4 ++-- pkg/kv/kvserver/liveness/storage.go | 5 +++++ pkg/server/server.go | 2 +- pkg/testutils/localtestcluster/local_test_cluster.go | 2 +- 4 files changed, 9 insertions(+), 4 deletions(-) diff --git a/pkg/kv/kvserver/liveness/liveness.go b/pkg/kv/kvserver/liveness/liveness.go index 1363bb2243b7..f158f8e56675 100644 --- a/pkg/kv/kvserver/liveness/liveness.go +++ b/pkg/kv/kvserver/liveness/liveness.go @@ -318,7 +318,7 @@ type NodeLivenessOptions struct { Settings *cluster.Settings Gossip Gossip Clock *hlc.Clock - DB *kv.DB + Storage Storage LivenessThreshold time.Duration RenewalDuration time.Duration HistogramWindowInterval time.Duration @@ -342,7 +342,7 @@ func NewNodeLiveness(opts NodeLivenessOptions) *NodeLiveness { ambientCtx: opts.AmbientCtx, stopper: opts.Stopper, clock: opts.Clock, - storage: &storageImpl{db: opts.DB}, + storage: opts.Storage, livenessThreshold: opts.LivenessThreshold, renewalDuration: opts.RenewalDuration, selfSem: make(chan struct{}, 1), diff --git a/pkg/kv/kvserver/liveness/storage.go b/pkg/kv/kvserver/liveness/storage.go index 3485dcfa3d38..67249fad72d0 100644 --- a/pkg/kv/kvserver/liveness/storage.go +++ b/pkg/kv/kvserver/liveness/storage.go @@ -42,6 +42,11 @@ type storageImpl struct { var _ Storage = (*storageImpl)(nil) +// NewKVStorage returns a Storage backed by the node liveness range. +func NewKVStorage(db *kv.DB) Storage { + return &storageImpl{db} +} + // LivenessUpdate contains the information for CPutting a new version of a // liveness record. It has both the new and the old version of the proto. type LivenessUpdate struct { diff --git a/pkg/server/server.go b/pkg/server/server.go index a12205befe86..627d9b75770e 100644 --- a/pkg/server/server.go +++ b/pkg/server/server.go @@ -493,7 +493,7 @@ func NewServer(cfg Config, stopper *stop.Stopper) (*Server, error) { AmbientCtx: cfg.AmbientCtx, Stopper: stopper, Clock: clock, - DB: db, + Storage: liveness.NewKVStorage(db), Gossip: g, LivenessThreshold: nlActive, RenewalDuration: nlRenewal, diff --git a/pkg/testutils/localtestcluster/local_test_cluster.go b/pkg/testutils/localtestcluster/local_test_cluster.go index 119c73e52f34..4024308fb481 100644 --- a/pkg/testutils/localtestcluster/local_test_cluster.go +++ b/pkg/testutils/localtestcluster/local_test_cluster.go @@ -194,7 +194,7 @@ func (ltc *LocalTestCluster) Start(t testing.TB, baseCtx *base.Config, initFacto AmbientCtx: cfg.AmbientCtx, Stopper: ltc.stopper, Clock: cfg.Clock, - DB: cfg.DB, + Storage: liveness.NewKVStorage(cfg.DB), Gossip: cfg.Gossip, LivenessThreshold: active, RenewalDuration: renewal, From f7e72f9ae07d8a6ad1417b0edf02bc76adfe674e Mon Sep 17 00:00:00 2001 From: Tobias Grieger Date: Thu, 20 Jul 2023 16:00:12 +0200 Subject: [PATCH 21/28] liveness: allow registering callbacks after start I discovered[^1] a deadlock scenario when multiple nodes in the cluster restart with additional stores that need to be bootstrapped. In that case, liveness must be running when the StoreIDs are allocated, but it is not. Trying to address this problem, I realized that when an auxiliary Store is bootstrapped, it will create a new replicateQueue, which will register a new callback into NodeLiveness. But if liveness must be started at this point to fix #106706, we'll run into the assertion that checks that we don't register callbacks on a started node liveness. Something's got to give: we will allow registering callbacks at any given point in time, and they'll get an initial set of notifications synchronously. 
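Concretely, the new contract looks like this from a caller's perspective
(sketch only, not part of the diff below; `startPerStoreWorker` is a
hypothetical helper):

	// Registration is now legal both before and after Start(). The callback
	// fires synchronously, from inside RegisterCallback, once for every node
	// that is already live, and again whenever a node transitions from
	// non-live to live.
	nl.RegisterCallback(func(l livenesspb.Liveness) {
		startPerStoreWorker(l.NodeID) // hypothetical
	})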
I audited the few users of RegisterCallback and this seems OK with all of them. [^1]: https://github.com/cockroachdb/cockroach/issues/106706#issuecomment-1640254715 Epic: None Release note: None --- pkg/kv/kvserver/liveness/liveness.go | 59 +++++++++++++++------------- 1 file changed, 32 insertions(+), 27 deletions(-) diff --git a/pkg/kv/kvserver/liveness/liveness.go b/pkg/kv/kvserver/liveness/liveness.go index f158f8e56675..7dfc92c866b0 100644 --- a/pkg/kv/kvserver/liveness/liveness.go +++ b/pkg/kv/kvserver/liveness/liveness.go @@ -280,9 +280,12 @@ type NodeLiveness struct { nodeDialer *nodedialer.Dialer engineSyncs *singleflight.Group - // onIsLive is a callback registered by stores prior to starting liveness. - // It fires when a node transitions from not live to live. - onIsLive []IsLiveCallback // see RegisterCallback + // onIsLiveMu holds callback registered by stores. + // They fire when a node transitions from not live to live. + onIsLiveMu struct { + syncutil.Mutex + callbacks []IsLiveCallback + } // see RegisterCallback // onSelfHeartbeat is invoked after every successful heartbeat // of the local liveness instance's heartbeat loop. @@ -548,15 +551,8 @@ func (nl *NodeLiveness) cacheUpdated(old livenesspb.Liveness, new livenesspb.Liv // Need to use a different signal to determine if liveness changed. now := nl.clock.Now() if !old.IsLive(now) && new.IsLive(now) { - // NB: If we are not started, we don't use the onIsLive callbacks since they - // can still change. This is a bit of a tangled mess since the startup of - // liveness requires the stores to be started, but stores can't start until - // liveness can run. Ideally we could cache all these updates and call - // onIsLive as part of start. - if nl.started.Get() { - for _, fn := range nl.onIsLive { - fn(new) - } + for _, fn := range nl.callbacks() { + fn(new) } } if !old.Membership.Decommissioned() && new.Membership.Decommissioned() && nl.onNodeDecommissioned != nil { @@ -639,15 +635,6 @@ func (nl *NodeLiveness) Start(ctx context.Context) { retryOpts.Closer = nl.stopper.ShouldQuiesce() nl.started.Set(true) - // We may have received some liveness records from Gossip prior to Start being - // called. We need to go through and notify all the callers of them now. - for _, entry := range nl.ScanNodeVitalityFromCache() { - if entry.IsLive(livenesspb.IsAliveNotification) { - for _, fn := range nl.onIsLive { - fn(entry.GetInternalLiveness()) - } - } - } _ = nl.stopper.RunAsyncTaskEx(ctx, stop.TaskOpts{TaskName: "liveness-hb", SpanOpt: stop.SterileRootSpan}, func(context.Context) { ambient := nl.ambientCtx @@ -746,6 +733,22 @@ func (nl *NodeLiveness) Heartbeat(ctx context.Context, liveness livenesspb.Liven return nl.heartbeatInternal(ctx, liveness, false /* increment epoch */) } +func (nl *NodeLiveness) callbacks() []IsLiveCallback { + nl.onIsLiveMu.Lock() + defer nl.onIsLiveMu.Unlock() + return append([]IsLiveCallback(nil), nl.onIsLiveMu.callbacks...) +} + +func (nl *NodeLiveness) notifyIsAliveCallbacks(fns []IsLiveCallback) { + for _, entry := range nl.ScanNodeVitalityFromCache() { + if entry.IsLive(livenesspb.IsAliveNotification) { + for _, fn := range fns { + fn(entry.GetInternalLiveness()) + } + } + } +} + func (nl *NodeLiveness) heartbeatInternal( ctx context.Context, oldLiveness livenesspb.Liveness, incrementEpoch bool, ) (err error) { @@ -1077,13 +1080,15 @@ func (nl *NodeLiveness) Metrics() Metrics { return nl.metrics } -// RegisterCallback registers a callback to be invoked any time a -// node's IsLive() state changes to true. 
This must be called before Start.
+// RegisterCallback registers a callback to be invoked any time a node's
+// IsLive() state changes to true. The provided callback will be invoked
+// synchronously from RegisterCallback if the node is currently live.
 func (nl *NodeLiveness) RegisterCallback(cb IsLiveCallback) {
-	if nl.started.Get() {
-		log.Fatalf(context.TODO(), "RegisterCallback called after Start")
-	}
-	nl.onIsLive = append(nl.onIsLive, cb)
+	nl.onIsLiveMu.Lock()
+	nl.onIsLiveMu.callbacks = append(nl.onIsLiveMu.callbacks, cb)
+	nl.onIsLiveMu.Unlock()
+
+	nl.notifyIsAliveCallbacks([]IsLiveCallback{cb})
 }
 
 // updateLiveness does a conditional put on the node liveness record for the

From 3bf63a6e1abb2d218e78bf5021386161f4b37216 Mon Sep 17 00:00:00 2001
From: Tobias Grieger
Date: Mon, 24 Jul 2023 10:50:52 +0200
Subject: [PATCH 22/28] liveness: only call onSelfHeartbeat on self

I think there was a bug here. This method was previously invoked in
`updateLiveness`, but that method is the general workhorse for updating
anyone's liveness. In particular, it is called by `IncrementEpoch`. So we
were invoking `onSelfHeartbeat` when we would increment other nodes' epochs.
This doesn't seem great.

Additionally, the code was trying to avoid invoking this callback before
liveness was officially "started". Heartbeating yourself before liveness is
started is unfortunately a thing due to the tangled start-up initialization
sequence; we may see heartbeats triggered by lease requests.

Avoid both complications by invoking `onSelfHeartbeat` from the actual main
heartbeat loop, whose only job is to heartbeat the node's own liveness
record.

I tried to adapt `TestNodeHeartbeatCallback` to give better coverage, but
it's a yak shave. A deterministic node liveness (i.e. a way to invoke the
main heartbeat loop manually) would make this a lot simpler.
I filed an issue to that effect: https://github.com/cockroachdb/cockroach/issues/107452 --- pkg/kv/kvserver/liveness/liveness.go | 5 ++- pkg/kv/kvserver/node_liveness_test.go | 47 +++------------------------ 2 files changed, 7 insertions(+), 45 deletions(-) diff --git a/pkg/kv/kvserver/liveness/liveness.go b/pkg/kv/kvserver/liveness/liveness.go index 7dfc92c866b0..309bb67ffe84 100644 --- a/pkg/kv/kvserver/liveness/liveness.go +++ b/pkg/kv/kvserver/liveness/liveness.go @@ -687,6 +687,8 @@ func (nl *NodeLiveness) Start(ctx context.Context) { return nil }); err != nil { log.Warningf(ctx, heartbeatFailureLogFormat, err) + } else if nl.onSelfHeartbeat != nil { + nl.onSelfHeartbeat(ctx) } nl.heartbeatToken <- struct{}{} @@ -1125,9 +1127,6 @@ func (nl *NodeLiveness) updateLiveness( } return Record{}, err } - if nl.started.Get() && nl.onSelfHeartbeat != nil { - nl.onSelfHeartbeat(ctx) - } return written, nil } if err := ctx.Err(); err != nil { diff --git a/pkg/kv/kvserver/node_liveness_test.go b/pkg/kv/kvserver/node_liveness_test.go index 66a6ef5c31c2..a4ee4e75cd2d 100644 --- a/pkg/kv/kvserver/node_liveness_test.go +++ b/pkg/kv/kvserver/node_liveness_test.go @@ -355,63 +355,26 @@ func TestNodeHeartbeatCallback(t *testing.T) { defer log.Scope(t).Close(t) ctx := context.Background() - manualClock := hlc.NewHybridManualClock() - expected := manualClock.UnixNano() - tc := testcluster.StartTestCluster(t, 3, + tc := testcluster.StartTestCluster(t, 1, base.TestClusterArgs{ ReplicationMode: base.ReplicationManual, - ServerArgs: base.TestServerArgs{ - Knobs: base.TestingKnobs{ - Server: &server.TestingKnobs{ - WallClock: manualClock, - }, - }, - }, }) defer tc.Stopper().Stop(ctx) // Verify liveness of all nodes for all nodes. verifyLiveness(t, tc) - pauseNodeLivenessHeartbeatLoops(tc) // Verify that last update time has been set for all nodes. - verifyUptimes := func() error { + verifyUptimes := func() { for i := range tc.Servers { s := tc.GetFirstStoreFromServer(t, i) uptm, err := s.ReadLastUpTimestamp(context.Background()) - if err != nil { - return errors.Wrapf(err, "error reading last up time from store %d", i) - } - if a, e := uptm.WallTime, expected; a < e { - return errors.Errorf("store %d last uptime = %d; wanted %d", i, a, e) - } + require.NoError(t, err) + require.NotZero(t, uptm) } - return nil } - if err := verifyUptimes(); err != nil { - t.Fatal(err) - } - - // Advance clock past the liveness threshold and force a manual heartbeat on - // all node liveness objects, which should update the last up time for each - // store. - manualClock.Increment(tc.Servers[0].NodeLiveness().(*liveness.NodeLiveness).TestingGetLivenessThreshold().Nanoseconds() + 1) - expected = manualClock.UnixNano() - for _, s := range tc.Servers { - nl := s.NodeLiveness().(*liveness.NodeLiveness) - l, ok := nl.Self() - assert.True(t, ok) - if err := nl.Heartbeat(context.Background(), l); err != nil { - t.Fatal(err) - } - } - // NB: since the heartbeat callback is invoked synchronously in - // `Heartbeat()` which this goroutine invoked, we don't need to wrap this in - // a retry. - if err := verifyUptimes(); err != nil { - t.Fatal(err) - } + verifyUptimes() } // TestNodeLivenessEpochIncrement verifies that incrementing the epoch From 4bf586596c19a703964c9390b3aaff6b01e852fd Mon Sep 17 00:00:00 2001 From: Tobias Grieger Date: Mon, 24 Jul 2023 12:24:05 +0200 Subject: [PATCH 23/28] liveness: move liveness and store regex to globals Helps with testing. 
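Specifically, with `livenessRegex` and `storeRegex` as package-level globals, a
test double that records gossip callbacks can look the liveness handler up by
its pattern and feed it synthetic updates. A fragment of what that can look
like (illustrative only; `capturedCallbacks` stands in for whatever map the
test's fake gossip keeps, and the follow-up commit adds a `mockGossip` that
works this way):

	// Look up the callback that the liveness cache registered under the
	// liveness gossip key pattern, then hand it a synthetic liveness proto
	// for node 2.
	cb := capturedCallbacks[livenessRegex] // hypothetical map in a fake Gossip
	var v roachpb.Value
	if err := v.SetProto(&livenesspb.Liveness{NodeID: 2, Epoch: 1}); err != nil {
		panic(err)
	}
	cb(gossip.MakeNodeLivenessKey(2), v)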
--- pkg/kv/kvserver/liveness/cache.go | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/pkg/kv/kvserver/liveness/cache.go b/pkg/kv/kvserver/liveness/cache.go index b7cce1e1392e..ef68c1203810 100644 --- a/pkg/kv/kvserver/liveness/cache.go +++ b/pkg/kv/kvserver/liveness/cache.go @@ -34,6 +34,9 @@ type Gossip interface { GetNodeID() roachpb.NodeID } +var livenessRegex = gossip.MakePrefixPattern(gossip.KeyNodeLivenessPrefix) +var storeRegex = gossip.MakePrefixPattern(gossip.KeyStoreDescPrefix) + // cache stores updates to both Liveness records and the store descriptor map. // It doesn't store the entire StoreDescriptor, only the time when it is // updated. The StoreDescriptor is sent directly from nodes so doesn't require @@ -77,13 +80,11 @@ func newCache( // nl.Start() is invoked. At the time of writing this invariant does // not hold (which is a problem, since the node itself won't be live // at this point, and requests routed to it will hang). - livenessRegex := gossip.MakePrefixPattern(gossip.KeyNodeLivenessPrefix) c.gossip.RegisterCallback(livenessRegex, c.livenessGossipUpdate) // Enable redundant callbacks for the store keys because we use these // callbacks as a clock to determine when a store was last updated even if it // hasn't otherwise changed. - storeRegex := gossip.MakePrefixPattern(gossip.KeyStoreDescPrefix) c.gossip.RegisterCallback(storeRegex, c.storeGossipUpdate, gossip.Redundant) } return &c From d767731f6f082a4bef4387d5cf13e53aa54e3561 Mon Sep 17 00:00:00 2001 From: Tobias Grieger Date: Mon, 24 Jul 2023 12:24:42 +0200 Subject: [PATCH 24/28] liveness: add test for IsLiveCallback invocation This tests that regardless of when a callback is registered, it gets called. --- pkg/kv/kvserver/liveness/BUILD.bazel | 2 + pkg/kv/kvserver/liveness/liveness_test.go | 180 ++++++++++++++++++++++ 2 files changed, 182 insertions(+) diff --git a/pkg/kv/kvserver/liveness/BUILD.bazel b/pkg/kv/kvserver/liveness/BUILD.bazel index 161ca9fc8050..4f2708765882 100644 --- a/pkg/kv/kvserver/liveness/BUILD.bazel +++ b/pkg/kv/kvserver/liveness/BUILD.bazel @@ -51,6 +51,7 @@ go_test( embed = [":liveness"], deps = [ "//pkg/base", + "//pkg/gossip", "//pkg/kv/kvserver", "//pkg/kv/kvserver/allocator/plan", "//pkg/kv/kvserver/liveness/livenesspb", @@ -69,6 +70,7 @@ go_test( "//pkg/util/log", "//pkg/util/protoutil", "//pkg/util/randutil", + "//pkg/util/stop", "//pkg/util/syncutil", "//pkg/util/timeutil", "@com_github_cockroachdb_errors//:errors", diff --git a/pkg/kv/kvserver/liveness/liveness_test.go b/pkg/kv/kvserver/liveness/liveness_test.go index e553f76ddab2..073e8716686f 100644 --- a/pkg/kv/kvserver/liveness/liveness_test.go +++ b/pkg/kv/kvserver/liveness/liveness_test.go @@ -11,16 +11,22 @@ package liveness import ( + "context" "fmt" "testing" "time" + "github.com/cockroachdb/cockroach/pkg/base" + "github.com/cockroachdb/cockroach/pkg/gossip" "github.com/cockroachdb/cockroach/pkg/kv/kvserver/liveness/livenesspb" + "github.com/cockroachdb/cockroach/pkg/roachpb" "github.com/cockroachdb/cockroach/pkg/settings/cluster" "github.com/cockroachdb/cockroach/pkg/util/hlc" "github.com/cockroachdb/cockroach/pkg/util/leaktest" "github.com/cockroachdb/cockroach/pkg/util/log" "github.com/cockroachdb/cockroach/pkg/util/protoutil" + "github.com/cockroachdb/cockroach/pkg/util/stop" + "github.com/cockroachdb/cockroach/pkg/util/syncutil" "github.com/cockroachdb/cockroach/pkg/util/timeutil" "github.com/stretchr/testify/require" ) @@ -386,3 +392,177 @@ func TestNodeLivenessLivenessStatus(t *testing.T) { }) 
} } + +type mockGossip struct { + m map[string]gossip.Callback +} + +func (g *mockGossip) RegisterCallback( + pattern string, method gossip.Callback, opts ...gossip.CallbackOption, +) func() { + if g.m == nil { + g.m = map[string]gossip.Callback{} + } + g.m[pattern] = method + return func() { + delete(g.m, pattern) + } +} + +func (g *mockGossip) GetNodeID() roachpb.NodeID { + return 1 +} + +func mockRecord(nodeID roachpb.NodeID) Record { + return Record{Liveness: livenesspb.Liveness{NodeID: nodeID}} +} + +type mockStorage struct{} + +func (s *mockStorage) Get(ctx context.Context, nodeID roachpb.NodeID) (Record, error) { + return mockRecord(nodeID), nil +} + +func (s *mockStorage) Update( + ctx context.Context, update LivenessUpdate, handleCondFailed func(actual Record) error, +) (Record, error) { + return Record{Liveness: update.newLiveness}, nil +} + +func (s *mockStorage) Create(ctx context.Context, nodeID roachpb.NodeID) error { + return nil +} + +func (s *mockStorage) Scan(ctx context.Context) ([]Record, error) { + return []Record{mockRecord(2), mockRecord(3)}, nil +} + +func TestNodeLivenessIsLiveCallback(t *testing.T) { + defer leaktest.AfterTest(t)() + ctx := context.Background() + stopper := stop.NewStopper() + defer stopper.Stop(ctx) + st := cluster.MakeTestingClusterSettings() + mc := timeutil.NewManualTime(timeutil.Unix(1, 0)) + g := &mockGossip{} + s := &mockStorage{} + nl := NewNodeLiveness(NodeLivenessOptions{ + AmbientCtx: log.MakeTestingAmbientCtxWithNewTracer(), + Stopper: stopper, + Settings: st, + Gossip: g, + Clock: hlc.NewClockForTesting(mc), + Storage: s, + LivenessThreshold: 24 * time.Hour, + RenewalDuration: 23 * time.Hour, + HistogramWindowInterval: base.DefaultHistogramWindowInterval(), + }) + + require.Len(t, g.m, 2) + + update := func(nodeID roachpb.NodeID) { + fn := g.m[livenessRegex] + k := gossip.MakeNodeLivenessKey(nodeID) + exp := nl.clock.Now().ToLegacyTimestamp() + exp.WallTime++ // stays valid until we bump `mc` + lv := livenesspb.Liveness{ + NodeID: nodeID, + Epoch: 1, + Expiration: exp, + Membership: livenesspb.MembershipStatus_ACTIVE, + } + var v roachpb.Value + require.NoError(t, v.SetProto(&lv)) + fn(k, v) + } + + m := struct { + syncutil.Mutex + m map[string]int + }{m: map[string]int{}} + + getMap := func() map[string]int { + m.Lock() + defer m.Unlock() + mm := map[string]int{} + for k, v := range m.m { + mm[k] = v + } + return mm + } + + const ( + preStartPreGossip = "pre-start-pre-gossip" + preStartPostGossip = "pre-start-post-gossip" + postStartPreGossip = "post-start-pre-gossip" + postStartPostGossip = "post-start-post-gossip" + ) + + reg := func(name string) { + nl.RegisterCallback(func(liveness livenesspb.Liveness) { + if liveness.NodeID == 1 { + // Ignore calls that come from the liveness heartbeating itself, + // since these are on a goroutine and we just want a deterministic + // test here. + return + } + m.Lock() + defer m.Unlock() + m.m[name]++ + }) + } + + reg(preStartPreGossip) + // n2 becomes live right after (n1's) Liveness is instantiated, before it's started. + // This should invoke the callback that is already registered. There is no concurrency + // yet so we don't have to lock `m`. + update(2) + require.Equal(t, map[string]int{ + preStartPreGossip: 1, + }, getMap()) + + reg(preStartPostGossip) + nl.Start(ctx) + reg(postStartPreGossip) + update(3) + reg(postStartPostGossip) + + // Each callback gets called twice (once for n2 once for n3), regardless of when + // it was registered. 
+ require.Equal(t, map[string]int{ + preStartPreGossip: 2, + preStartPostGossip: 2, + postStartPreGossip: 2, + postStartPostGossip: 2, + }, getMap()) + + // Additional gossip updates don't trigger more callbacks unless node + // is non-live in the meantime. + update(2) + update(3) + require.Equal(t, map[string]int{ + preStartPreGossip: 2, + preStartPostGossip: 2, + postStartPreGossip: 2, + postStartPostGossip: 2, + }, getMap()) + + mc.Advance(time.Second) // n2 and n3 are now non-live + + require.Equal(t, map[string]int{ + preStartPreGossip: 2, + preStartPostGossip: 2, + postStartPreGossip: 2, + postStartPostGossip: 2, + }, getMap()) + + // Heartbeat from 3 triggers callbacks because old liveness + // is now non-live whereas the new one is. + update(3) + require.Equal(t, map[string]int{ + preStartPreGossip: 3, + preStartPostGossip: 3, + postStartPreGossip: 3, + postStartPostGossip: 3, + }, getMap()) +} From 608c9491b5f3d9ec52380a6f851758b026fab54f Mon Sep 17 00:00:00 2001 From: Tobias Grieger Date: Tue, 25 Jul 2023 14:18:26 +0200 Subject: [PATCH 25/28] kvserver: fail gracefully in TestLeaseTransferRejectedIfTargetNeedsSnapshot We saw this test hang in CI. What likely happened (according to the stacks) is that a lease transfer that was supposed to be caught by an interceptor never showed up in the interceptor. The most likely explanation is that it errored out before it got to evaluation. It then signaled a channel the test was only prepared to check later, so the test hung (waiting for a channel that was now never to be touched). This test is hard to maintain. It would be great (though, for now, out of reach) to write tests like it in a deterministic framework[^1] [^1]: see https://github.com/cockroachdb/cockroach/issues/105177. For now, fix the test so that when the (so far unknown) error rears its head again, it will fail properly, so we get to see the error and can take another pass at fixing the test (separately). Stressing this commit[^2], we get: > transferErrC unexpectedly signaled: /Table/Max: transfer lease unexpected > error: refusing to transfer lease to (n3,s3):3 because target may need a Raft > snapshot: replica in StateProbe This makes sense. The test wants to exercise the below-raft mechanism, but the above-raft mechanism also exists and while we didn't want to interact with it, we sometimes do[^1] [^1]: somewhat related to https://github.com/cockroachdb/cockroach/issues/107524 [^2]: `./dev test --filter TestLeaseTransferRejectedIfTargetNeedsSnapshot --stress ./pkg/kv/kvserver/` on gceworker, 285s Touches #106383. Epic: None Release note: None --- pkg/kv/kvserver/client_replica_test.go | 13 ++++++++++--- 1 file changed, 10 insertions(+), 3 deletions(-) diff --git a/pkg/kv/kvserver/client_replica_test.go b/pkg/kv/kvserver/client_replica_test.go index 4737cd86676f..d1d27272d8da 100644 --- a/pkg/kv/kvserver/client_replica_test.go +++ b/pkg/kv/kvserver/client_replica_test.go @@ -3003,10 +3003,17 @@ func TestLeaseTransferRejectedIfTargetNeedsSnapshot(t *testing.T) { // Replica.AdminTransferLease. 
transferErrC := make(chan error, 1) if rejectAfterRevoke { - _ = tc.Stopper().RunAsyncTask(ctx, "transfer lease", func(ctx context.Context) { + require.NoError(t, tc.Stopper().RunAsyncTask(ctx, "transfer lease", func(ctx context.Context) { transferErrC <- tc.TransferRangeLease(*repl0.Desc(), tc.Target(2)) - }) - <-transferLeaseReqBlockedC + })) + select { + case <-transferLeaseReqBlockedC: + // Expected case: lease transfer triggered our interceptor and is now + // waiting there for transferLeaseReqUnblockedCh. + case err := <-transferErrC: + // Unexpected case: lease transfer errored out before making it into the filter. + t.Fatalf("transferErrC unexpectedly signaled: %v", err) + } } // Truncate the log at index+1 (log entries < N are removed, so this From 1c8c503d69fd0d1bd70ee6040acfc3e5fec840e7 Mon Sep 17 00:00:00 2001 From: Tobias Grieger Date: Tue, 25 Jul 2023 14:45:48 +0200 Subject: [PATCH 26/28] kvserver: deflake TestLeaseTransferRejectedIfTargetNeedsSnapshot See previous commit. We sometimes hit the above-raft check when we wanted to hit only the below-raft one. This commit fixes this by selectively disabling the above-raft check in this test. Note that not all tests using lease transfers are susceptible to this problem in the way that this test is. This is because this test also lowers the `LeaseTransferRejectedRetryLoopCount`[^1] because it is intentionally manufacturing failed lease transfers and doesn't want to sit out the retry loop. It is that time-saving optimization that also allows the spurious error to bubble up. [^1]: https://github.com/cockroachdb/cockroach/blob/66c9f93ae86bddd7ba4c5f6a6b8b6cb700ca23ce/pkg/kv/kvserver/testing_knobs.go#L375 Epic: none Release note: None --- pkg/kv/kvserver/client_replica_test.go | 3 +++ pkg/kv/kvserver/replica_range_lease.go | 2 +- pkg/kv/kvserver/testing_knobs.go | 4 ++++ 3 files changed, 8 insertions(+), 1 deletion(-) diff --git a/pkg/kv/kvserver/client_replica_test.go b/pkg/kv/kvserver/client_replica_test.go index d1d27272d8da..befceeb18488 100644 --- a/pkg/kv/kvserver/client_replica_test.go +++ b/pkg/kv/kvserver/client_replica_test.go @@ -2933,6 +2933,9 @@ func TestLeaseTransferRejectedIfTargetNeedsSnapshot(t *testing.T) { ServerArgs: base.TestServerArgs{ Knobs: base.TestingKnobs{ Store: &kvserver.StoreTestingKnobs{ + // If we're testing the below-raft check, disable the above-raft check. 
+ // See: https://github.com/cockroachdb/cockroach/pull/107526 + DisableAboveRaftLeaseTransferSafetyChecks: rejectAfterRevoke, TestingRequestFilter: func(ctx context.Context, ba *kvpb.BatchRequest) *kvpb.Error { if rejectAfterRevoke && ba.IsSingleTransferLeaseRequest() { transferLeaseReqBlockOnce.Do(func() { diff --git a/pkg/kv/kvserver/replica_range_lease.go b/pkg/kv/kvserver/replica_range_lease.go index bd98eca5b655..b3615eac3244 100644 --- a/pkg/kv/kvserver/replica_range_lease.go +++ b/pkg/kv/kvserver/replica_range_lease.go @@ -949,7 +949,7 @@ func (r *Replica) AdminTransferLease( raftStatus := r.raftStatusRLocked() raftFirstIndex := r.raftFirstIndexRLocked() snapStatus := raftutil.ReplicaMayNeedSnapshot(raftStatus, raftFirstIndex, nextLeaseHolder.ReplicaID) - if snapStatus != raftutil.NoSnapshotNeeded && !bypassSafetyChecks { + if snapStatus != raftutil.NoSnapshotNeeded && !bypassSafetyChecks && !r.store.cfg.TestingKnobs.DisableAboveRaftLeaseTransferSafetyChecks { r.store.metrics.LeaseTransferErrorCount.Inc(1) log.VEventf(ctx, 2, "not initiating lease transfer because the target %s may "+ "need a snapshot: %s", nextLeaseHolder, snapStatus) diff --git a/pkg/kv/kvserver/testing_knobs.go b/pkg/kv/kvserver/testing_knobs.go index d8462940073b..7cb72b992956 100644 --- a/pkg/kv/kvserver/testing_knobs.go +++ b/pkg/kv/kvserver/testing_knobs.go @@ -225,6 +225,10 @@ type StoreTestingKnobs struct { // leadership when it diverges from the range's leaseholder. This can // also be set via COCKROACH_DISABLE_LEADER_FOLLOWS_LEASEHOLDER. DisableLeaderFollowsLeaseholder bool + // If set, the above-raft lease transfer safety checks (that verify that + // we don't transfer leases to followers that need a snapshot, etc) are + // disabled. The proposal-time checks are not affected by this knob. + DisableAboveRaftLeaseTransferSafetyChecks bool // DisableRefreshReasonNewLeader disables refreshing pending commands when a new // leader is discovered. DisableRefreshReasonNewLeader bool From 2c72155b64ed7da6661b316fa22efc5051ede96b Mon Sep 17 00:00:00 2001 From: Tobias Grieger Date: Tue, 25 Jul 2023 16:37:46 +0200 Subject: [PATCH 27/28] kvserver: disable replicate queue and lease transfers in closedts tests For a more holistic suggestion on how to fix this for the likely many other tests susceptible to similar issues, see: https://github.com/cockroachdb/cockroach/issues/107528 Fixes #101824. Release note: None Epic: none --- pkg/kv/kvserver/closed_timestamp_test.go | 6 ++++++ 1 file changed, 6 insertions(+) diff --git a/pkg/kv/kvserver/closed_timestamp_test.go b/pkg/kv/kvserver/closed_timestamp_test.go index bb59a791b8db..8bebfbbb082b 100644 --- a/pkg/kv/kvserver/closed_timestamp_test.go +++ b/pkg/kv/kvserver/closed_timestamp_test.go @@ -1206,9 +1206,15 @@ func setupClusterForClosedTSTesting( SET CLUSTER SETTING kv.closed_timestamp.target_duration = '%s'; SET CLUSTER SETTING kv.closed_timestamp.side_transport_interval = '%s'; SET CLUSTER SETTING kv.closed_timestamp.follower_reads_enabled = true; +SET CLUSTER SETTING kv.allocator.load_based_rebalancing = 'off'; `, targetDuration, targetDuration/4), ";")...) + // Disable replicate queues to avoid errant lease transfers. + // + // See: https://github.com/cockroachdb/cockroach/issues/101824. 
+ tc.ToggleReplicateQueues(false) + return tc, tc.ServerConn(0), desc } From c4354725aeb1ab24d22a32b5037fd098a3f783d5 Mon Sep 17 00:00:00 2001 From: Erik Grinaker Date: Sun, 23 Jul 2023 14:29:06 +0000 Subject: [PATCH 28/28] kvserver: scale Raft entry cache size with system memory The Raft entry cache size defaulted to 16 MB, which is rather small. This has been seen to cause tail latency and throughput degradation with high write volume on large nodes, correlating with a reduction in the entry cache hit rate. This patch linearly scales the Raft entry cache size as 1/256 of total system/cgroup memory, shared evenly between all stores, with a minimum 32 MB. For example, a 32 GB 8-vCPU node will have a 128 MB entry cache. This is a conservative default, since this memory is not accounted for in existing memory budgets nor by the `--cache` flag. We rarely see cache misses in production clusters anyway, and have seen significantly improved hit rates with this scaling (e.g. a 64 KB kv0 workload on 8-vCPU nodes increased from 87% to 99% hit rate). Epic: none Release note (performance improvement): The default Raft entry cache size has been increased from 16 MB to 1/256 of system memory with a minimum of 32 MB, divided evenly between all stores. This can be configured via `COCKROACH_RAFT_ENTRY_CACHE_SIZE`. --- pkg/kv/kvserver/BUILD.bazel | 1 + pkg/kv/kvserver/store.go | 37 ++++++++++++++++++++++++++++++------- 2 files changed, 31 insertions(+), 7 deletions(-) diff --git a/pkg/kv/kvserver/BUILD.bazel b/pkg/kv/kvserver/BUILD.bazel index 4f2f2ec0c460..c4205dd82420 100644 --- a/pkg/kv/kvserver/BUILD.bazel +++ b/pkg/kv/kvserver/BUILD.bazel @@ -177,6 +177,7 @@ go_library( "//pkg/rpc", "//pkg/rpc/nodedialer", "//pkg/security/username", + "//pkg/server/status", "//pkg/server/telemetry", "//pkg/settings", "//pkg/settings/cluster", diff --git a/pkg/kv/kvserver/store.go b/pkg/kv/kvserver/store.go index d08dbc8ef370..15ba1e15d058 100644 --- a/pkg/kv/kvserver/store.go +++ b/pkg/kv/kvserver/store.go @@ -63,6 +63,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/roachpb" "github.com/cockroachdb/cockroach/pkg/rpc" "github.com/cockroachdb/cockroach/pkg/rpc/nodedialer" + "github.com/cockroachdb/cockroach/pkg/server/status" "github.com/cockroachdb/cockroach/pkg/settings" "github.com/cockroachdb/cockroach/pkg/settings/cluster" "github.com/cockroachdb/cockroach/pkg/spanconfig" @@ -135,13 +136,32 @@ var defaultRaftSchedulerConcurrency = envutil.EnvOrDefaultInt( // counts, while also avoiding starvation by excessive sharding. var defaultRaftSchedulerShardSize = envutil.EnvOrDefaultInt("COCKROACH_SCHEDULER_SHARD_SIZE", 16) -// defaultRaftEntryCacheSize is the default size in bytes for a store's Raft -// entry cache. The Raft entry cache is shared by all Raft groups managed by the -// store. It is used to cache uncommitted raft log entries such that once those -// entries are committed, their application can avoid disk reads to retrieve -// them from the persistent log. -var defaultRaftEntryCacheSize = envutil.EnvOrDefaultBytes( - "COCKROACH_RAFT_ENTRY_CACHE_SIZE", 16<<20 /* 16 MiB */) +// defaultRaftEntryCacheSize is the default size in bytes for the Raft entry +// cache, divided evenly between stores. The Raft entry cache is shared by all +// Raft groups managed by each store. It is used to cache uncommitted raft log +// entries such that once those entries are committed, their application can +// avoid disk reads to retrieve them from the persistent log. 
+// +// It defaults to 1/256 of system memory, with minimum 32 MB, e.g.: +// +// 8 GB RAM = 32 MB (~2 vCPUs) +// 16 GB RAM = 64 MB (~4 vCPUs) +// 32 GB RAM = 128 MB (~8 vCPUs) +// 64 GB RAM = 256 MB (~16 vCPUs) +// +// This is conservative, since the memory is not accounted for in memory budgets +// nor via the --cache flag. However, it should be sufficient to achieve near +// 100% cache hit rate for well-provisioned low-latency clusters with moderate +// write volume. See: https://github.com/cockroachdb/cockroach/issues/98666 +var defaultRaftEntryCacheSize = func() int64 { + var cacheSize int64 = 32 << 20 // 32 MiB + if mem, _, err := status.GetTotalMemoryWithoutLogging(); err == nil { + if s := mem / 256; s > cacheSize { + cacheSize = s + } + } + return envutil.EnvOrDefaultBytes("COCKROACH_RAFT_ENTRY_CACHE_SIZE", cacheSize) +}() // defaultRaftSchedulerPriorityShardSize specifies the default size of the Raft // scheduler priority shard, used for certain system ranges. This shard is @@ -1272,6 +1292,9 @@ func (sc *StoreConfig) SetDefaults(numStores int) { } if sc.RaftEntryCacheSize == 0 { sc.RaftEntryCacheSize = uint64(defaultRaftEntryCacheSize) + if numStores > 1 { // guard against zero division + sc.RaftEntryCacheSize /= uint64(numStores) + } } if raftDisableLeaderFollowsLeaseholder { sc.TestingKnobs.DisableLeaderFollowsLeaseholder = true