release-23.1: kv: prevent lease interval regression during expiration-to-epoch promotion #130124

Merged
10 changes: 10 additions & 0 deletions pkg/kv/kvserver/batcheval/cmd_lease_request.go
@@ -12,6 +12,7 @@ package batcheval

import (
"context"
"fmt"
"time"

"github.com/cockroachdb/cockroach/pkg/kv/kvpb"
@@ -64,6 +65,15 @@ func RequestLease(
Requested: args.Lease,
}

// However, we verify that the current lease's sequence number and proposed
// timestamp match the provided PrevLease. This ensures that the validation
// here is consistent with the validation that was performed when the lease
// request was constructed.
if prevLease.Sequence != args.PrevLease.Sequence || !prevLease.ProposedTS.Equal(args.PrevLease.ProposedTS) {
rErr.Message = fmt.Sprintf("expected previous lease %s, found %s", args.PrevLease, prevLease)
return newFailedLeaseTrigger(false /* isTransfer */), rErr
}

// MIGRATION(tschottdorf): needed to apply Raft commands which got proposed
// before the StartStasis field was introduced.
newLease := args.Lease
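For readers skimming the diff, the guard added above boils down to the following shape. This is an illustrative, self-contained sketch with simplified stand-in types (`leaseMeta`, plain integer timestamps), not the real kvserver types or error plumbing: the real code compares `roachpb.Lease` sequence numbers and hlc proposed timestamps.

```go
package main

import "fmt"

// leaseMeta is a simplified stand-in for the fields the new guard compares.
type leaseMeta struct {
	Sequence   int64
	ProposedTS int64 // the real code compares hlc timestamps with Equal
}

// checkPrevLease mirrors the added validation: the lease currently applied on
// the replica must match the PrevLease the requester built its proposal
// against; otherwise the request was constructed against a stale lease and is
// rejected.
func checkPrevLease(current, claimedPrev leaseMeta) error {
	if current.Sequence != claimedPrev.Sequence || current.ProposedTS != claimedPrev.ProposedTS {
		return fmt.Errorf("expected previous lease %+v, found %+v", claimedPrev, current)
	}
	return nil
}

func main() {
	cur := leaseMeta{Sequence: 7, ProposedTS: 100}
	fmt.Println(checkPrevLease(cur, leaseMeta{Sequence: 7, ProposedTS: 100})) // <nil>
	fmt.Println(checkPrevLease(cur, leaseMeta{Sequence: 6, ProposedTS: 90}))  // mismatch error
}
```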
86 changes: 86 additions & 0 deletions pkg/kv/kvserver/client_lease_test.go
@@ -32,6 +32,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/kv/kvserver"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/allocator/storepool"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/kvserverbase"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/kvserverpb"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/liveness"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/liveness/livenesspb"
"github.com/cockroachdb/cockroach/pkg/roachpb"
@@ -1681,3 +1682,88 @@ func TestLeaseRequestBumpsEpoch(t *testing.T) {
require.Greater(t, liveness.Epoch, prevLease.Epoch)
})
}

// TestLeaseRequestFromExpirationToEpochDoesNotRegressExpiration tests that a
// promotion from an expiration-based lease to an epoch-based lease does not
// permit the expiration time of the lease to regress. This is enforced by
// detecting cases where the leaseholder's liveness record's expiration trails
// its expiration-based lease's expiration and synchronously heartbeating the
// leaseholder's liveness record before promoting the lease.
func TestLeaseRequestFromExpirationToEpochDoesNotRegressExpiration(t *testing.T) {
defer leaktest.AfterTest(t)()
defer log.Scope(t).Close(t)

ctx := context.Background()
st := cluster.MakeTestingClusterSettings()
kvserver.ExpirationLeasesOnly.Override(ctx, &st.SV, true) // override metamorphism

tc := testcluster.StartTestCluster(t, 1, base.TestClusterArgs{
ReplicationMode: base.ReplicationManual,
ServerArgs: base.TestServerArgs{
Settings: st,
},
})
defer tc.Stopper().Stop(ctx)

// Create scratch range.
key := tc.ScratchRange(t)
desc := tc.LookupRangeOrFatal(t, key)

// Pause n1's node liveness heartbeats, to allow its liveness expiration to
// fall behind.
l0 := tc.Server(0).NodeLiveness().(*liveness.NodeLiveness)
l0.PauseHeartbeatLoopForTest()
l, ok := l0.GetLiveness(tc.Server(0).NodeID())
require.True(t, ok)

// Make sure n1 has an expiration-based lease.
s0 := tc.GetFirstStoreFromServer(t, 0)
repl := s0.LookupReplica(desc.StartKey)
require.NotNil(t, repl)
expLease := repl.CurrentLeaseStatus(ctx)
require.True(t, expLease.IsValid())
require.Equal(t, roachpb.LeaseExpiration, expLease.Lease.Type())

// Wait for the expiration-based lease to have a later expiration than the
// expiration timestamp in n1's liveness record.
testutils.SucceedsSoon(t, func() error {
expLease = repl.CurrentLeaseStatus(ctx)
if expLease.Expiration().Less(l.Expiration.ToTimestamp()) {
return errors.Errorf("lease %v not extended beyond liveness %v", expLease, l)
}
return nil
})

// Enable epoch-based leases. This will cause automatic lease renewal to try
// to promote the expiration-based lease to an epoch-based lease.
//
// Since we have disabled the background node liveness heartbeat loop, it is
// critical that this lease promotion synchronously heartbeats node liveness
// before acquiring the epoch-based lease.
kvserver.ExpirationLeasesOnly.Override(ctx, &s0.ClusterSettings().SV, false)

// Wait for that lease promotion to occur.
var epochLease kvserverpb.LeaseStatus
testutils.SucceedsSoon(t, func() error {
// Read from the range to prompt it to try to upgrade the lease.
_, pErr := kv.SendWrapped(ctx, s0.TestSender(), getArgs(key))
require.Nil(t, pErr)

epochLease = repl.CurrentLeaseStatus(ctx)
if epochLease.Lease.Type() != roachpb.LeaseEpoch {
return errors.Errorf("lease %v not upgraded to epoch-based", epochLease)
}
return nil
})

// Once the lease has been promoted to an epoch-based lease, the effective
// expiration (maintained indirectly in the liveness record) must be greater
// than that in the preceding expiration-based lease. If this were to regress,
// a non-cooperative lease failover to a third lease held by a different node
// could overlap in MVCC time with the first lease (i.e. its start time could
// precede expLease.Expiration), violating the lease disjointness property.
//
// If we disable the `expToEpochPromo` branch in replica_range_lease.go, this
// assertion fails.
require.True(t, expLease.Expiration().Less(epochLease.Expiration()))
}
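The production-side guard that this test exercises lives in replica_range_lease.go below. As a compact illustration of the invariant the test's final assertion checks, here is a hedged sketch with simplified integer timestamps and hypothetical names (`promoState`, `promoteToEpoch`), not the real hlc/liveness types: heartbeat liveness first whenever its expiration trails the expiration-based lease, so the effective lease interval never shrinks across the promotion.

```go
package main

import (
	"errors"
	"fmt"
)

// promoState is a simplified stand-in; real code uses hlc.Timestamp and
// livenesspb.Liveness.
type promoState struct {
	prevLeaseExpiration int64 // expiration-based lease's stored expiration
	livenessExpiration  int64 // source of the epoch lease's effective expiration
}

// promoteToEpoch sketches the expiration-to-epoch promotion guard: if the
// liveness record would make the epoch lease's effective expiration regress
// below the previous lease's expiration, heartbeat liveness first and verify
// the regression is gone before allowing the promotion.
func promoteToEpoch(s *promoState, heartbeat func(*promoState) error) error {
	if s.livenessExpiration < s.prevLeaseExpiration {
		if err := heartbeat(s); err != nil {
			return fmt.Errorf("cannot promote, heartbeat failed: %w", err)
		}
		if s.livenessExpiration < s.prevLeaseExpiration {
			return errors.New("liveness expiration still trails previous lease; disjointness would be violated")
		}
	}
	return nil // safe to switch to the epoch-based lease
}

func main() {
	s := &promoState{prevLeaseExpiration: 200, livenessExpiration: 150}
	err := promoteToEpoch(s, func(ps *promoState) error {
		ps.livenessExpiration = 250 // a successful heartbeat extends liveness
		return nil
	})
	fmt.Println(err) // <nil>: promotion allowed only once liveness has caught up
}
```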
8 changes: 6 additions & 2 deletions pkg/kv/kvserver/client_split_test.go
@@ -2820,6 +2820,12 @@ func TestStoreCapacityAfterSplit(t *testing.T) {
ReplicationMode: base.ReplicationManual,
ServerArgs: base.TestServerArgs{
Settings: st,
RaftConfig: base.RaftConfig{
// We plan to increment the manual clock by MinStatsDuration a few
// times below and would like for leases to not expire. Configure a
// longer lease duration to achieve this.
RangeLeaseDuration: 10 * replicastats.MinStatsDuration,
},
Knobs: base.TestingKnobs{
Server: &server.TestingKnobs{
WallClock: manualClock,
@@ -2837,8 +2843,6 @@
key := tc.ScratchRange(t)
desc := tc.AddVotersOrFatal(t, key, tc.Target(1))
tc.TransferRangeLeaseOrFatal(t, desc, tc.Target(1))

tc.IncrClockForLeaseUpgrade(t, manualClock)
tc.WaitForLeaseUpgrade(ctx, t, desc)

cap, err := s.Capacity(ctx, false /* useCached */)
101 changes: 79 additions & 22 deletions pkg/kv/kvserver/replica_range_lease.go
@@ -54,6 +54,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/constraint"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/kvserverpb"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/liveness"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/liveness/livenesspb"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/raftutil"
"github.com/cockroachdb/cockroach/pkg/roachpb"
"github.com/cockroachdb/cockroach/pkg/settings"
@@ -219,7 +220,6 @@ func (p *pendingLeaseRequest) InitOrJoinRequest(
nextLeaseHolder roachpb.ReplicaDescriptor,
status kvserverpb.LeaseStatus,
startKey roachpb.Key,
transfer bool,
bypassSafetyChecks bool,
) *leaseRequestHandle {
if nextLease, ok := p.RequestPending(); ok {
@@ -237,10 +237,20 @@
nextLeaseHolder.ReplicaID, nextLease.Replica.ReplicaID))
}

acquisition := !status.Lease.OwnedBy(p.repl.store.StoreID())
extension := !transfer && !acquisition
// Who owns the previous and next lease?
prevLocal := status.Lease.OwnedBy(p.repl.store.StoreID())
nextLocal := nextLeaseHolder.StoreID == p.repl.store.StoreID()

// Assert that the lease acquisition, extension, or transfer is valid.
acquisition := !prevLocal && nextLocal
extension := prevLocal && nextLocal
transfer := prevLocal && !nextLocal
remote := !prevLocal && !nextLocal
_ = extension // not used, just documentation

if remote {
log.Fatalf(ctx, "cannot acquire/extend lease for remote replica: %v -> %v", status, nextLeaseHolder)
}
if acquisition {
// If this is a non-cooperative lease change (i.e. an acquisition), it
// is up to us to ensure that Lease.Start is greater than the end time
@@ -279,6 +289,7 @@ func (p *pendingLeaseRequest) InitOrJoinRequest(
ProposedTS: &status.Now,
}

var reqLeaseLiveness livenesspb.Liveness
if p.repl.shouldUseExpirationLeaseRLocked() ||
(transfer &&
TransferExpirationLeasesFirstEnabled.Get(&p.repl.store.ClusterSettings().SV) &&
@@ -310,6 +321,7 @@ func (p *pendingLeaseRequest) InitOrJoinRequest(
return llHandle
}
reqLease.Epoch = l.Epoch
reqLeaseLiveness = l.Liveness
}

var leaseReq kvpb.Request
@@ -340,7 +352,7 @@ func (p *pendingLeaseRequest) InitOrJoinRequest(
}
}

if err := p.requestLeaseAsync(ctx, nextLeaseHolder, status, leaseReq); err != nil {
if err := p.requestLeaseAsync(ctx, status, reqLease, reqLeaseLiveness, leaseReq); err != nil {
// We failed to start the asynchronous task. Send a blank NotLeaseHolderError
// back to indicate that we have no idea who the range lease holder might
// be; we've withdrawn from active duty.
@@ -364,10 +376,14 @@
//
// The status argument is used as the expected value for liveness operations.
// leaseReq must be consistent with the LeaseStatus.
//
// The reqLeaseLiveness argument is provided when reqLease is an epoch-based
// lease.
func (p *pendingLeaseRequest) requestLeaseAsync(
parentCtx context.Context,
nextLeaseHolder roachpb.ReplicaDescriptor,
status kvserverpb.LeaseStatus,
reqLease roachpb.Lease,
reqLeaseLiveness livenesspb.Liveness,
leaseReq kvpb.Request,
) error {
// Create a new context. We run the request to completion even if all callers
@@ -394,7 +410,7 @@
func(ctx context.Context) {
defer sp.Finish()

err := p.requestLease(ctx, nextLeaseHolder, status, leaseReq)
err := p.requestLease(ctx, status, reqLease, reqLeaseLiveness, leaseReq)
// Error will be handled below.

// We reset our state below regardless of whether we've gotten an error or
@@ -429,29 +445,63 @@ func (p *pendingLeaseRequest) requestLeaseAsync(
return nil
}

var logFailedHeartbeatOwnLiveness = log.Every(10 * time.Second)

// requestLease sends a synchronous transfer lease or lease request to the
// specified replica. It is only meant to be called from requestLeaseAsync,
// since it does not coordinate with other in-flight lease requests.
//
// The reqLeaseLiveness argument is provided when reqLease is an epoch-based
// lease.
func (p *pendingLeaseRequest) requestLease(
ctx context.Context,
nextLeaseHolder roachpb.ReplicaDescriptor,
status kvserverpb.LeaseStatus,
reqLease roachpb.Lease,
reqLeaseLiveness livenesspb.Liveness,
leaseReq kvpb.Request,
) error {
started := timeutil.Now()
defer func() {
p.repl.store.metrics.LeaseRequestLatency.RecordValue(timeutil.Since(started).Nanoseconds())
}()

nextLeaseHolder := reqLease.Replica
extension := status.OwnedBy(nextLeaseHolder.StoreID)

// If we are promoting an expiration-based lease to an epoch-based lease, we
// must make sure the expiration does not regress. We do this here because the
// expiration is stored directly in the lease for expiration-based leases but
// indirectly in the liveness record for epoch-based leases. To ensure this, we
// manually heartbeat our liveness record if necessary. This is expected to
// work because the liveness record interval and the expiration-based lease
// interval are the same.
expToEpochPromo := extension && status.Lease.Type() == roachpb.LeaseExpiration && reqLease.Type() == roachpb.LeaseEpoch
if expToEpochPromo && reqLeaseLiveness.Expiration.ToTimestamp().Less(status.Lease.GetExpiration()) {
err := p.repl.store.cfg.NodeLiveness.Heartbeat(ctx, reqLeaseLiveness)
if err != nil {
if logFailedHeartbeatOwnLiveness.ShouldLog() {
log.Errorf(ctx, "failed to heartbeat own liveness record: %s", err)
}
return kvpb.NewNotLeaseHolderError(roachpb.Lease{}, p.repl.store.StoreID(), p.repl.Desc(),
fmt.Sprintf("failed to manipulate liveness record: %s", err))
}
// Assert that the liveness record expiration is now greater than the
// expiration of the lease we're promoting.
l, ok := p.repl.store.cfg.NodeLiveness.GetLiveness(reqLeaseLiveness.NodeID)
if !ok || l.Expiration.ToTimestamp().Less(status.Lease.GetExpiration()) {
return errors.AssertionFailedf("expiration of liveness record %s is not greater than "+
"expiration of the previous lease %s after liveness heartbeat", l, status.Lease)
}
}

// If we're replacing an expired epoch-based lease, we must increment the
// epoch of the prior owner to invalidate its leases. If we were the owner,
// then we instead heartbeat to become live.
if status.Lease.Type() == roachpb.LeaseEpoch && status.State == kvserverpb.LeaseState_EXPIRED {
var err error
// If this replica is previous & next lease holder, manually heartbeat to become live.
if status.OwnedBy(nextLeaseHolder.StoreID) &&
p.repl.store.StoreID() == nextLeaseHolder.StoreID {
if err = p.repl.store.cfg.NodeLiveness.Heartbeat(ctx, status.Liveness); err != nil {
if extension {
if err = p.repl.store.cfg.NodeLiveness.Heartbeat(ctx, status.Liveness); err != nil && logFailedHeartbeatOwnLiveness.ShouldLog() {
log.Errorf(ctx, "failed to heartbeat own liveness record: %s", err)
}
} else if status.Liveness.Epoch == status.Lease.Epoch {
Expand Down Expand Up @@ -844,7 +894,7 @@ func (r *Replica) requestLeaseLocked(
}
return r.mu.pendingLeaseRequest.InitOrJoinRequest(
ctx, repDesc, status, r.mu.state.Desc.StartKey.AsRawKey(),
false /* transfer */, false /* bypassSafetyChecks */)
false /* bypassSafetyChecks */)
}

// AdminTransferLease transfers the LeaderLease to another replica. Only the
Expand Down Expand Up @@ -938,7 +988,7 @@ func (r *Replica) AdminTransferLease(
}

transfer = r.mu.pendingLeaseRequest.InitOrJoinRequest(
ctx, nextLeaseHolder, status, desc.StartKey.AsRawKey(), true /* transfer */, bypassSafetyChecks,
ctx, nextLeaseHolder, status, desc.StartKey.AsRawKey(), bypassSafetyChecks,
)
return nil, transfer, nil
}
@@ -1486,17 +1536,24 @@ func (r *Replica) maybeExtendLeaseAsyncLocked(ctx context.Context, st kvserverpb
// maybeSwitchLeaseType will synchronously renew a lease using the appropriate
// type if it is (or was) owned by this replica and has an incorrect type. This
// typically happens when changing kv.expiration_leases_only.enabled.
func (r *Replica) maybeSwitchLeaseType(ctx context.Context, st kvserverpb.LeaseStatus) *kvpb.Error {
if !st.OwnedBy(r.store.StoreID()) {
return nil
}
func (r *Replica) maybeSwitchLeaseType(ctx context.Context) *kvpb.Error {
llHandle := func() *leaseRequestHandle {
now := r.store.Clock().NowAsClockTimestamp()
// The lease status needs to be checked and requested under the same lock,
// to avoid an interleaving lease request changing the lease between the
// two.
r.mu.Lock()
defer r.mu.Unlock()

var llHandle *leaseRequestHandle
r.mu.Lock()
if !r.hasCorrectLeaseTypeRLocked(st.Lease) {
llHandle = r.requestLeaseLocked(ctx, st)
}
r.mu.Unlock()
st := r.leaseStatusAtRLocked(ctx, now)
if !st.OwnedBy(r.store.StoreID()) {
return nil
}
if r.hasCorrectLeaseTypeRLocked(st.Lease) {
return nil
}
return r.requestLeaseLocked(ctx, st)
}()

if llHandle != nil {
select {
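One more detail worth calling out from the maybeSwitchLeaseType change above: the lease status is now computed and the lease request issued under the same replica mutex, so a concurrent lease change cannot slip in between the check and the request. A rough sketch of that pattern, using a plain mutex and hypothetical helper names (`leaseState`, `maybeSwitch`) rather than the real Replica internals:

```go
package main

import (
	"fmt"
	"sync"
)

// leaseState is a hypothetical stand-in for the replica's lease bookkeeping.
type leaseState struct {
	mu         sync.Mutex
	leaseType  string // "expiration" or "epoch"
	wantedType string
}

// maybeSwitch checks the current lease type and requests a switch within the
// same critical section. Reading the status outside the lock and acting on it
// later would allow an interleaving lease change to invalidate the check.
func (s *leaseState) maybeSwitch(request func(current string)) {
	s.mu.Lock()
	defer s.mu.Unlock()
	if s.leaseType == s.wantedType {
		return // already the correct type; nothing to do
	}
	request(s.leaseType) // issued while still holding the lock
}

func main() {
	s := &leaseState{leaseType: "expiration", wantedType: "epoch"}
	s.maybeSwitch(func(cur string) {
		fmt.Printf("requesting switch away from %s lease\n", cur)
	})
}
```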
19 changes: 15 additions & 4 deletions pkg/kv/kvserver/replica_rangefeed_test.go
@@ -1374,13 +1374,24 @@ func TestRangefeedCheckpointsRecoverFromLeaseExpiration(t *testing.T) {
// Expire the lease. Given that the Raft leadership is on n2, only n2 will be
// eligible to acquire a new lease.
log.Infof(ctx, "test expiring lease")
nl := n2.NodeLiveness().(*liveness.NodeLiveness)
resumeHeartbeats := nl.PauseAllHeartbeatsForTest()
n2Liveness, ok := nl.Self()
nl2 := n2.NodeLiveness().(*liveness.NodeLiveness)
resumeHeartbeats := nl2.PauseAllHeartbeatsForTest()
n2Liveness, ok := nl2.Self()
require.True(t, ok)
manualClock.Increment(n2Liveness.Expiration.ToTimestamp().Add(1, 0).WallTime - manualClock.UnixNano())
atomic.StoreInt64(&rejectExtraneousRequests, 1)
// Ask another node to increment n2's liveness record.

// Ask another node to increment n2's liveness record, but first, wait until
// n1's liveness state is the same as n2's. Otherwise, the epoch increment
// below might get rejected because of mismatching liveness records.
testutils.SucceedsSoon(t, func() error {
nl1 := n1.NodeLiveness().(*liveness.NodeLiveness)
n2LivenessFromN1, _ := nl1.GetLiveness(n2.NodeID())
if n2Liveness != n2LivenessFromN1.Liveness {
return errors.Errorf("waiting for node 2 liveness to converge on both nodes 1 and 2")
}
return nil
})
require.NoError(t, n1.NodeLiveness().(*liveness.NodeLiveness).IncrementEpoch(ctx, n2Liveness))
resumeHeartbeats()

Expand Down