diff --git a/pkg/kv/kvserver/batcheval/cmd_lease_request.go b/pkg/kv/kvserver/batcheval/cmd_lease_request.go index 717530470b27..cf47e6fd6d80 100644 --- a/pkg/kv/kvserver/batcheval/cmd_lease_request.go +++ b/pkg/kv/kvserver/batcheval/cmd_lease_request.go @@ -12,6 +12,7 @@ package batcheval import ( "context" + "fmt" "time" "github.com/cockroachdb/cockroach/pkg/kv/kvpb" @@ -64,6 +65,15 @@ func RequestLease( Requested: args.Lease, } + // However, we verify that the current lease's sequence number and proposed + // timestamp match the provided PrevLease. This ensures that the validation + // here is consistent with the validation that was performed when the lease + // request was constructed. + if prevLease.Sequence != args.PrevLease.Sequence || !prevLease.ProposedTS.Equal(args.PrevLease.ProposedTS) { + rErr.Message = fmt.Sprintf("expected previous lease %s, found %s", args.PrevLease, prevLease) + return newFailedLeaseTrigger(false /* isTransfer */), rErr + } + // MIGRATION(tschottdorf): needed to apply Raft commands which got proposed // before the StartStasis field was introduced. newLease := args.Lease diff --git a/pkg/kv/kvserver/client_lease_test.go b/pkg/kv/kvserver/client_lease_test.go index 29adb1e2c53a..0b987fa8e7ca 100644 --- a/pkg/kv/kvserver/client_lease_test.go +++ b/pkg/kv/kvserver/client_lease_test.go @@ -32,6 +32,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/kv/kvserver" "github.com/cockroachdb/cockroach/pkg/kv/kvserver/allocator/storepool" "github.com/cockroachdb/cockroach/pkg/kv/kvserver/kvserverbase" + "github.com/cockroachdb/cockroach/pkg/kv/kvserver/kvserverpb" "github.com/cockroachdb/cockroach/pkg/kv/kvserver/liveness" "github.com/cockroachdb/cockroach/pkg/kv/kvserver/liveness/livenesspb" "github.com/cockroachdb/cockroach/pkg/roachpb" @@ -1681,3 +1682,88 @@ func TestLeaseRequestBumpsEpoch(t *testing.T) { require.Greater(t, liveness.Epoch, prevLease.Epoch) }) } + +// TestLeaseRequestFromExpirationToEpochDoesNotRegressExpiration tests that a +// promotion from an expiration-based lease to an epoch-based lease does not +// permit the expiration time of the lease to regress. This is enforced by +// detecting cases where the leaseholder's liveness record's expiration trails +// its expiration-based lease's expiration and synchronously heartbeating the +// leaseholder's liveness record before promoting the lease. +func TestLeaseRequestFromExpirationToEpochDoesNotRegressExpiration(t *testing.T) { + defer leaktest.AfterTest(t)() + defer log.Scope(t).Close(t) + + ctx := context.Background() + st := cluster.MakeTestingClusterSettings() + kvserver.ExpirationLeasesOnly.Override(ctx, &st.SV, true) // override metamorphism + + tc := testcluster.StartTestCluster(t, 1, base.TestClusterArgs{ + ReplicationMode: base.ReplicationManual, + ServerArgs: base.TestServerArgs{ + Settings: st, + }, + }) + defer tc.Stopper().Stop(ctx) + + // Create scratch range. + key := tc.ScratchRange(t) + desc := tc.LookupRangeOrFatal(t, key) + + // Pause n1's node liveness heartbeats, to allow its liveness expiration to + // fall behind. + l0 := tc.Server(0).NodeLiveness().(*liveness.NodeLiveness) + l0.PauseHeartbeatLoopForTest() + l, ok := l0.GetLiveness(tc.Server(0).NodeID()) + require.True(t, ok) + + // Make sure n1 has an expiration-based lease. 
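+	// (The kv.expiration_leases_only.enabled override above forces
+	// expiration-based leases, so the scratch range's lease should already
+	// be of this type.)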
+ s0 := tc.GetFirstStoreFromServer(t, 0) + repl := s0.LookupReplica(desc.StartKey) + require.NotNil(t, repl) + expLease := repl.CurrentLeaseStatus(ctx) + require.True(t, expLease.IsValid()) + require.Equal(t, roachpb.LeaseExpiration, expLease.Lease.Type()) + + // Wait for the expiration-based lease to have a later expiration than the + // expiration timestamp in n1's liveness record. + testutils.SucceedsSoon(t, func() error { + expLease = repl.CurrentLeaseStatus(ctx) + if expLease.Expiration().Less(l.Expiration.ToTimestamp()) { + return errors.Errorf("lease %v not extended beyond liveness %v", expLease, l) + } + return nil + }) + + // Enable epoch-based leases. This will cause automatic lease renewal to try + // to promote the expiration-based lease to an epoch-based lease. + // + // Since we have disabled the background node liveness heartbeat loop, it is + // critical that this lease promotion synchronously heartbeats node liveness + // before acquiring the epoch-based lease. + kvserver.ExpirationLeasesOnly.Override(ctx, &s0.ClusterSettings().SV, false) + + // Wait for that lease promotion to occur. + var epochLease kvserverpb.LeaseStatus + testutils.SucceedsSoon(t, func() error { + // Read from the range to prompt it to try to upgrade the lease. + _, pErr := kv.SendWrapped(ctx, s0.TestSender(), getArgs(key)) + require.Nil(t, pErr) + + epochLease = repl.CurrentLeaseStatus(ctx) + if epochLease.Lease.Type() != roachpb.LeaseEpoch { + return errors.Errorf("lease %v not upgraded to epoch-based", epochLease) + } + return nil + }) + + // Once the lease has been promoted to an epoch-based lease, the effective + // expiration (maintained indirectly in the liveness record) must be greater + // than that in the preceding expiration-based lease. If this were to regress, + // a non-cooperative lease failover to a third lease held by a different node + // could overlap in MVCC time with the first lease (i.e. its start time could + // precede expLease.Expiration), violating the lease disjointness property. + // + // If we disable the `expToEpochPromo` branch in replica_range_lease.go, this + // assertion fails. + require.True(t, expLease.Expiration().Less(epochLease.Expiration())) +} diff --git a/pkg/kv/kvserver/client_split_test.go b/pkg/kv/kvserver/client_split_test.go index f583066e7554..2cf45dedab1b 100644 --- a/pkg/kv/kvserver/client_split_test.go +++ b/pkg/kv/kvserver/client_split_test.go @@ -2820,6 +2820,12 @@ func TestStoreCapacityAfterSplit(t *testing.T) { ReplicationMode: base.ReplicationManual, ServerArgs: base.TestServerArgs{ Settings: st, + RaftConfig: base.RaftConfig{ + // We plan to increment the manual clock by MinStatsDuration a few + // times below and would like for leases to not expire. Configure a + // longer lease duration to achieve this. 
+ RangeLeaseDuration: 10 * replicastats.MinStatsDuration, + }, Knobs: base.TestingKnobs{ Server: &server.TestingKnobs{ WallClock: manualClock, @@ -2837,8 +2843,6 @@ func TestStoreCapacityAfterSplit(t *testing.T) { key := tc.ScratchRange(t) desc := tc.AddVotersOrFatal(t, key, tc.Target(1)) tc.TransferRangeLeaseOrFatal(t, desc, tc.Target(1)) - - tc.IncrClockForLeaseUpgrade(t, manualClock) tc.WaitForLeaseUpgrade(ctx, t, desc) cap, err := s.Capacity(ctx, false /* useCached */) diff --git a/pkg/kv/kvserver/replica_range_lease.go b/pkg/kv/kvserver/replica_range_lease.go index dfed63081aea..f6c30084aecb 100644 --- a/pkg/kv/kvserver/replica_range_lease.go +++ b/pkg/kv/kvserver/replica_range_lease.go @@ -54,6 +54,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/kv/kvserver/constraint" "github.com/cockroachdb/cockroach/pkg/kv/kvserver/kvserverpb" "github.com/cockroachdb/cockroach/pkg/kv/kvserver/liveness" + "github.com/cockroachdb/cockroach/pkg/kv/kvserver/liveness/livenesspb" "github.com/cockroachdb/cockroach/pkg/kv/kvserver/raftutil" "github.com/cockroachdb/cockroach/pkg/roachpb" "github.com/cockroachdb/cockroach/pkg/settings" @@ -219,7 +220,6 @@ func (p *pendingLeaseRequest) InitOrJoinRequest( nextLeaseHolder roachpb.ReplicaDescriptor, status kvserverpb.LeaseStatus, startKey roachpb.Key, - transfer bool, bypassSafetyChecks bool, ) *leaseRequestHandle { if nextLease, ok := p.RequestPending(); ok { @@ -237,10 +237,20 @@ func (p *pendingLeaseRequest) InitOrJoinRequest( nextLeaseHolder.ReplicaID, nextLease.Replica.ReplicaID)) } - acquisition := !status.Lease.OwnedBy(p.repl.store.StoreID()) - extension := !transfer && !acquisition + // Who owns the previous and next lease? + prevLocal := status.Lease.OwnedBy(p.repl.store.StoreID()) + nextLocal := nextLeaseHolder.StoreID == p.repl.store.StoreID() + + // Assert that the lease acquisition, extension, or transfer is valid. + acquisition := !prevLocal && nextLocal + extension := prevLocal && nextLocal + transfer := prevLocal && !nextLocal + remote := !prevLocal && !nextLocal _ = extension // not used, just documentation + if remote { + log.Fatalf(ctx, "cannot acquire/extend lease for remote replica: %v -> %v", status, nextLeaseHolder) + } if acquisition { // If this is a non-cooperative lease change (i.e. an acquisition), it // is up to us to ensure that Lease.Start is greater than the end time @@ -279,6 +289,7 @@ func (p *pendingLeaseRequest) InitOrJoinRequest( ProposedTS: &status.Now, } + var reqLeaseLiveness livenesspb.Liveness if p.repl.shouldUseExpirationLeaseRLocked() || (transfer && TransferExpirationLeasesFirstEnabled.Get(&p.repl.store.ClusterSettings().SV) && @@ -310,6 +321,7 @@ func (p *pendingLeaseRequest) InitOrJoinRequest( return llHandle } reqLease.Epoch = l.Epoch + reqLeaseLiveness = l.Liveness } var leaseReq kvpb.Request @@ -340,7 +352,7 @@ func (p *pendingLeaseRequest) InitOrJoinRequest( } } - if err := p.requestLeaseAsync(ctx, nextLeaseHolder, status, leaseReq); err != nil { + if err := p.requestLeaseAsync(ctx, status, reqLease, reqLeaseLiveness, leaseReq); err != nil { // We failed to start the asynchronous task. Send a blank NotLeaseHolderError // back to indicate that we have no idea who the range lease holder might // be; we've withdrawn from active duty. @@ -364,10 +376,14 @@ func (p *pendingLeaseRequest) InitOrJoinRequest( // // The status argument is used as the expected value for liveness operations. // leaseReq must be consistent with the LeaseStatus. 
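+// reqLease is the lease being requested (acquired, extended, or transferred).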
+// +// The reqLeaseLiveness argument is provided when reqLease is an epoch-based +// lease. func (p *pendingLeaseRequest) requestLeaseAsync( parentCtx context.Context, - nextLeaseHolder roachpb.ReplicaDescriptor, status kvserverpb.LeaseStatus, + reqLease roachpb.Lease, + reqLeaseLiveness livenesspb.Liveness, leaseReq kvpb.Request, ) error { // Create a new context. We run the request to completion even if all callers @@ -394,7 +410,7 @@ func (p *pendingLeaseRequest) requestLeaseAsync( func(ctx context.Context) { defer sp.Finish() - err := p.requestLease(ctx, nextLeaseHolder, status, leaseReq) + err := p.requestLease(ctx, status, reqLease, reqLeaseLiveness, leaseReq) // Error will be handled below. // We reset our state below regardless of whether we've gotten an error or @@ -429,13 +445,19 @@ func (p *pendingLeaseRequest) requestLeaseAsync( return nil } +var logFailedHeartbeatOwnLiveness = log.Every(10 * time.Second) + // requestLease sends a synchronous transfer lease or lease request to the // specified replica. It is only meant to be called from requestLeaseAsync, // since it does not coordinate with other in-flight lease requests. +// +// The reqLeaseLiveness argument is provided when reqLease is an epoch-based +// lease. func (p *pendingLeaseRequest) requestLease( ctx context.Context, - nextLeaseHolder roachpb.ReplicaDescriptor, status kvserverpb.LeaseStatus, + reqLease roachpb.Lease, + reqLeaseLiveness livenesspb.Liveness, leaseReq kvpb.Request, ) error { started := timeutil.Now() @@ -443,15 +465,43 @@ func (p *pendingLeaseRequest) requestLease( p.repl.store.metrics.LeaseRequestLatency.RecordValue(timeutil.Since(started).Nanoseconds()) }() + nextLeaseHolder := reqLease.Replica + extension := status.OwnedBy(nextLeaseHolder.StoreID) + + // If we are promoting an expiration-based lease to an epoch-based lease, we + // must make sure the expiration does not regress. We do this here because the + // expiration is stored directly in the lease for expiration-based leases but + // indirectly in liveness record for epoch-based leases. To ensure this, we + // manually heartbeat our liveness record if necessary. This is expected to + // work because the liveness record interval and the expiration-based lease + // interval are the same. + expToEpochPromo := extension && status.Lease.Type() == roachpb.LeaseExpiration && reqLease.Type() == roachpb.LeaseEpoch + if expToEpochPromo && reqLeaseLiveness.Expiration.ToTimestamp().Less(status.Lease.GetExpiration()) { + err := p.repl.store.cfg.NodeLiveness.Heartbeat(ctx, reqLeaseLiveness) + if err != nil { + if logFailedHeartbeatOwnLiveness.ShouldLog() { + log.Errorf(ctx, "failed to heartbeat own liveness record: %s", err) + } + return kvpb.NewNotLeaseHolderError(roachpb.Lease{}, p.repl.store.StoreID(), p.repl.Desc(), + fmt.Sprintf("failed to manipulate liveness record: %s", err)) + } + // Assert that the liveness record expiration is now greater than the + // expiration of the lease we're promoting. + l, ok := p.repl.store.cfg.NodeLiveness.GetLiveness(reqLeaseLiveness.NodeID) + if !ok || l.Expiration.ToTimestamp().Less(status.Lease.GetExpiration()) { + return errors.AssertionFailedf("expiration of liveness record %s is not greater than "+ + "expiration of the previous lease %s after liveness heartbeat", l, status.Lease) + } + } + // If we're replacing an expired epoch-based lease, we must increment the // epoch of the prior owner to invalidate its leases. If we were the owner, // then we instead heartbeat to become live. 
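+	// (This branch is distinct from the expiration-to-epoch promotion above:
+	// here the previous lease is epoch-based and has already expired.)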
if status.Lease.Type() == roachpb.LeaseEpoch && status.State == kvserverpb.LeaseState_EXPIRED { var err error // If this replica is previous & next lease holder, manually heartbeat to become live. - if status.OwnedBy(nextLeaseHolder.StoreID) && - p.repl.store.StoreID() == nextLeaseHolder.StoreID { - if err = p.repl.store.cfg.NodeLiveness.Heartbeat(ctx, status.Liveness); err != nil { + if extension { + if err = p.repl.store.cfg.NodeLiveness.Heartbeat(ctx, status.Liveness); err != nil && logFailedHeartbeatOwnLiveness.ShouldLog() { log.Errorf(ctx, "failed to heartbeat own liveness record: %s", err) } } else if status.Liveness.Epoch == status.Lease.Epoch { @@ -844,7 +894,7 @@ func (r *Replica) requestLeaseLocked( } return r.mu.pendingLeaseRequest.InitOrJoinRequest( ctx, repDesc, status, r.mu.state.Desc.StartKey.AsRawKey(), - false /* transfer */, false /* bypassSafetyChecks */) + false /* bypassSafetyChecks */) } // AdminTransferLease transfers the LeaderLease to another replica. Only the @@ -938,7 +988,7 @@ func (r *Replica) AdminTransferLease( } transfer = r.mu.pendingLeaseRequest.InitOrJoinRequest( - ctx, nextLeaseHolder, status, desc.StartKey.AsRawKey(), true /* transfer */, bypassSafetyChecks, + ctx, nextLeaseHolder, status, desc.StartKey.AsRawKey(), bypassSafetyChecks, ) return nil, transfer, nil } @@ -1486,17 +1536,24 @@ func (r *Replica) maybeExtendLeaseAsyncLocked(ctx context.Context, st kvserverpb // maybeSwitchLeaseType will synchronously renew a lease using the appropriate // type if it is (or was) owned by this replica and has an incorrect type. This // typically happens when changing kv.expiration_leases_only.enabled. -func (r *Replica) maybeSwitchLeaseType(ctx context.Context, st kvserverpb.LeaseStatus) *kvpb.Error { - if !st.OwnedBy(r.store.StoreID()) { - return nil - } +func (r *Replica) maybeSwitchLeaseType(ctx context.Context) *kvpb.Error { + llHandle := func() *leaseRequestHandle { + now := r.store.Clock().NowAsClockTimestamp() + // The lease status needs to be checked and requested under the same lock, + // to avoid an interleaving lease request changing the lease between the + // two. + r.mu.Lock() + defer r.mu.Unlock() - var llHandle *leaseRequestHandle - r.mu.Lock() - if !r.hasCorrectLeaseTypeRLocked(st.Lease) { - llHandle = r.requestLeaseLocked(ctx, st) - } - r.mu.Unlock() + st := r.leaseStatusAtRLocked(ctx, now) + if !st.OwnedBy(r.store.StoreID()) { + return nil + } + if r.hasCorrectLeaseTypeRLocked(st.Lease) { + return nil + } + return r.requestLeaseLocked(ctx, st) + }() if llHandle != nil { select { diff --git a/pkg/kv/kvserver/replica_rangefeed_test.go b/pkg/kv/kvserver/replica_rangefeed_test.go index 030b4d25496f..d17dbc8b1dbf 100644 --- a/pkg/kv/kvserver/replica_rangefeed_test.go +++ b/pkg/kv/kvserver/replica_rangefeed_test.go @@ -1374,13 +1374,24 @@ func TestRangefeedCheckpointsRecoverFromLeaseExpiration(t *testing.T) { // Expire the lease. Given that the Raft leadership is on n2, only n2 will be // eligible to acquire a new lease. log.Infof(ctx, "test expiring lease") - nl := n2.NodeLiveness().(*liveness.NodeLiveness) - resumeHeartbeats := nl.PauseAllHeartbeatsForTest() - n2Liveness, ok := nl.Self() + nl2 := n2.NodeLiveness().(*liveness.NodeLiveness) + resumeHeartbeats := nl2.PauseAllHeartbeatsForTest() + n2Liveness, ok := nl2.Self() require.True(t, ok) manualClock.Increment(n2Liveness.Expiration.ToTimestamp().Add(1, 0).WallTime - manualClock.UnixNano()) atomic.StoreInt64(&rejectExtraneousRequests, 1) - // Ask another node to increment n2's liveness record. 
+ + // Ask another node to increment n2's liveness record, but first, wait until + // n1's liveness state is the same as n2's. Otherwise, the epoch below might + // get rejected because of mismatching liveness records. + testutils.SucceedsSoon(t, func() error { + nl1 := n1.NodeLiveness().(*liveness.NodeLiveness) + n2LivenessFromN1, _ := nl1.GetLiveness(n2.NodeID()) + if n2Liveness != n2LivenessFromN1.Liveness { + return errors.Errorf("waiting for node 2 liveness to converge on both nodes 1 and 2") + } + return nil + }) require.NoError(t, n1.NodeLiveness().(*liveness.NodeLiveness).IncrementEpoch(ctx, n2Liveness)) resumeHeartbeats() diff --git a/pkg/kv/kvserver/replica_test.go b/pkg/kv/kvserver/replica_test.go index 705ac5b48d60..df1c63987777 100644 --- a/pkg/kv/kvserver/replica_test.go +++ b/pkg/kv/kvserver/replica_test.go @@ -955,7 +955,8 @@ func TestReplicaLease(t *testing.T) { ctx, tc.repl, allSpans(), false, /* requiresClosedTSOlderThanStorageSnap */ ), Args: &kvpb.RequestLeaseRequest{ - Lease: lease, + Lease: lease, + PrevLease: tc.repl.CurrentLeaseStatus(ctx).Lease, }, }, &kvpb.RequestLeaseResponse{}); !testutils.IsError(err, "replica not found") { t.Fatalf("unexpected error: %+v", err) @@ -1289,7 +1290,7 @@ func TestReplicaLeaseRejectUnknownRaftNodeID(t *testing.T) { st := tc.repl.CurrentLeaseStatus(ctx) ba := &kvpb.BatchRequest{} ba.Timestamp = tc.repl.store.Clock().Now() - ba.Add(&kvpb.RequestLeaseRequest{Lease: *lease}) + ba.Add(&kvpb.RequestLeaseRequest{Lease: *lease, PrevLease: st.Lease}) _, tok := tc.repl.mu.proposalBuf.TrackEvaluatingRequest(ctx, hlc.MinTimestamp) ch, _, _, _, pErr := tc.repl.evalAndPropose(ctx, ba, allSpansGuard(), &st, uncertainty.Interval{}, tok.Move(ctx)) if pErr == nil { diff --git a/pkg/kv/kvserver/replicate_queue.go b/pkg/kv/kvserver/replicate_queue.go index 4ba2bf01d968..19082d7f00bf 100644 --- a/pkg/kv/kvserver/replicate_queue.go +++ b/pkg/kv/kvserver/replicate_queue.go @@ -1055,11 +1055,11 @@ func (rq *replicateQueue) PlanOneChange( // TODO(erikgrinaker): We shouldn't overload the replicate queue to also be // responsible for lease maintenance, but it'll do for now. See: // https://github.com/cockroachdb/cockroach/issues/98433 - leaseStatus, pErr := repl.redirectOnOrAcquireLease(ctx) + _, pErr := repl.redirectOnOrAcquireLease(ctx) if pErr != nil { return change, pErr.GoError() } - pErr = repl.maybeSwitchLeaseType(ctx, leaseStatus) + pErr = repl.maybeSwitchLeaseType(ctx) if pErr != nil { return change, pErr.GoError() } diff --git a/pkg/testutils/testcluster/testcluster.go b/pkg/testutils/testcluster/testcluster.go index 2b7ef21b61be..e02e892252cb 100644 --- a/pkg/testutils/testcluster/testcluster.go +++ b/pkg/testutils/testcluster/testcluster.go @@ -1070,15 +1070,6 @@ func (tc *TestCluster) TransferRangeLeaseOrFatal( } } -// IncrClockForLeaseUpgrade run up the clock to force a lease renewal (and thus -// the change in lease types). -func (tc *TestCluster) IncrClockForLeaseUpgrade(t *testing.T, clock *hlc.HybridManualClock) { - clock.Increment( - tc.GetFirstStoreFromServer(t, 0).GetStoreConfig().RangeLeaseRenewalDuration().Nanoseconds() + - time.Second.Nanoseconds(), - ) -} - // MaybeWaitForLeaseUpgrade waits until the lease held for the given range // descriptor is upgraded to an epoch-based one, but only if we expect the lease // to be upgraded.
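
The core of the replica_range_lease.go change is the expToEpochPromo guard: when an expiration-based lease is promoted to an epoch-based lease, the effective expiration (which moves from the lease itself into the leaseholder's liveness record) must not move backwards, or the lease disjointness property asserted at the end of the new test could be violated. The standalone Go sketch below models that guard with integer timestamps and a toy liveness record; it is an illustration only, not CockroachDB's actual NodeLiveness or lease API.

// expiration_promotion_sketch.go
// Simplified model of the expToEpochPromo check: timestamps are plain
// integers and heartbeat() stands in for NodeLiveness.Heartbeat.
package main

import "fmt"

type ts = int64

type livenessRecord struct{ expiration ts }

// heartbeat extends the liveness record the way a real heartbeat would,
// pushing its expiration into the future.
func (l *livenessRecord) heartbeat(now, interval ts) {
	if now+interval > l.expiration {
		l.expiration = now + interval
	}
}

// promoteExpirationToEpoch models the guard: before replacing an
// expiration-based lease (whose expiration lives in the lease itself) with an
// epoch-based lease (whose effective expiration is the liveness record's
// expiration), make sure the liveness expiration does not trail the old
// lease's expiration; heartbeat first if it does.
func promoteExpirationToEpoch(prevLeaseExp ts, l *livenessRecord, now, interval ts) error {
	if l.expiration < prevLeaseExp {
		l.heartbeat(now, interval)
	}
	if l.expiration < prevLeaseExp {
		return fmt.Errorf("liveness expiration %d still trails lease expiration %d; "+
			"promotion would regress the lease expiration", l.expiration, prevLeaseExp)
	}
	return nil // safe to promote
}

func main() {
	// Liveness heartbeats were paused, so the liveness record (exp=80) trails
	// the expiration-based lease (exp=100), as in the new test.
	l := &livenessRecord{expiration: 80}
	if err := promoteExpirationToEpoch(100, l, 95, 10); err != nil {
		fmt.Println("promotion rejected:", err)
		return
	}
	fmt.Println("promotion allowed; liveness expiration is now", l.expiration)
}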