107265: liveness: allow registering callbacks after start r=erikgrinaker a=tbg

I discovered[^1] a deadlock scenario when multiple nodes in the cluster restart
with additional stores that need to be bootstrapped. In that case, liveness
must be running when the StoreIDs are allocated, but it is not.

Trying to address this problem, I realized that when an auxiliary Store is bootstrapped,
it will create a new replicateQueue, which will register a new callback with NodeLiveness.

But if liveness must already be started at this point to fix #106706, we'll run into the
assertion that prevents registering callbacks on an already-started NodeLiveness.

Something's got to give: we will allow registering callbacks at any given point
in time, and they'll get an initial set of notifications synchronously. I
audited the few users of RegisterCallback and this seems OK with all of them.
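
To make the new contract concrete, here is a minimal, self-contained sketch of the relaxed registration path; the types and the callback signature are illustrative stand-ins, not the actual `NodeLiveness` API:

```go
package liveness

import "sync"

// Record and Callback are stand-ins for the real liveness record and
// IsLiveCallback types.
type Record struct {
	NodeID int32
	Live   bool
}

type Callback func(Record)

type NodeLiveness struct {
	mu        sync.Mutex
	records   map[int32]Record
	callbacks []Callback
}

// RegisterCallback may now be called at any point, even after liveness has
// started. Late registrants synchronously receive a notification for every
// record known at registration time, so they cannot miss earlier updates.
func (nl *NodeLiveness) RegisterCallback(cb Callback) {
	nl.mu.Lock()
	nl.callbacks = append(nl.callbacks, cb)
	// Snapshot the current records under the lock, so an update racing with
	// registration is either in the snapshot or delivered via the callback list.
	snapshot := make([]Record, 0, len(nl.records))
	for _, rec := range nl.records {
		snapshot = append(snapshot, rec)
	}
	nl.mu.Unlock()

	// Initial set of notifications, delivered synchronously as described above.
	for _, rec := range snapshot {
		cb(rec)
	}
}
```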

[^1]: #106706 (comment)

Epic: None
Release note: None


107417: kvserver: ignore RPC conn when deciding to campaign/vote r=erikgrinaker a=erikgrinaker

**kvserver: remove stale mayCampaignOnWake comment**

The comment is about a parameter that no longer exists.

**kvserver: revamp shouldCampaign/Forget tests**

**kvserver: ignore RPC conn in `shouldCampaignOnWake`**

Previously, `shouldCampaignOnWake()` used `IsLiveMapEntry.IsLive` to determine whether the leader was dead. However, this signal depends not only on the node's liveness, but also on its RPC connectivity. This can prevent an unquiescing replica from acquiring Raft leadership when the leader is still alive but unable to heartbeat liveness, in which case the leader is also unable to acquire epoch leases.

This patch ignores the RPC connection state when deciding whether to campaign, relying only on the liveness state.

**kvserver: ignore RPC conn in `shouldForgetLeaderOnVoteRequest`**

Previously, `shouldForgetLeaderOnVoteRequest()` used `IsLiveMapEntry.IsLive` to determine whether the leader was dead. However, this signal depends not only on the node's liveness, but also on its RPC connectivity. This can prevent granting votes to a new leader that may be attempting to acquire an epoch lease (which the current leader cannot).

This patch ignores the RPC connection state when deciding whether to forget the leader and grant the vote, relying only on the liveness state.
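
A self-contained sketch of the shared idea behind both changes (stand-in types only; the real `IsLiveMapEntry` lives in the liveness package and carries more state): the decision now keys off the liveness record's expiration alone, so a leader whose liveness has lapsed is treated as dead even if our RPC connection to it is still healthy.

```go
package main

import (
	"fmt"
	"time"
)

type Liveness struct {
	Expiration time.Time
}

type IsLiveMapEntry struct {
	Liveness Liveness
	IsLive   bool // old signal: liveness, but also propped up by RPC connectivity
}

// leaderIsDead consults only the liveness record, ignoring RPC connectivity.
func leaderIsDead(entry IsLiveMapEntry, now time.Time) bool {
	// Old (problematic) behavior: return !entry.IsLive
	return now.After(entry.Liveness.Expiration)
}

func main() {
	// The scenario from the commit message: the leader cannot heartbeat
	// liveness (its record expired a while ago), but a healthy RPC connection
	// keeps the combined IsLive bit true.
	entry := IsLiveMapEntry{
		Liveness: Liveness{Expiration: time.Now().Add(-3 * time.Second)},
		IsLive:   true,
	}
	fmt.Println("old check considers leader dead:", !entry.IsLive)                   // false: don't campaign / don't grant vote
	fmt.Println("new check considers leader dead:", leaderIsDead(entry, time.Now())) // true: campaign / grant vote
}
```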

Resolves #107060.
Epic: none
Release note: None

**kvserver: remove `StoreTestingKnobs.DisableLivenessMapConnHealth`**

107424: kvserver: scale Raft entry cache size with system memory r=erikgrinaker a=erikgrinaker

The Raft entry cache size defaulted to 16 MB, which is rather small. This has been seen to cause tail latency and throughput degradation with high write volume on large nodes, correlating with a reduction in the entry cache hit rate.

This patch linearly scales the Raft entry cache size as 1/256 of total system/cgroup memory, shared evenly between all stores, with a minimum of 32 MB. For example, a 32 GB 8-vCPU node will have a 128 MB entry cache.

This is a conservative default, since this memory is not accounted for by existing memory budgets or the `--cache` flag. We rarely see cache misses in production clusters anyway, and have seen significantly improved hit rates with this scaling (e.g. a 64 KB kv0 workload on 8-vCPU nodes went from an 87% to a 99% hit rate).
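
A back-of-the-envelope sketch of the sizing rule (not the actual implementation; in particular, where exactly the 32 MB floor is applied may differ in the real code):

```go
package main

import "fmt"

// raftEntryCacheSize scales the cache as 1/256 of system/cgroup memory with a
// 32 MB floor, shared evenly between the node's stores.
func raftEntryCacheSize(totalSystemMemoryBytes, numStores int64) int64 {
	const minSize = 32 << 20 // 32 MiB
	size := totalSystemMemoryBytes / 256
	if size < minSize {
		size = minSize
	}
	return size / numStores
}

func main() {
	// The example from above: a 32 GB node with a single store gets 128 MB.
	fmt.Printf("%d MB\n", raftEntryCacheSize(32<<30, 1)>>20)
}
```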

Resolves #98666.
Epic: none

Release note (performance improvement): The default Raft entry cache size has been increased from 16 MB to 1/256 of system memory with a minimum of 32 MB, divided evenly between all stores. This can be configured via `COCKROACH_RAFT_ENTRY_CACHE_SIZE`.

107442: kvserver: deflake TestRequestsOnFollowerWithNonLiveLeaseholder r=erikgrinaker a=tbg

The test previously relied on aggressive liveness heartbeat expirations to
avoid running for too long. As a result, it was flaky since liveness wasn't
reliably pinned in the way the test wanted.

The hybrid manual clock allows time to jump forward at an opportune moment.
Use it here to avoid running with a tight lease interval.
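
In miniature, and with stand-in types rather than the real `hlc.HybridManualClock` and lease machinery, the pattern looks like this: bump the clock by the liveness duration inside the retry loop instead of configuring a tiny lease interval.

```go
package main

import (
	"errors"
	"fmt"
	"sync/atomic"
	"time"
)

// ManualClock stands in for a hybrid manual clock: tests can jump it forward
// without waiting in real time.
type ManualClock struct{ nanos int64 }

func (c *ManualClock) Now() time.Time    { return time.Unix(0, atomic.LoadInt64(&c.nanos)) }
func (c *ManualClock) Increment(d int64) { atomic.AddInt64(&c.nanos, d) }

// Lease stands in for an epoch-based lease with an expiration.
type Lease struct{ Expiration time.Time }

func (l Lease) Valid(now time.Time) bool { return now.Before(l.Expiration) }

func main() {
	clock := &ManualClock{}
	lease := Lease{Expiration: clock.Now().Add(6 * time.Second)}
	livenessDuration := 3 * time.Second

	// Retry loop: each attempt jumps the clock forward by the liveness
	// duration until the lease actually reports as expired.
	err := errors.New("not checked yet")
	for i := 0; i < 10 && err != nil; i++ {
		clock.Increment(livenessDuration.Nanoseconds())
		if lease.Valid(clock.Now()) {
			err = errors.New("lease still valid")
		} else {
			err = nil
		}
	}
	fmt.Println("lease expired:", err == nil)
}
```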

On my gceworker, the test previously flaked within a few minutes. As of this commit, I
ran it for double-digit minutes without issue.

Fixes #107200.

Epic: None
Release note: None


107526: kvserver: fail gracefully in TestLeaseTransferRejectedIfTargetNeedsSnapshot r=erikgrinaker a=tbg

We saw this test hang in CI. What likely happened (according to the stacks) is
that a lease transfer that was supposed to be caught by an interceptor never
showed up in the interceptor. The most likely explanation is that it errored
out before it got to evaluation. It then signaled a channel the test was only
prepared to check later, so the test hung (waiting for a channel that was now
never to be touched).

This test is hard to maintain. It would be great (though, for now, out of reach)
to write tests like it in a deterministic framework[^1].

[^1]: see #105177.

For now, fix the test so that when the (so far unknown) error rears its
head again, it will fail properly, so we get to see the error and can
take another pass at fixing the test (separately). Stressing
this commit[^2], we get:

> transferErrC unexpectedly signaled: /Table/Max: transfer lease unexpected
> error: refusing to transfer lease to (n3,s3):3 because target may need a Raft
> snapshot: replica in StateProbe

This makes sense. The test wants to exercise the below-raft mechanism, but the
above-raft mechanism also exists; while we didn't want to interact with it, we
sometimes do[^3].
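
For reference, the fail-fast shape of the first commit's change, with generic channel names standing in for the test's `transferLeaseReqBlockedC` and `transferErrC` (the actual change is visible in the `client_replica_test.go` hunk below):

```go
package kvserver_test

import "testing"

// waitForInterceptorOrFail waits for the lease transfer to reach the testing
// interceptor, but fails the test immediately if the transfer errors out
// before it ever gets there, instead of hanging on a channel that will never
// be signaled.
func waitForInterceptorOrFail(t *testing.T, blockedC <-chan struct{}, errC <-chan error) {
	t.Helper()
	select {
	case <-blockedC:
		// Expected: the transfer is parked in the request filter.
	case err := <-errC:
		t.Fatalf("transferErrC unexpectedly signaled: %v", err)
	}
}
```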

The second commit introduces a testing knob that disables the above-raft
mechanism selectively. I've stressed the test for 15 minutes without issues
after this change.
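
A rough sketch of how such a knob can gate the above-raft check (the knob name matches the diff below, but the helper and call shape here are hypothetical):

```go
package kvserver

import "errors"

// StoreTestingKnobs is trimmed to the single field relevant here.
type StoreTestingKnobs struct {
	// DisableAboveRaftLeaseTransferSafetyChecks skips the above-raft snapshot
	// check so tests can exercise the below-raft check in isolation.
	DisableAboveRaftLeaseTransferSafetyChecks bool
}

// maybeRejectTransfer is a hypothetical stand-in for the above-raft safety
// check: it refuses to transfer the lease to a replica that may need a Raft
// snapshot, unless the testing knob disables the check.
func maybeRejectTransfer(knobs StoreTestingKnobs, targetMayNeedSnapshot bool) error {
	if knobs.DisableAboveRaftLeaseTransferSafetyChecks {
		return nil
	}
	if targetMayNeedSnapshot {
		return errors.New("refusing to transfer lease: target may need a Raft snapshot")
	}
	return nil
}
```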

[^3]: somewhat related to #107524
[^2]: `./dev test --filter TestLeaseTransferRejectedIfTargetNeedsSnapshot --stress ./pkg/kv/kvserver/` on gceworker, 285s

Fixes #106383.

Epic: None
Release note: None


107531: kvserver: disable replicate queue and lease transfers in closedts tests r=erikgrinaker a=tbg

For a more holistic suggestion on how to fix this for the (likely many) other
tests susceptible to similar issues, see #107528.

> 1171 runs so far, 0 failures, over 15m55s

Fixes #101824.

Release note: None
Epic: none


Co-authored-by: Tobias Grieger <tobias.b.grieger@gmail.com>
Co-authored-by: Erik Grinaker <grinaker@cockroachlabs.com>
3 people committed Jul 26, 2023
7 parents f86affa + d767731 + 02e999d + c435472 + 04d3682 + 1c8c503 + 2c72155 commit f147c2b
Showing 23 changed files with 709 additions and 367 deletions.
5 changes: 5 additions & 0 deletions pkg/gossip/gossip.go
@@ -353,6 +353,11 @@ func (g *Gossip) AssertNotStarted(ctx context.Context) {
}
}

// GetNodeID gets the NodeID.
func (g *Gossip) GetNodeID() roachpb.NodeID {
return g.NodeID.Get()
}

// GetNodeMetrics returns the gossip node metrics.
func (g *Gossip) GetNodeMetrics() *Metrics {
return g.server.GetNodeMetrics()
8 changes: 4 additions & 4 deletions pkg/kv/kvclient/kvcoord/dist_sender.go
@@ -2537,21 +2537,21 @@ func (ds *DistSender) getLocalityComparison(
) roachpb.LocalityComparisonType {
gatewayNodeDesc, err := ds.nodeDescs.GetNodeDescriptor(fromNodeID)
if err != nil {
log.VEventf(ctx, 2, "failed to perform look up for node descriptor %+v", err)
log.VEventf(ctx, 5, "failed to perform look up for node descriptor %v", err)
return roachpb.LocalityComparisonType_UNDEFINED
}
destinationNodeDesc, err := ds.nodeDescs.GetNodeDescriptor(toNodeID)
if err != nil {
log.VEventf(ctx, 2, "failed to perform look up for node descriptor %+v", err)
log.VEventf(ctx, 5, "failed to perform look up for node descriptor %v", err)
return roachpb.LocalityComparisonType_UNDEFINED
}

comparisonResult, regionErr, zoneErr := gatewayNodeDesc.Locality.CompareWithLocality(destinationNodeDesc.Locality)
if regionErr != nil {
log.VEventf(ctx, 2, "unable to determine if the given nodes are cross region %+v", regionErr)
log.VEventf(ctx, 5, "unable to determine if the given nodes are cross region %v", regionErr)
}
if zoneErr != nil {
log.VEventf(ctx, 2, "unable to determine if the given nodes are cross zone %+v", zoneErr)
log.VEventf(ctx, 5, "unable to determine if the given nodes are cross zone %v", zoneErr)
}
return comparisonResult
}
1 change: 1 addition & 0 deletions pkg/kv/kvserver/BUILD.bazel
@@ -177,6 +177,7 @@ go_library(
"//pkg/rpc",
"//pkg/rpc/nodedialer",
"//pkg/security/username",
"//pkg/server/status",
"//pkg/server/telemetry",
"//pkg/settings",
"//pkg/settings/cluster",
30 changes: 17 additions & 13 deletions pkg/kv/kvserver/client_raft_test.go
@@ -1350,6 +1350,7 @@ func TestRequestsOnFollowerWithNonLiveLeaseholder(t *testing.T) {
st := cluster.MakeTestingClusterSettings()
kvserver.ExpirationLeasesOnly.Override(ctx, &st.SV, false) // override metamorphism

manualClock := hlc.NewHybridManualClock()
clusterArgs := base.TestClusterArgs{
ReplicationMode: base.ReplicationManual,
ServerArgs: base.TestServerArgs{
@@ -1361,12 +1362,8 @@ func TestRequestsOnFollowerWithNonLiveLeaseholder(t *testing.T) {
RaftEnableCheckQuorum: true,
},
Knobs: base.TestingKnobs{
NodeLiveness: kvserver.NodeLivenessTestingKnobs{
// This test waits for an epoch-based lease to expire, so we're
// setting the liveness duration as low as possible while still
// keeping the test stable.
LivenessDuration: 3000 * time.Millisecond,
RenewalDuration: 1500 * time.Millisecond,
Server: &server.TestingKnobs{
WallClock: manualClock,
},
Store: &kvserver.StoreTestingKnobs{
// We eliminate clock offsets in order to eliminate the stasis period
@@ -1435,26 +1432,36 @@ func TestRequestsOnFollowerWithNonLiveLeaseholder(t *testing.T) {
atomic.StoreInt32(&installPartition, 1)

// Wait until the lease expires.
log.Infof(ctx, "test: waiting for lease expiration")
log.Infof(ctx, "test: waiting for lease expiration on r%d", store0Repl.RangeID)
testutils.SucceedsSoon(t, func() error {
dur, _ := store0.GetStoreConfig().NodeLivenessDurations()
manualClock.Increment(dur.Nanoseconds())
leaseStatus = store0Repl.CurrentLeaseStatus(ctx)
// If we failed to pin the lease, it likely won't ever expire due to the particular
// partition we've set up. Bail early instead of wasting 45s.
require.True(t, leaseStatus.Lease.OwnedBy(store0.StoreID()), "failed to pin lease")

// Lease is on s1, and it should be invalid now. The only reason there's a
// retry loop is that there could be a race where we bump the clock while a
// heartbeat is inflight (and which picks up the new expiration).
if leaseStatus.IsValid() {
return errors.New("lease still valid")
return errors.Errorf("lease still valid: %+v", leaseStatus)
}
return nil
})
log.Infof(ctx, "test: lease expired")

{
tBegin := timeutil.Now()
// Increment the initial value again, which requires range availability. To
// get there, the request will need to trigger a lease request on a follower
// replica, which will call a Raft election, acquire Raft leadership, then
// acquire the range lease.
log.Infof(ctx, "test: waiting for new lease...")
_, err := tc.Server(0).DB().Inc(ctx, key, 1)
require.NoError(t, err)
log.Infof(ctx, "test: waiting for new lease...")
tc.WaitForValues(t, key, []int64{2, 2, 2, 0})
log.Infof(ctx, "test: waiting for new lease... done")
log.Infof(ctx, "test: waiting for new lease... done [%.2fs]", timeutil.Since(tBegin).Seconds())
}

// Store 0 no longer holds the lease.
@@ -6771,9 +6778,6 @@ func TestRaftPreVoteUnquiesceDeadLeader(t *testing.T) {
Server: &server.TestingKnobs{
WallClock: manualClock,
},
Store: &kvserver.StoreTestingKnobs{
DisableLivenessMapConnHealth: true, // to mark n1 as not live
},
},
},
})
16 changes: 13 additions & 3 deletions pkg/kv/kvserver/client_replica_test.go
@@ -2937,6 +2937,9 @@ func TestLeaseTransferRejectedIfTargetNeedsSnapshot(t *testing.T) {
ServerArgs: base.TestServerArgs{
Knobs: base.TestingKnobs{
Store: &kvserver.StoreTestingKnobs{
// If we're testing the below-raft check, disable the above-raft check.
// See: https://github.com/cockroachdb/cockroach/pull/107526
DisableAboveRaftLeaseTransferSafetyChecks: rejectAfterRevoke,
TestingRequestFilter: func(ctx context.Context, ba *kvpb.BatchRequest) *kvpb.Error {
if rejectAfterRevoke && ba.IsSingleTransferLeaseRequest() {
transferLeaseReqBlockOnce.Do(func() {
@@ -3007,10 +3010,17 @@
// Replica.AdminTransferLease.
transferErrC := make(chan error, 1)
if rejectAfterRevoke {
_ = tc.Stopper().RunAsyncTask(ctx, "transfer lease", func(ctx context.Context) {
require.NoError(t, tc.Stopper().RunAsyncTask(ctx, "transfer lease", func(ctx context.Context) {
transferErrC <- tc.TransferRangeLease(*repl0.Desc(), tc.Target(2))
})
<-transferLeaseReqBlockedC
}))
select {
case <-transferLeaseReqBlockedC:
// Expected case: lease transfer triggered our interceptor and is now
// waiting there for transferLeaseReqUnblockedCh.
case err := <-transferErrC:
// Unexpected case: lease transfer errored out before making it into the filter.
t.Fatalf("transferErrC unexpectedly signaled: %v", err)
}
}

// Truncate the log at index+1 (log entries < N are removed, so this
6 changes: 6 additions & 0 deletions pkg/kv/kvserver/closed_timestamp_test.go
@@ -1206,9 +1206,15 @@ func setupClusterForClosedTSTesting(
SET CLUSTER SETTING kv.closed_timestamp.target_duration = '%s';
SET CLUSTER SETTING kv.closed_timestamp.side_transport_interval = '%s';
SET CLUSTER SETTING kv.closed_timestamp.follower_reads_enabled = true;
SET CLUSTER SETTING kv.allocator.load_based_rebalancing = 'off';
`, targetDuration, targetDuration/4),
";")...)

// Disable replicate queues to avoid errant lease transfers.
//
// See: https://github.com/cockroachdb/cockroach/issues/101824.
tc.ToggleReplicateQueues(false)

return tc, tc.ServerConn(0), desc
}

2 changes: 2 additions & 0 deletions pkg/kv/kvserver/liveness/BUILD.bazel
@@ -51,6 +51,7 @@ go_test(
embed = [":liveness"],
deps = [
"//pkg/base",
"//pkg/gossip",
"//pkg/kv/kvserver",
"//pkg/kv/kvserver/allocator/plan",
"//pkg/kv/kvserver/liveness/livenesspb",
@@ -69,6 +70,7 @@ go_test(
"//pkg/util/log",
"//pkg/util/protoutil",
"//pkg/util/randutil",
"//pkg/util/stop",
"//pkg/util/syncutil",
"//pkg/util/timeutil",
"@com_github_cockroachdb_errors//:errors",
17 changes: 12 additions & 5 deletions pkg/kv/kvserver/liveness/cache.go
@@ -28,6 +28,15 @@ type UpdateInfo struct {
lastUnavailableTime hlc.Timestamp
}

// Gossip is the subset of *gossip.Gossip used by liveness.
type Gossip interface {
RegisterCallback(pattern string, method gossip.Callback, opts ...gossip.CallbackOption) func()
GetNodeID() roachpb.NodeID
}

var livenessRegex = gossip.MakePrefixPattern(gossip.KeyNodeLivenessPrefix)
var storeRegex = gossip.MakePrefixPattern(gossip.KeyStoreDescPrefix)

// cache stores updates to both Liveness records and the store descriptor map.
// It doesn't store the entire StoreDescriptor, only the time when it is
// updated. The StoreDescriptor is sent directly from nodes so doesn't require
@@ -38,7 +47,7 @@ type UpdateInfo struct {
// change to take this into account. Only epoch leases will use the liveness
// timestamp directly.
type cache struct {
gossip *gossip.Gossip
gossip Gossip
clock *hlc.Clock
notifyLivenessChanged func(old, new livenesspb.Liveness)
mu struct {
@@ -54,7 +63,7 @@ type cache struct {
}

func newCache(
g *gossip.Gossip, clock *hlc.Clock, cbFn func(livenesspb.Liveness, livenesspb.Liveness),
g Gossip, clock *hlc.Clock, cbFn func(livenesspb.Liveness, livenesspb.Liveness),
) *cache {
c := cache{}
c.gossip = g
@@ -71,13 +80,11 @@ func newCache(
// nl.Start() is invoked. At the time of writing this invariant does
// not hold (which is a problem, since the node itself won't be live
// at this point, and requests routed to it will hang).
livenessRegex := gossip.MakePrefixPattern(gossip.KeyNodeLivenessPrefix)
c.gossip.RegisterCallback(livenessRegex, c.livenessGossipUpdate)

// Enable redundant callbacks for the store keys because we use these
// callbacks as a clock to determine when a store was last updated even if it
// hasn't otherwise changed.
storeRegex := gossip.MakePrefixPattern(gossip.KeyStoreDescPrefix)
c.gossip.RegisterCallback(storeRegex, c.storeGossipUpdate, gossip.Redundant)
}
return &c
@@ -86,7 +93,7 @@ func newCache(
// selfID returns the ID for this node according to Gossip. This will be 0
// until the node has joined the cluster.
func (c *cache) selfID() roachpb.NodeID {
return c.gossip.NodeID.Get()
return c.gossip.GetNodeID()
}

// livenessGossipUpdate is the gossip callback used to keep the