Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

kvserver: deflake TestRequestsOnFollowerWithNonLiveLeaseholder #107442

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 4 additions & 4 deletions pkg/kv/kvclient/kvcoord/dist_sender.go
Original file line number Diff line number Diff line change
Expand Up @@ -2537,21 +2537,21 @@ func (ds *DistSender) getLocalityComparison(
) roachpb.LocalityComparisonType {
gatewayNodeDesc, err := ds.nodeDescs.GetNodeDescriptor(fromNodeID)
if err != nil {
log.VEventf(ctx, 2, "failed to perform look up for node descriptor %+v", err)
log.VEventf(ctx, 5, "failed to perform look up for node descriptor %v", err)
return roachpb.LocalityComparisonType_UNDEFINED
}
destinationNodeDesc, err := ds.nodeDescs.GetNodeDescriptor(toNodeID)
if err != nil {
log.VEventf(ctx, 2, "failed to perform look up for node descriptor %+v", err)
log.VEventf(ctx, 5, "failed to perform look up for node descriptor %v", err)
return roachpb.LocalityComparisonType_UNDEFINED
}

comparisonResult, regionErr, zoneErr := gatewayNodeDesc.Locality.CompareWithLocality(destinationNodeDesc.Locality)
if regionErr != nil {
log.VEventf(ctx, 2, "unable to determine if the given nodes are cross region %+v", regionErr)
log.VEventf(ctx, 5, "unable to determine if the given nodes are cross region %v", regionErr)
}
if zoneErr != nil {
log.VEventf(ctx, 2, "unable to determine if the given nodes are cross zone %+v", zoneErr)
log.VEventf(ctx, 5, "unable to determine if the given nodes are cross zone %v", zoneErr)
}
return comparisonResult
}
Expand Down
27 changes: 17 additions & 10 deletions pkg/kv/kvserver/client_raft_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1350,6 +1350,7 @@ func TestRequestsOnFollowerWithNonLiveLeaseholder(t *testing.T) {
st := cluster.MakeTestingClusterSettings()
kvserver.ExpirationLeasesOnly.Override(ctx, &st.SV, false) // override metamorphism

manualClock := hlc.NewHybridManualClock()
clusterArgs := base.TestClusterArgs{
ReplicationMode: base.ReplicationManual,
ServerArgs: base.TestServerArgs{
Expand All @@ -1361,12 +1362,8 @@ func TestRequestsOnFollowerWithNonLiveLeaseholder(t *testing.T) {
RaftEnableCheckQuorum: true,
},
Knobs: base.TestingKnobs{
NodeLiveness: kvserver.NodeLivenessTestingKnobs{
// This test waits for an epoch-based lease to expire, so we're
// setting the liveness duration as low as possible while still
// keeping the test stable.
LivenessDuration: 3000 * time.Millisecond,
RenewalDuration: 1500 * time.Millisecond,
Server: &server.TestingKnobs{
WallClock: manualClock,
},
Store: &kvserver.StoreTestingKnobs{
// We eliminate clock offsets in order to eliminate the stasis period
Expand Down Expand Up @@ -1435,26 +1432,36 @@ func TestRequestsOnFollowerWithNonLiveLeaseholder(t *testing.T) {
atomic.StoreInt32(&installPartition, 1)

// Wait until the lease expires.
log.Infof(ctx, "test: waiting for lease expiration")
log.Infof(ctx, "test: waiting for lease expiration on r%d", store0Repl.RangeID)
testutils.SucceedsSoon(t, func() error {
dur, _ := store0.GetStoreConfig().NodeLivenessDurations()
manualClock.Increment(dur.Nanoseconds())
leaseStatus = store0Repl.CurrentLeaseStatus(ctx)
// If we failed to pin the lease, it likely won't ever expire due to the particular
// partition we've set up. Bail early instead of wasting 45s.
require.True(t, leaseStatus.Lease.OwnedBy(store0.StoreID()), "failed to pin lease")

// Lease is on s1, and it should be invalid now. The only reason there's a
// retry loop is that there could be a race where we bump the clock while a
// heartbeat is inflight (and which picks up the new expiration).
if leaseStatus.IsValid() {
return errors.New("lease still valid")
return errors.Errorf("lease still valid: %+v", leaseStatus)
}
return nil
})
log.Infof(ctx, "test: lease expired")

{
tBegin := timeutil.Now()
// Increment the initial value again, which requires range availability. To
// get there, the request will need to trigger a lease request on a follower
// replica, which will call a Raft election, acquire Raft leadership, then
// acquire the range lease.
log.Infof(ctx, "test: waiting for new lease...")
_, err := tc.Server(0).DB().Inc(ctx, key, 1)
require.NoError(t, err)
log.Infof(ctx, "test: waiting for new lease...")
tc.WaitForValues(t, key, []int64{2, 2, 2, 0})
log.Infof(ctx, "test: waiting for new lease... done")
log.Infof(ctx, "test: waiting for new lease... done [%.2fs]", timeutil.Since(tBegin).Seconds())
}

// Store 0 no longer holds the lease.
Expand Down
10 changes: 9 additions & 1 deletion pkg/kv/kvserver/replica_proposal_buf.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ import (
"fmt"
"sync"
"sync/atomic"
"time"

"github.com/cockroachdb/cockroach/pkg/kv/kvpb"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/closedts/tracker"
Expand Down Expand Up @@ -620,6 +621,8 @@ func (b *propBuf) FlushLockedWithRaftGroup(
return used, propErr
}

var logCampaignOnRejectLease = log.Every(10 * time.Second)

// maybeRejectUnsafeProposalLocked conditionally rejects proposals that are
// deemed unsafe, given the current state of the raft group. Requests that may
// be deemed unsafe and rejected at this level are those whose safety has some
Expand Down Expand Up @@ -702,7 +705,12 @@ func (b *propBuf) maybeRejectUnsafeProposalLocked(
li.leader)
b.p.rejectProposalWithRedirectLocked(ctx, p, li.leader)
if b.p.shouldCampaignOnRedirect(raftGroup) {
log.VEventf(ctx, 2, "campaigning because Raft leader not live in node liveness map")
const format = "campaigning because Raft leader (id=%d) not live in node liveness map"
if logCampaignOnRejectLease.ShouldLog() {
log.Infof(ctx, format, li.leader)
} else {
log.VEventf(ctx, 2, format, li.leader)
}
b.p.campaignLocked(ctx)
}
return true
Expand Down
7 changes: 4 additions & 3 deletions pkg/kv/kvserver/replica_range_lease.go
Original file line number Diff line number Diff line change
Expand Up @@ -445,6 +445,8 @@ func (p *pendingLeaseRequest) requestLeaseAsync(
return nil
}

var logFailedHeartbeatOwnLiveness = log.Every(10 * time.Second)

// requestLease sends a synchronous transfer lease or lease request to the
// specified replica. It is only meant to be called from requestLeaseAsync,
// since it does not coordinate with other in-flight lease requests.
Expand All @@ -465,9 +467,8 @@ func (p *pendingLeaseRequest) requestLease(
if status.Lease.Type() == roachpb.LeaseEpoch && status.State == kvserverpb.LeaseState_EXPIRED {
var err error
// If this replica is previous & next lease holder, manually heartbeat to become live.
if status.OwnedBy(nextLeaseHolder.StoreID) &&
p.repl.store.StoreID() == nextLeaseHolder.StoreID {
if err = p.repl.store.cfg.NodeLiveness.Heartbeat(ctx, status.Liveness); err != nil {
if status.OwnedBy(nextLeaseHolder.StoreID) && p.repl.store.StoreID() == nextLeaseHolder.StoreID {
if err = p.repl.store.cfg.NodeLiveness.Heartbeat(ctx, status.Liveness); err != nil && logFailedHeartbeatOwnLiveness.ShouldLog() {
log.Errorf(ctx, "failed to heartbeat own liveness record: %s", err)
}
} else if status.Liveness.Epoch == status.Lease.Epoch {
Expand Down
4 changes: 2 additions & 2 deletions pkg/kv/kvserver/store_snapshot.go
Original file line number Diff line number Diff line change
Expand Up @@ -990,10 +990,10 @@ func (s *Store) getLocalityComparison(
secLocality := s.cfg.StorePool.GetNodeLocality(toNodeID)
comparisonResult, regionErr, zoneErr := firstLocality.CompareWithLocality(secLocality)
if regionErr != nil {
log.VEventf(ctx, 2, "unable to determine if the given nodes are cross region %+v", regionErr)
log.VEventf(ctx, 5, "unable to determine if the given nodes are cross region %v", regionErr)
}
if zoneErr != nil {
log.VEventf(ctx, 2, "unable to determine if the given nodes are cross zone %+v", zoneErr)
log.VEventf(ctx, 5, "unable to determine if the given nodes are cross zone %v", zoneErr)
}
return comparisonResult
}
Expand Down
4 changes: 2 additions & 2 deletions pkg/server/node.go
Original file line number Diff line number Diff line change
Expand Up @@ -1366,10 +1366,10 @@ func (n *Node) getLocalityComparison(

comparisonResult, regionErr, zoneErr := n.Descriptor.Locality.CompareWithLocality(gatewayNodeDesc.Locality)
if regionErr != nil {
log.VEventf(ctx, 2, "unable to determine if the given nodes are cross region %+v", regionErr)
log.VEventf(ctx, 5, "unable to determine if the given nodes are cross region %v", regionErr)
}
if zoneErr != nil {
log.VEventf(ctx, 2, "unable to determine if the given nodes are cross zone %+v", zoneErr)
log.VEventf(ctx, 5, "unable to determine if the given nodes are cross zone %v", zoneErr)
}

return comparisonResult
Expand Down