diff --git a/docs/generated/settings/settings-for-tenants.txt b/docs/generated/settings/settings-for-tenants.txt index b65d3860ec39..91695b06bb2d 100644 --- a/docs/generated/settings/settings-for-tenants.txt +++ b/docs/generated/settings/settings-for-tenants.txt @@ -181,4 +181,4 @@ trace.debug.enable boolean false if set, traces for recent requests can be seen trace.jaeger.agent string the address of a Jaeger agent to receive traces using the Jaeger UDP Thrift protocol, as :. If no port is specified, 6381 will be used. trace.opentelemetry.collector string address of an OpenTelemetry trace collector to receive traces using the otel gRPC protocol, as :. If no port is specified, 4317 will be used. trace.zipkin.collector string the address of a Zipkin instance to receive traces, as :. If no port is specified, 9411 will be used. -version version 21.2-78 set the active cluster version in the format '.' +version version 21.2-80 set the active cluster version in the format '.' diff --git a/docs/generated/settings/settings.html b/docs/generated/settings/settings.html index 025aecca78cb..de29f105a383 100644 --- a/docs/generated/settings/settings.html +++ b/docs/generated/settings/settings.html @@ -194,6 +194,6 @@ trace.jaeger.agentstringthe address of a Jaeger agent to receive traces using the Jaeger UDP Thrift protocol, as :. If no port is specified, 6381 will be used. trace.opentelemetry.collectorstringaddress of an OpenTelemetry trace collector to receive traces using the otel gRPC protocol, as :. If no port is specified, 4317 will be used. trace.zipkin.collectorstringthe address of a Zipkin instance to receive traces, as :. If no port is specified, 9411 will be used. -versionversion21.2-78set the active cluster version in the format '.' +versionversion21.2-80set the active cluster version in the format '.' 
diff --git a/pkg/clusterversion/cockroach_versions.go b/pkg/clusterversion/cockroach_versions.go index 5cd81f5c356f..255925356a61 100644 --- a/pkg/clusterversion/cockroach_versions.go +++ b/pkg/clusterversion/cockroach_versions.go @@ -306,6 +306,10 @@ const ( // ExperimentalMVCCRangeTombstones enables the use of highly experimental MVCC // range tombstones. ExperimentalMVCCRangeTombstones + // LooselyCoupledRaftLogTruncation allows the cluster to reduce the coupling + // for raft log truncation, by allowing each replica to treat a truncation + // proposal as an upper bound on what should be truncated. + LooselyCoupledRaftLogTruncation // ************************************************* // Step (1): Add new versions here. @@ -498,6 +502,10 @@ var versionsSingleton = keyedVersions{ Key: ExperimentalMVCCRangeTombstones, Version: roachpb.Version{Major: 21, Minor: 2, Internal: 78}, }, + { + Key: LooselyCoupledRaftLogTruncation, + Version: roachpb.Version{Major: 21, Minor: 2, Internal: 80}, + }, // ************************************************* // Step (2): Add new versions here. 
diff --git a/pkg/clusterversion/key_string.go b/pkg/clusterversion/key_string.go index 4d86e6f3432b..e53e4338fa70 100644 --- a/pkg/clusterversion/key_string.go +++ b/pkg/clusterversion/key_string.go @@ -48,11 +48,12 @@ func _() { _ = x[EnablePebbleFormatVersionRangeKeys-37] _ = x[BackupResolutionInJob-38] _ = x[ExperimentalMVCCRangeTombstones-39] + _ = x[LooselyCoupledRaftLogTruncation-40] } -const _Key_name = "V21_2Start22_1TargetBytesAvoidExcessAvoidDrainingNamesDrainingNamesMigrationTraceIDDoesntImplyStructuredRecordingAlterSystemTableStatisticsAddAvgSizeColAlterSystemStmtDiagReqsMVCCAddSSTableInsertPublicSchemaNamespaceEntryOnRestoreUnsplitRangesInAsyncGCJobsValidateGrantOptionPebbleFormatBlockPropertyCollectorProbeRequestSelectRPCsTakeTracingInfoInbandPreSeedTenantSpanConfigsSeedTenantSpanConfigsPublicSchemasWithDescriptorsEnsureSpanConfigReconciliationEnsureSpanConfigSubscriptionEnableSpanConfigStoreScanWholeRowsSCRAMAuthenticationUnsafeLossOfQuorumRecoveryRangeLogAlterSystemProtectedTimestampAddColumnEnableProtectedTimestampsForTenantDeleteCommentsWithDroppedIndexesRemoveIncompatibleDatabasePrivilegesAddRaftAppliedIndexTermMigrationPostAddRaftAppliedIndexTermMigrationDontProposeWriteTimestampForLeaseTransfersTenantSettingsTableEnablePebbleFormatVersionBlockPropertiesDisableSystemConfigGossipTriggerMVCCIndexBackfillerEnableLeaseHolderRemovalEnsurePebbleFormatVersionRangeKeysEnablePebbleFormatVersionRangeKeysBackupResolutionInJobExperimentalMVCCRangeTombstones" +const _Key_name = 
"V21_2Start22_1TargetBytesAvoidExcessAvoidDrainingNamesDrainingNamesMigrationTraceIDDoesntImplyStructuredRecordingAlterSystemTableStatisticsAddAvgSizeColAlterSystemStmtDiagReqsMVCCAddSSTableInsertPublicSchemaNamespaceEntryOnRestoreUnsplitRangesInAsyncGCJobsValidateGrantOptionPebbleFormatBlockPropertyCollectorProbeRequestSelectRPCsTakeTracingInfoInbandPreSeedTenantSpanConfigsSeedTenantSpanConfigsPublicSchemasWithDescriptorsEnsureSpanConfigReconciliationEnsureSpanConfigSubscriptionEnableSpanConfigStoreScanWholeRowsSCRAMAuthenticationUnsafeLossOfQuorumRecoveryRangeLogAlterSystemProtectedTimestampAddColumnEnableProtectedTimestampsForTenantDeleteCommentsWithDroppedIndexesRemoveIncompatibleDatabasePrivilegesAddRaftAppliedIndexTermMigrationPostAddRaftAppliedIndexTermMigrationDontProposeWriteTimestampForLeaseTransfersTenantSettingsTableEnablePebbleFormatVersionBlockPropertiesDisableSystemConfigGossipTriggerMVCCIndexBackfillerEnableLeaseHolderRemovalEnsurePebbleFormatVersionRangeKeysEnablePebbleFormatVersionRangeKeysBackupResolutionInJobExperimentalMVCCRangeTombstonesLooselyCoupledRaftLogTruncation" -var _Key_index = [...]uint16{0, 5, 14, 36, 54, 76, 113, 152, 175, 189, 230, 256, 275, 309, 321, 352, 376, 397, 425, 455, 483, 504, 517, 536, 570, 608, 642, 674, 710, 742, 778, 820, 839, 879, 911, 930, 954, 988, 1022, 1043, 1074} +var _Key_index = [...]uint16{0, 5, 14, 36, 54, 76, 113, 152, 175, 189, 230, 256, 275, 309, 321, 352, 376, 397, 425, 455, 483, 504, 517, 536, 570, 608, 642, 674, 710, 742, 778, 820, 839, 879, 911, 930, 954, 988, 1022, 1043, 1074, 1105} func (i Key) String() string { if i < 0 || i >= Key(len(_Key_index)-1) { diff --git a/pkg/kv/kvserver/BUILD.bazel b/pkg/kv/kvserver/BUILD.bazel index 0aca5a8ace03..e24b2673dcf9 100644 --- a/pkg/kv/kvserver/BUILD.bazel +++ b/pkg/kv/kvserver/BUILD.bazel @@ -24,8 +24,10 @@ go_library( "queue_helpers_testutil.go", "raft.go", "raft_log_queue.go", + "raft_log_truncator.go", "raft_snapshot_queue.go", "raft_transport.go", + 
"raft_truncator_replica.go", "replica.go", "replica_application_cmd.go", "replica_application_cmd_buf.go", @@ -164,6 +166,7 @@ go_library( "//pkg/util", "//pkg/util/admission", "//pkg/util/bufalloc", + "//pkg/util/buildutil", "//pkg/util/circuit", "//pkg/util/contextutil", "//pkg/util/ctxgroup", @@ -252,6 +255,7 @@ go_test( "queue_concurrency_test.go", "queue_test.go", "raft_log_queue_test.go", + "raft_log_truncator_test.go", "raft_test.go", "raft_transport_test.go", "raft_transport_unit_test.go", diff --git a/pkg/kv/kvserver/batcheval/cmd_truncate_log.go b/pkg/kv/kvserver/batcheval/cmd_truncate_log.go index ace1ae9399a4..633231a2cb5f 100644 --- a/pkg/kv/kvserver/batcheval/cmd_truncate_log.go +++ b/pkg/kv/kvserver/batcheval/cmd_truncate_log.go @@ -14,6 +14,7 @@ import ( "context" "time" + "github.com/cockroachdb/cockroach/pkg/clusterversion" "github.com/cockroachdb/cockroach/pkg/keys" "github.com/cockroachdb/cockroach/pkg/kv/kvserver/batcheval/result" "github.com/cockroachdb/cockroach/pkg/kv/kvserver/kvserverpb" @@ -83,13 +84,32 @@ func TruncateLog( return result.Result{}, errors.Wrap(err, "getting term") } - // Compute the number of bytes freed by this truncation. Note that this will - // only make sense for the leaseholder as we base this off its own first - // index (other replicas may have other first indexes assuming we're not - // still using the legacy truncated state key). In principle, this could be - // off either way, though in practice we don't expect followers to have - // a first index smaller than the leaseholder's (see #34287), and most of - // the time everyone's first index should be the same. + // Compute the number of bytes freed by this truncation. Note that using + // firstIndex only make sense for the leaseholder as we base this off its + // own first index (other replicas may have other first indexes). 
In + // principle, this could be off either way, though in practice we don't + // expect followers to have a first index smaller than the leaseholder's + // (see #34287), and most of the time everyone's first index should be the + // same. + // Additionally, it is possible that a write-heavy range has multiple in + // flight TruncateLogRequests, and using the firstIndex will result in + // duplicate accounting. The ExpectedFirstIndex, populated for clusters at + // LooselyCoupledRaftLogTruncation, allows us to avoid this problem. + // + // We have an additional source of error not mitigated by + // ExpectedFirstIndex. There is nothing synchronizing firstIndex with the + // state visible in readWriter. The former uses the in-memory state or + // fetches directly from the Engine. The latter uses Engine state from some + // point in time which can fall anywhere in the time interval starting from + // when the readWriter was created up to where we create an MVCCIterator + // below. + // TODO(sumeer): we can eliminate this error as part of addressing + // https://github.com/cockroachdb/cockroach/issues/55461 and + // https://github.com/cockroachdb/cockroach/issues/70974 that discuss taking + // a consistent snapshot of some Replica state and the engine. + if args.ExpectedFirstIndex > firstIndex { + firstIndex = args.ExpectedFirstIndex + } start := keys.RaftLogKey(rangeID, firstIndex) end := keys.RaftLogKey(rangeID, args.Index) @@ -98,12 +118,8 @@ func TruncateLog( // downstream of Raft. // // Note that any sideloaded payloads that may be removed by this truncation - // don't matter; they're not tracked in the raft log delta. - // - // TODO(tbg): it's difficult to prove that this computation doesn't have - // bugs that let it diverge. It might be easier to compute the stats - // from scratch, stopping when 4mb (defaultRaftLogTruncationThreshold) - // is reached as at that point we'll truncate aggressively anyway. + // are not tracked in the raft log delta. 
The delta will be adjusted below + // raft. iter := readWriter.NewMVCCIterator(storage.MVCCKeyIterKind, storage.IterOptions{UpperBound: end}) defer iter.Close() // We can pass zero as nowNanos because we're only interested in SysBytes. @@ -122,7 +138,10 @@ func TruncateLog( pd.Replicated.State = &kvserverpb.ReplicaState{ TruncatedState: tState, } - pd.Replicated.RaftLogDelta = ms.SysBytes + if cArgs.EvalCtx.ClusterSettings().Version.ActiveVersionOrEmpty(ctx).IsActive( + clusterversion.LooselyCoupledRaftLogTruncation) { + pd.Replicated.RaftExpectedFirstIndex = firstIndex + } return pd, nil } diff --git a/pkg/kv/kvserver/batcheval/cmd_truncate_log_test.go b/pkg/kv/kvserver/batcheval/cmd_truncate_log_test.go index b914a92457e6..8bc4e3dcc15b 100644 --- a/pkg/kv/kvserver/batcheval/cmd_truncate_log_test.go +++ b/pkg/kv/kvserver/batcheval/cmd_truncate_log_test.go @@ -16,6 +16,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/keys" "github.com/cockroachdb/cockroach/pkg/roachpb" + "github.com/cockroachdb/cockroach/pkg/settings/cluster" "github.com/cockroachdb/cockroach/pkg/storage" "github.com/cockroachdb/cockroach/pkg/util/hlc" "github.com/cockroachdb/cockroach/pkg/util/leaktest" @@ -62,10 +63,12 @@ func TestTruncateLog(t *testing.T) { firstIndex = 100 ) + st := cluster.MakeTestingClusterSettings() evalCtx := &MockEvalCtx{ - Desc: &roachpb.RangeDescriptor{RangeID: rangeID}, - Term: term, - FirstIndex: firstIndex, + ClusterSettings: st, + Desc: &roachpb.RangeDescriptor{RangeID: rangeID}, + Term: term, + FirstIndex: firstIndex, } eng := storage.NewDefaultInMemForTesting() diff --git a/pkg/kv/kvserver/batcheval/result/result.go b/pkg/kv/kvserver/batcheval/result/result.go index c08f8e1fd1e9..d5da4ac5f202 100644 --- a/pkg/kv/kvserver/batcheval/result/result.go +++ b/pkg/kv/kvserver/batcheval/result/result.go @@ -234,10 +234,12 @@ func (p *Result) MergeAndDestroy(q Result) error { if p.Replicated.State.TruncatedState == nil { p.Replicated.State.TruncatedState = 
q.Replicated.State.TruncatedState + p.Replicated.RaftExpectedFirstIndex = q.Replicated.RaftExpectedFirstIndex } else if q.Replicated.State.TruncatedState != nil { return errors.AssertionFailedf("conflicting TruncatedState") } q.Replicated.State.TruncatedState = nil + q.Replicated.RaftExpectedFirstIndex = 0 if q.Replicated.State.GCThreshold != nil { if p.Replicated.State.GCThreshold == nil { diff --git a/pkg/kv/kvserver/kvserverpb/proposer_kv.proto b/pkg/kv/kvserver/kvserverpb/proposer_kv.proto index 2e19c02270b9..6313c5083125 100644 --- a/pkg/kv/kvserver/kvserverpb/proposer_kv.proto +++ b/pkg/kv/kvserver/kvserverpb/proposer_kv.proto @@ -136,7 +136,25 @@ message ReplicatedEvalResult { storage.enginepb.MVCCStats deprecated_delta = 10; // See #18828 storage.enginepb.MVCCStatsDelta delta = 18 [(gogoproto.nullable) = false]; ChangeReplicas change_replicas = 12; + + // RaftLogDelta is the delta in bytes caused by truncation of the raft log. + // It is only populated when evaluating a TruncateLogRequest. The inclusive + // index for the truncation is specified in State.TruncatedState. This delta + // is computed under the assumption that the truncation is happening over + // the interval [RaftExpectedFirstIndex, index]. If the actual truncation at + // a replica is over some interval [x, index] where x != + // RaftExpectedFirstIndex it is that replica's job to recalculate this delta + // in order to be accurate, or to make note of the fact that its raft log + // size stats may now be inaccurate. + // + // NB: this delta does not include the byte size of sideloaded entries. + // Sideloaded entries are not expected to be common enough that it is worth + // the optimization to calculate the delta once (at the leaseholder). int64 raft_log_delta = 13; + // RaftExpectedFirstIndex is populated starting at cluster version + // LooselyCoupledRaftLogTruncation. When this is not populated, the replica + // should not delay enacting the truncation. 
+ uint64 raft_expected_first_index = 25; // MVCCHistoryMutation describes mutations of MVCC history that may violate // the closed timestamp, timestamp cache, and guarantees that rely on these diff --git a/pkg/kv/kvserver/raft_log_queue.go b/pkg/kv/kvserver/raft_log_queue.go index 3c686e404b04..0ecbb3312565 100644 --- a/pkg/kv/kvserver/raft_log_queue.go +++ b/pkg/kv/kvserver/raft_log_queue.go @@ -16,8 +16,11 @@ import ( "strings" "time" + "github.com/cockroachdb/cockroach/pkg/clusterversion" "github.com/cockroachdb/cockroach/pkg/kv" "github.com/cockroachdb/cockroach/pkg/roachpb" + "github.com/cockroachdb/cockroach/pkg/settings" + "github.com/cockroachdb/cockroach/pkg/settings/cluster" "github.com/cockroachdb/cockroach/pkg/spanconfig" "github.com/cockroachdb/cockroach/pkg/util" "github.com/cockroachdb/cockroach/pkg/util/hlc" @@ -29,6 +32,92 @@ import ( "go.etcd.io/etcd/raft/v3/tracker" ) +// Overview of Raft log truncation: +// +// The safety requirement for truncation is that the entries being truncated +// are already durably applied to the state machine. This is because after a +// truncation, the only remaining source of information regarding the data in +// the truncated entries is the state machine, which represents a prefix of +// the log. If we truncated entries that were not durably applied to the state +// machine, a crash would create a gap between what the state machine knows and the +// first entry in the untruncated log, which prevents any more application. +// +// Initialized replicas may need to provide log entries to slow followers to +// catch up, so for performance reasons they should also base truncation on +// the state of followers. Additionally, truncation should typically do work +// when there are "significant" bytes or number of entries to truncate. +// However, if the replica is quiescent we would like to truncate the whole +// log when it becomes possible. 
+// +// An attempt is made to add a replica to the queue under two situations: +// - Event occurs that indicates that there are significant bytes/entries that +// can be truncated. Until the truncation is proposed (see below), these +// events can keep firing. The queue dedups the additions until the replica +// has been processed. Note that there is insufficient information at the +// time of addition to predict that the truncation will actually happen. +// Only the processing will finally decide whether truncation should happen, +// hence the deduping cannot happen outside the queue (say by changing the +// firing condition). If nothing is done when processing the replica, the +// continued firing of the events will cause the replica to again be added +// to the queue. In the current code, these events can only trigger after +// application to the state machine. +// +// - Periodic addition via the replicaScanner: this is helpful in two ways (a) +// the events in the previous bullet can under-fire if the size estimates +// are wrong, (b) if the replica becomes quiescent, those events can stop +// firing but truncation may not have been done due to other constraints +// (like lagging followers). The periodic addition (polling) takes care of +// ensuring that when those other constraints are removed, the truncation +// happens. +// +// The raftLogQueue proposes "replicated" truncation. This is done by the raft +// leader, which has knowledge of the followers and results in a +// TruncateLogRequest. This proposal will be raft replicated and serve as an +// upper bound to all replicas on what can be truncated. Each replica +// remembers in-memory what truncations have been proposed, so that truncation +// can be done independently at each replica when the corresponding +// RaftAppliedIndex is durable (see raftLogTruncator). Note that since raft +// state (including truncated state) is not part of the state machine, this +// loose coupling is fine. 
The loose coupling is enabled with cluster version +// LooselyCoupledRaftLogTruncation and cluster setting +// kv.raft_log.loosely_coupled_truncation.enabled. When not doing loose +// coupling (legacy), the proposal causes immediate truncation -- this is +// correct because other externally maintained invariants ensure that the +// state machine is durable (though we have some concerns in +// https://github.com/cockroachdb/cockroach/issues/38566). +// +// NB: Loosely coupled truncation loses the pending truncations that were +// queued in-memory when a node restarts. This is considered ok for now since +// either (a) the range will keep seeing new writes and eventually another +// truncation will be proposed, (b) if the range becomes quiescent we are +// willing to accept some amount of garbage. (b) can be addressed by +// unilaterally truncating at each follower if the range is quiescent. And +// since we check that the RaftAppliedIndex is durable, it is easy to truncate +// all the entries of the log in this quiescent case. + +// This is a temporary cluster setting that we will remove after one release +// cycle of everyone running with the default value of true. It only exists as +// a safety switch in case the new behavior causes unanticipated issues. +// Current plan: +// - v22.1: Has the setting. Expectation is that no one changes to false. +// - v22.2: The code behavior is hard-coded to true, in that the setting has +// no effect (we can also delete a bunch of legacy code). +// Mixed version clusters: +// - v21.2 and v22.1: Will behave as strongly coupled since the cluster +// version serves as an additional gate. +// - v22.1 and v22.2: If the setting has been changed to false the v22.1 nodes +// will do strongly coupled truncation and the v22.2 will do loosely +// coupled. This co-existence is correct. 
+var looselyCoupledTruncationEnabled = func() *settings.BoolSetting { + s := settings.RegisterBoolSetting( + settings.SystemOnly, + "kv.raft_log.loosely_coupled_truncation.enabled", + "set to true to loosely couple the raft log truncation", + true) + s.SetVisibility(settings.Reserved) + return s +}() + const ( // raftLogQueueTimerDuration is the duration between truncations. raftLogQueueTimerDuration = 0 // zero duration to process truncations greedily @@ -94,7 +183,9 @@ func newRaftLogQueue(store *Store, db *kv.DB) *raftLogQueue { // newTruncateDecision returns a truncateDecision for the given Replica if no // error occurs. If input data to establish a truncateDecision is missing, a -// zero decision is returned. +// zero decision is returned. When there are pending truncations queued below +// raft (see raftLogTruncator), this function pretends as if those truncations +// have already happened, and decides whether another truncation is merited. // // At a high level, a truncate decision operates based on the Raft log size, the // number of entries in the log, and the Raft status of the followers. In an @@ -142,17 +233,16 @@ func newRaftLogQueue(store *Store, db *kv.DB) *raftLogQueue { // // Ideally, a Raft log that grows large for whichever reason (for instance the // queue being stuck on another replica) wouldn't be more than a nuisance on -// nodes with sufficient disk space. Unfortunately, at the time of writing, the -// Raft log is included in Raft snapshots. On the other hand, IMPORT/RESTORE's -// split/scatter phase interacts poorly with overly aggressive truncations and -// can DDOS the Raft snapshot queue. +// nodes with sufficient disk space. Also, IMPORT/RESTORE's split/scatter +// phase interacts poorly with overly aggressive truncations and can DDOS the +// Raft snapshot queue. 
func newTruncateDecision(ctx context.Context, r *Replica) (truncateDecision, error) { rangeID := r.RangeID now := timeutil.Now() // NB: we need an exclusive lock due to grabbing the first index. r.mu.Lock() - raftLogSize := r.mu.raftLogSize + raftLogSize := r.pendingLogTruncations.computePostTruncLogSize(r.mu.raftLogSize) // A "cooperative" truncation (i.e. one that does not cut off followers from // the log) takes place whenever there are more than // RaftLogQueueStaleThreshold entries or the log's estimated size is above @@ -171,16 +261,24 @@ func newTruncateDecision(ctx context.Context, r *Replica) (truncateDecision, err } raftStatus := r.raftStatusRLocked() - firstIndex, err := r.raftFirstIndexLocked() const anyRecipientStore roachpb.StoreID = 0 pendingSnapshotIndex := r.getAndGCSnapshotLogTruncationConstraintsLocked(now, anyRecipientStore) lastIndex := r.mu.lastIndex + // NB: raftLogSize above adjusts for pending truncations that have already + // been successfully replicated via raft, but logSizeTrusted does not see if + // those pending truncations would cause a transition from trusted => + // !trusted. This is done since we don't want to trigger a recomputation of + // the raft log size while we still have pending truncations. Note that as + // soon as those pending truncations are enacted r.mu.raftLogSizeTrusted + // will become false and we will recompute the size -- so this cannot cause + // an indefinite delay in recomputation. logSizeTrusted := r.mu.raftLogSizeTrusted + firstIndex, err := r.raftFirstIndexLocked() r.mu.Unlock() - if err != nil { return truncateDecision{}, errors.Wrapf(err, "error retrieving first index for r%d", rangeID) } + firstIndex = r.pendingLogTruncations.computePostTruncFirstIndex(firstIndex) if raftStatus == nil { if log.V(6) { @@ -189,8 +287,8 @@ func newTruncateDecision(ctx context.Context, r *Replica) (truncateDecision, err return truncateDecision{}, nil } - // Is this the raft leader? 
We only perform log truncation on the raft leader - // which has the up to date info on followers. + // Is this the raft leader? We only propose log truncation on the raft + // leader which has the up to date info on followers. if raftStatus.RaftState != raft.StateLeader { return truncateDecision{}, nil } @@ -559,6 +657,7 @@ func (rlq *raftLogQueue) shouldQueueImpl( // processing the recomputation quickly, and not starving replicas which see // a significant amount of write traffic until they run over and truncate // more aggressively than they need to. + // NB: this happens even on followers. return true, true, 1.0 + float64(decision.Input.MaxLogSize)/2.0 } @@ -615,11 +714,16 @@ func (rlq *raftLogQueue) process( log.VEventf(ctx, 1, "%v", log.Safe(decision.String())) } b := &kv.Batch{} - b.AddRawRequest(&roachpb.TruncateLogRequest{ + truncRequest := &roachpb.TruncateLogRequest{ RequestHeader: roachpb.RequestHeader{Key: r.Desc().StartKey.AsRawKey()}, Index: decision.NewFirstIndex, RangeID: r.RangeID, - }) + } + if rlq.store.ClusterSettings().Version.IsActive( + ctx, clusterversion.LooselyCoupledRaftLogTruncation) { + truncRequest.ExpectedFirstIndex = decision.Input.FirstIndex + } + b.AddRawRequest(truncRequest) if err := rlq.db.Run(ctx, b); err != nil { return false, err } @@ -636,3 +740,14 @@ func (*raftLogQueue) timer(_ time.Duration) time.Duration { func (*raftLogQueue) purgatoryChan() <-chan time.Time { return nil } + +func isLooselyCoupledRaftLogTruncationEnabled( + ctx context.Context, settings *cluster.Settings, +) bool { + // TODO(sumeer): remove the false when hooking up the + // raftLogTruncator.durabilityAdvanced and fixing that method to do a + // durable read of RaftAppliedIndex. 
+ return settings.Version.IsActive( + ctx, clusterversion.LooselyCoupledRaftLogTruncation) && + looselyCoupledTruncationEnabled.Get(&settings.SV) && false +} diff --git a/pkg/kv/kvserver/raft_log_truncator.go b/pkg/kv/kvserver/raft_log_truncator.go new file mode 100644 index 000000000000..0c3189098d23 --- /dev/null +++ b/pkg/kv/kvserver/raft_log_truncator.go @@ -0,0 +1,546 @@ +// Copyright 2022 The Cockroach Authors. +// +// Use of this software is governed by the Business Source License +// included in the file licenses/BSL.txt. +// +// As of the Change Date specified in that file, in accordance with +// the Business Source License, use of this software will be governed +// by the Apache License, Version 2.0, included in the file +// licenses/APL.txt. + +package kvserver + +import ( + "context" + "sort" + + "github.com/cockroachdb/cockroach/pkg/kv/kvserver/stateloader" + "github.com/cockroachdb/cockroach/pkg/roachpb" + "github.com/cockroachdb/cockroach/pkg/storage" + "github.com/cockroachdb/cockroach/pkg/util" + "github.com/cockroachdb/cockroach/pkg/util/buildutil" + "github.com/cockroachdb/cockroach/pkg/util/log" + "github.com/cockroachdb/cockroach/pkg/util/syncutil" + "github.com/cockroachdb/errors" +) + +// pendingLogTruncations tracks proposed truncations for a replica that have +// not yet been enacted due to the corresponding RaftAppliedIndex not yet +// being durable. It is a field in the Replica struct +// (Replica.pendingLogTruncations), but it is declared in this file since +// it is really part of the raftLogTruncator state that is per-replica. +// +// Note that we should not hold pendingLogTruncations.mu for long since it +// could block the raftLogQueue which needs to (only) read the pending +// truncations. This becomes tricky for the raftLogTruncator, which needs to +// do substantial work while reading these pending truncations, and then +// upgrade to mutating these truncations. It cannot allow a mutator to be +// running concurrently. 
We could add a second mutex to pendingLogTruncations +// to achieve this mutation mutual exclusion. However, we instead rely on the +// fact that all these mutation cases already hold Replica.raftMu (to prevent +// the Replica from being destroyed while the truncation work is happening). +// +// The summary is that we require Replica.raftMu to be additionally held while +// modifying the pending truncations. Hence, either one of those mutexes is +// sufficient for reading. This behavior is abstracted by the definition of +// replicaForTruncator below. +type pendingLogTruncations struct { + mu struct { + // From a lock ordering perspective, this mutex is the lowest, i.e., it + // should not be held when trying to acquire any other mutex. + syncutil.Mutex + // We track up to two truncations: the oldest pending truncation, and a + // merge of all the subsequent pending truncations. We cannot track only + // one merged truncation since its index may always be ahead of the + // durable RaftAppliedIndex, and so we may never be able to truncate. We + // assume liveness of durability advancement, which means that if no new + // pending truncations are added, the latest one will eventually be + // enacted. + // + // Note that this liveness assumption is not completely true -- if there are + // no writes happening to the store, the durability (due to memtable + // flushes) may not advance. We deem this (a) an uninteresting case, since + // if there are no writes we possibly don't care about aggressively + // truncating the log, (b) fixing the liveness assumption is not within + // scope of the truncator (it has to work with what it is given). + // + // Invariants: + // - Queue slot i is empty iff truncs[i] == pendingTruncation{} + // - Slot 0 represents the first position in the queue. Therefore, it is + // not possible for slot 0 to be empty and slot 1 to be non-empty. + // An implication is that the queue is empty iff slot 0 is empty. 
+ // - If slot 0 and 1 are both non-empty, truncs[0].Index < truncs[1].Index + truncs [2]pendingTruncation + } +} + +// computePostTruncLogSize computes the size of the raft log under the +// pretense that the pending truncations have been enacted. +func (p *pendingLogTruncations) computePostTruncLogSize(raftLogSize int64) int64 { + p.mu.Lock() + defer p.mu.Unlock() + p.iterateLocked(func(_ int, trunc pendingTruncation) { + raftLogSize += trunc.logDeltaBytes + }) + if raftLogSize < 0 { + raftLogSize = 0 + } + return raftLogSize +} + +// computePostTruncFirstIndex computes the first log index that is not +// truncated, under the pretense that the pending truncations have been +// enacted. +func (p *pendingLogTruncations) computePostTruncFirstIndex(firstIndex uint64) uint64 { + p.mu.Lock() + defer p.mu.Unlock() + p.iterateLocked(func(_ int, trunc pendingTruncation) { + firstIndexAfterTrunc := trunc.firstIndexAfterTrunc() + if firstIndex < firstIndexAfterTrunc { + firstIndex = firstIndexAfterTrunc + } + }) + return firstIndex +} + +func (p *pendingLogTruncations) isEmptyLocked() bool { + return p.mu.truncs[0] == (pendingTruncation{}) +} + +// Returns the front (i.e. the least aggressive truncation) of the pending +// truncations queue, without removing the element. +// REQUIRES: !isEmptyLocked() +func (p *pendingLogTruncations) frontLocked() pendingTruncation { + return p.mu.truncs[0] +} + +// Pops the front (i.e. the least aggressive truncation) of the pending +// truncations queue. +// REQUIRES: !isEmptyLocked() +func (p *pendingLogTruncations) popLocked() { + p.mu.truncs[0] = p.mu.truncs[1] + p.mu.truncs[1] = pendingTruncation{} +} + +// Iterates over the queued truncations in the queue order, i.e., the oldest +// first. +func (p *pendingLogTruncations) iterateLocked(f func(index int, trunc pendingTruncation)) { + for i, trunc := range p.mu.truncs { + if !(trunc == (pendingTruncation{})) { + f(i, trunc) + } + } +} + +// Empties the queue of pending truncations. 
+func (p *pendingLogTruncations) reset() { + p.mu.Lock() + defer p.mu.Unlock() + for !p.isEmptyLocked() { + p.popLocked() + } +} + +func (p *pendingLogTruncations) capacity() int { + // Reminder: truncs is a fixed size array. + return len(p.mu.truncs) +} + +type pendingTruncation struct { + // The pending truncation will truncate entries up to + // RaftTruncatedState.Index, inclusive. + roachpb.RaftTruncatedState + + // The logDeltaBytes are computed under the assumption that the + // truncation is deleting [expectedFirstIndex,RaftTruncatedState.Index]. It + // originates in ReplicatedEvalResult, where it is accurate. + // There are two reasons isDeltaTrusted could be considered false here: + // - The original "accurate" delta does not account for sideloaded files. It + // is adjusted on this replica using + // SideloadStorage.BytesIfTruncatedFromTo, but it is possible that the + // truncated state of this replica is already > expectedFirstIndex. We + // don't actually set isDeltaTrusted=false for this case since we will + // change Replica.raftLogSizeTrusted to false after enacting this + // truncation. + // - We merge pendingTruncation entries in the pendingLogTruncations struct. We + // are making an effort to have consecutive TruncateLogRequests provide us + // stats for index intervals that are adjacent and non-overlapping, but + // that behavior is best-effort. + expectedFirstIndex uint64 + // logDeltaBytes includes the bytes from sideloaded files. Like + // ReplicatedEvalResult.RaftLogDelta, this is <= 0. + logDeltaBytes int64 + isDeltaTrusted bool +} + +func (pt *pendingTruncation) firstIndexAfterTrunc() uint64 { + // Reminder: RaftTruncatedState.Index is inclusive. + return pt.Index + 1 +} + +// raftLogTruncator is responsible for actually enacting truncations. +// +// Mutex ordering: The Replica mutexes can be held when acquiring +// raftLogTruncator.mu, but the reverse is not permitted. 
+ +type raftLogTruncator struct { + store storeForTruncator + mu struct { + syncutil.Mutex + // Ranges are queued into addRanges and batch dequeued by swapping with + // drainRanges. This avoids holding mu for any work proportional to the + // number of queued ranges. + addRanges, drainRanges map[roachpb.RangeID]struct{} + } +} + +func makeRaftLogTruncator(store storeForTruncator) *raftLogTruncator { + t := &raftLogTruncator{ + store: store, + } + t.mu.addRanges = make(map[roachpb.RangeID]struct{}) + t.mu.drainRanges = make(map[roachpb.RangeID]struct{}) + return t +} + +// storeForTruncator abstracts the interface of Store needed by the truncator. +type storeForTruncator interface { + // acquireReplicaForTruncator ensures that the returned replicaForTruncator + // is not already destroyed. It may return nil. Any mutex protecting + // raft-state (e.g. Replica.raftMu) is acquired before returning. This + // method also ensures that the returned replica will not be destroyed until + // after releaseReplicaForTruncator is called. + acquireReplicaForTruncator(rangeID roachpb.RangeID) replicaForTruncator + // releaseReplicaForTruncator releases the replica. + releaseReplicaForTruncator(r replicaForTruncator) + // Engine accessor. + getEngine() storage.Engine +} + +// replicaForTruncator abstracts the interface of Replica needed by the +// truncator. +// +// A replica has raft state, including the queue of pending truncations, that +// the truncator is modifying. There is a "raft-state" mutex to mutually +// exclude other actions that are concurrently mutating this state. We assume +// that this "raft-state" mutex is held for the lifetime of +// replicaForTruncator. Hence there are no additional concurrency control +// requirements on the methods that read or write raft-state (this includes +// allowing pendingLogTruncations to be read without holding +// pendingLogTruncations.mu). 
+// +// We acknowledge that this interface may seem peculiar -- this is due to the +// constraint that it is abstracting Replica. +type replicaForTruncator interface { + // Returns the Range ID. + getRangeID() roachpb.RangeID + // Returns the current truncated state. + getTruncatedState() roachpb.RaftTruncatedState + // Updates the replica state after the truncation is enacted. + setTruncatedStateAndSideEffects( + _ context.Context, _ *roachpb.RaftTruncatedState, expectedFirstIndexPreTruncation uint64, + ) (expectedFirstIndexWasAccurate bool) + // Updates the stats related to the raft log size after the truncation is + // enacted. + setTruncationDeltaAndTrusted(deltaBytes int64, isDeltaTrusted bool) + // Returns the pending truncations queue. The caller is allowed to mutate + // the return value by additionally acquiring pendingLogTruncations.mu. + getPendingTruncs() *pendingLogTruncations + // Returns the sideloaded bytes that would be freed if we were to truncate + // [from, to). + sideloadedBytesIfTruncatedFromTo( + _ context.Context, from, to uint64) (freed int64, _ error) + getStateLoader() stateloader.StateLoader + // NB: Setting the persistent raft state is via the Engine exposed by + // storeForTruncator. +} + +// raftExpectedFirstIndex and raftLogDelta have the same meaning as in +// ReplicatedEvalResult. Never called before cluster is at +// LooselyCoupledRaftLogTruncation. +func (t *raftLogTruncator) addPendingTruncation( + ctx context.Context, + r replicaForTruncator, + trunc roachpb.RaftTruncatedState, + raftExpectedFirstIndex uint64, + raftLogDelta int64, +) { + pendingTrunc := pendingTruncation{ + RaftTruncatedState: trunc, + expectedFirstIndex: raftExpectedFirstIndex, + logDeltaBytes: raftLogDelta, + isDeltaTrusted: true, + } + pendingTruncs := r.getPendingTruncs() + // Need to figure out whether to add this new pendingTrunc to the + // truncations that are already queued, and if yes, where to add. 
+ // i is the index of the last already queued truncation. + i := -1 + // alreadyTruncIndex represents what has been already truncated. + alreadyTruncIndex := r.getTruncatedState().Index + // No need to acquire pendingTruncs.mu for read in this case (see + // replicaForTruncator comment). + pendingTruncs.iterateLocked(func(index int, trunc pendingTruncation) { + i = index + if trunc.Index > alreadyTruncIndex { + alreadyTruncIndex = trunc.Index + } + }) + if alreadyTruncIndex >= pendingTrunc.Index { + // Noop. + return + } + // This new pending truncation will advance what is truncated. + // pos is where we will add the new pending truncation. + pos := i + 1 + mergeWithPending := false + if pos == pendingTruncs.capacity() { + // We need to merge with an existing pending truncation. + pos-- + mergeWithPending = true + } + // It is possible that alreadyTruncIndex+1 != raftExpectedFirstIndex. When + // we merge or enact we will see this problem and set the trusted bit to + // false. But we can at least correctly count sideloaded entries, which can + // be large, since we do the computation for the sideloaded entries size + // here. When alreadyTruncIndex+1 > raftExpectedFirstIndex, this will avoid + // double counting sideloaded entries that will be freed, and when + // alreadyTruncIndex+1 < raftExpectedFirstIndex, this will ensure that we + // don't miss sideloaded entries that will be freed. + // + // In the common case of alreadyTruncIndex+1 == raftExpectedFirstIndex, the + // computation returns the same result regardless of which is plugged in as + // the lower bound. + sideloadedFreed, err := r.sideloadedBytesIfTruncatedFromTo( + ctx, alreadyTruncIndex+1, pendingTrunc.firstIndexAfterTrunc()) + if err != nil { + // Log a loud error since we need to continue enqueuing the truncation. 
+ log.Errorf(ctx, "while computing size of sideloaded files to truncate: %+v", err) + pendingTrunc.isDeltaTrusted = false + } + pendingTrunc.logDeltaBytes -= sideloadedFreed + if mergeWithPending { + // Merge the existing entry into the new one. + // No need to acquire pendingTruncs.mu for read in this case. + pendingTrunc.isDeltaTrusted = pendingTrunc.isDeltaTrusted && + pendingTruncs.mu.truncs[pos].isDeltaTrusted + if pendingTruncs.mu.truncs[pos].firstIndexAfterTrunc() != pendingTrunc.expectedFirstIndex { + pendingTrunc.isDeltaTrusted = false + } + pendingTrunc.logDeltaBytes += pendingTruncs.mu.truncs[pos].logDeltaBytes + pendingTrunc.expectedFirstIndex = pendingTruncs.mu.truncs[pos].expectedFirstIndex + } + pendingTruncs.mu.Lock() + // Install the new pending truncation. + pendingTruncs.mu.truncs[pos] = pendingTrunc + pendingTruncs.mu.Unlock() + + if pos == 0 { + if mergeWithPending { + panic("should never be merging pending truncations at pos 0") + } + // First entry in queue of pending truncations for this replica, so add + // the RangeID to the map. + t.enqueueRange(r.getRangeID()) + } +} + +type rangesByRangeID []roachpb.RangeID + +func (r rangesByRangeID) Len() int { + return len(r) +} +func (r rangesByRangeID) Less(i, j int) bool { + return r[i] < r[j] +} +func (r rangesByRangeID) Swap(i, j int) { + r[i], r[j] = r[j], r[i] +} + +// Invoked whenever the durability of the store advances. We assume that this +// is coarse in that the advancement of durability will apply to all ranges in +// this store, and most of the preceding pending truncations have their goal +// truncated index become durable in RangeAppliedState.RaftAppliedIndex. This +// coarseness assumption is important for not wasting much work being done in +// this method. +// TODO(sumeer): hook this up to the callback that will be invoked on the +// Store by the Engine (Pebble). Put this work on a separate goroutine of +// which there will be at most one running at a time. 
+func (t *raftLogTruncator) durabilityAdvanced(ctx context.Context) { + t.mu.Lock() + t.mu.addRanges, t.mu.drainRanges = t.mu.drainRanges, t.mu.addRanges + // If another pendingTruncation is added to this Replica, it will not be + // added to the addRanges map since the Replica already has pending + // truncations. That is ok: we will try to enact all pending truncations for + // that Replica below, since there typically will only be one pending, and + // if there are any remaining we will add it back to the addRanges map. + // + // We can modify drainRanges after releasing t.mu since we are guaranteed + // that there is at most one durabilityAdvanced running at a time. + drainRanges := t.mu.drainRanges + t.mu.Unlock() + if len(drainRanges) == 0 { + return + } + ranges := make([]roachpb.RangeID, 0, len(drainRanges)) + for k := range drainRanges { + ranges = append(ranges, k) + delete(drainRanges, k) + } + // Sort it for deterministic testing output. + sort.Sort(rangesByRangeID(ranges)) + // Create an engine Reader to provide a safe lower bound on what is durable. + // + // TODO(sumeer): This is incorrect -- change this reader to only read + // durable state after merging + // https://github.com/cockroachdb/pebble/pull/1490 and incorporating into + // CockroachDB. + reader := t.store.getEngine().NewReadOnly() + defer reader.Close() + for _, rangeID := range ranges { + t.tryEnactTruncations(ctx, rangeID, reader) + } +} + +// TODO(tbg): Instead of directly calling tryEnactTruncations from the +// raftLogTruncator, we would like to use the Store.processReady path to +// centralize error handling and timing of all raft related processing. 
We +// will continue to enact truncations across all replicas using a single +// goroutine per store, and not use the raftScheduler workers -- this is +// because a durabilityAdvanced callback triggers truncations for all queued +// replicas and we don't want to use up all the workers for truncation +// activity at the same time and starve foreground activity. We considered +// localizing all changes in handleRaftReadyRaftMuLocked (not touching the +// plumbing in processReady etc. that leads up to it), by marking the Replica +// that it should try doing truncation, and calling processReady from the +// truncator's goroutine. However, we are concerned that by marking the +// Replica we allow for a race between the truncator's goroutine and the +// raftScheduler worker that can cause the latter to pick up the truncation +// work. This race is not a correctness problem, but can cause needless +// surprise. The current plan is to do some refactoring of processReady so that +// we can have a second entry point (processTruncation) that also goes through +// most of the code that lives in processReady today (and passes a truncation +// through to handleRaftReadyRaftMuLocked). The code in +// handleRaftReadyRaftMuLocked would call something akin to +// tryEnactTruncations. Note that tryEnactTruncations needs a storage.Reader +// for reading only durable state -- currently we share it across replicas +// since it is easy to do so. But in the future code we can construct such a +// Reader in tryEnactTruncations. + +func (t *raftLogTruncator) tryEnactTruncations( + ctx context.Context, rangeID roachpb.RangeID, reader storage.Reader, +) { + r := t.store.acquireReplicaForTruncator(rangeID) + if r == nil { + // Not found. + return + } + defer t.store.releaseReplicaForTruncator(r) + truncState := r.getTruncatedState() + pendingTruncs := r.getPendingTruncs() + // Remove the noop pending truncations.
+ pendingTruncs.mu.Lock() + for !pendingTruncs.isEmptyLocked() { + pendingTrunc := pendingTruncs.frontLocked() + if pendingTrunc.Index <= truncState.Index { + // The pending truncation is a noop. Even though we avoid queueing + // noop truncations, this is possible because a snapshot could have + // been applied to the replica after enqueueing the truncations. + pendingTruncs.popLocked() + } else { + break + } + } + // NB: Unlocking but can keep reading pendingTruncs due to + // replicaForTruncator contract. + pendingTruncs.mu.Unlock() + if pendingTruncs.isEmptyLocked() { + // Nothing to do for this replica. + return + } + // Have some useful pending truncations. + + // Use the reader to decide what is durable. + stateLoader := r.getStateLoader() + as, err := stateLoader.LoadRangeAppliedState(ctx, reader) + if err != nil { + log.Errorf(ctx, "error loading RangeAppliedState, dropping all pending log truncations: %s", + err) + pendingTruncs.reset() + return + } + // enactIndex represents the index of the latest queued truncation that + // can be enacted. We start with -1 since it is possible that nothing can + // be enacted. + enactIndex := -1 + pendingTruncs.iterateLocked(func(index int, trunc pendingTruncation) { + if trunc.Index > as.RaftAppliedIndex { + return + } + enactIndex = index + }) + if enactIndex < 0 { + // Enqueue the rangeID for the future. + t.enqueueRange(rangeID) + return + } + // Do the truncation of persistent raft entries, specified by enactIndex + // (this subsumes all the preceding queued truncations). 
+ batch := t.store.getEngine().NewUnindexedBatch(false /* writeOnly */) + defer batch.Close() + apply, err := handleTruncatedStateBelowRaftPreApply(ctx, &truncState, + &pendingTruncs.mu.truncs[enactIndex].RaftTruncatedState, stateLoader, batch) + if err != nil || !apply { + if err != nil { + log.Errorf(ctx, "while attempting to truncate raft log: %+v", err) + } else { + err := errors.AssertionFailedf( + "unexpected !apply from handleTruncatedStateBelowRaftPreApply") + if buildutil.CrdbTestBuild || util.RaceEnabled { + log.Fatalf(ctx, "%s", err) + } else { + log.Errorf(ctx, "%s", err) + } + } + pendingTruncs.reset() + return + } + // sync=false since we don't need a guarantee that the truncation is + // durable. Loss of a truncation means we have more of the suffix of the + // raft log, which does not affect correctness. + if err := batch.Commit(false /* sync */); err != nil { + log.Errorf(ctx, "while committing batch to truncate raft log: %+v", err) + pendingTruncs.reset() + return + } + // Truncation done. Need to update the Replica state. This requires iterating + // over all the enacted entries. + pendingTruncs.iterateLocked(func(index int, trunc pendingTruncation) { + if index > enactIndex { + return + } + isDeltaTrusted := true + expectedFirstIndexWasAccurate := r.setTruncatedStateAndSideEffects( + ctx, &trunc.RaftTruncatedState, trunc.expectedFirstIndex) + if !expectedFirstIndexWasAccurate || !trunc.isDeltaTrusted { + isDeltaTrusted = false + } + r.setTruncationDeltaAndTrusted(trunc.logDeltaBytes, isDeltaTrusted) + }) + // Now remove the enacted truncations. It is the same iteration as the + // previous one, but we do it while holding pendingTruncs.mu. 
+ pendingTruncs.mu.Lock() + for i := 0; i <= enactIndex; i++ { + pendingTruncs.popLocked() + } + pendingTruncs.mu.Unlock() + if !pendingTruncs.isEmptyLocked() { + t.enqueueRange(rangeID) + } +} + +func (t *raftLogTruncator) enqueueRange(rangeID roachpb.RangeID) { + t.mu.Lock() + t.mu.addRanges[rangeID] = struct{}{} + t.mu.Unlock() +} diff --git a/pkg/kv/kvserver/raft_log_truncator_test.go b/pkg/kv/kvserver/raft_log_truncator_test.go new file mode 100644 index 000000000000..126da01307bf --- /dev/null +++ b/pkg/kv/kvserver/raft_log_truncator_test.go @@ -0,0 +1,381 @@ +// Copyright 2022 The Cockroach Authors. +// +// Use of this software is governed by the Business Source License +// included in the file licenses/BSL.txt. +// +// As of the Change Date specified in that file, in accordance with +// the Business Source License, use of this software will be governed +// by the Apache License, Version 2.0, included in the file +// licenses/APL.txt. + +package kvserver + +import ( + "context" + "fmt" + "math" + "sort" + "strings" + "testing" + + "github.com/cockroachdb/cockroach/pkg/kv/kvserver/stateloader" + "github.com/cockroachdb/cockroach/pkg/roachpb" + "github.com/cockroachdb/cockroach/pkg/storage" + "github.com/cockroachdb/cockroach/pkg/storage/enginepb" + "github.com/cockroachdb/cockroach/pkg/testutils" + "github.com/cockroachdb/cockroach/pkg/util/encoding" + "github.com/cockroachdb/cockroach/pkg/util/leaktest" + "github.com/cockroachdb/cockroach/pkg/util/log" + "github.com/cockroachdb/datadriven" + "github.com/cockroachdb/errors" + "github.com/stretchr/testify/require" +) + +func TestPendingLogTruncations(t *testing.T) { + defer leaktest.AfterTest(t)() + defer log.Scope(t).Close(t) + + // Single threaded test. So nothing explicitly acquires truncs.mu. + truncs := pendingLogTruncations{} + // No pending truncation. 
+ truncs.iterateLocked(func(index int, trunc pendingTruncation) { + require.Fail(t, "unexpected element") + }) + require.Equal(t, 2, truncs.capacity()) + require.True(t, truncs.isEmptyLocked()) + require.EqualValues(t, 55, truncs.computePostTruncLogSize(55)) + require.EqualValues(t, 5, truncs.computePostTruncFirstIndex(5)) + + // One pending truncation. + truncs.mu.truncs[0].logDeltaBytes = -50 + truncs.mu.truncs[0].Index = 20 + truncs.iterateLocked(func(index int, trunc pendingTruncation) { + require.Equal(t, 0, index) + require.Equal(t, truncs.mu.truncs[0], trunc) + }) + require.False(t, truncs.isEmptyLocked()) + require.Equal(t, truncs.mu.truncs[0], truncs.frontLocked()) + // Added -50. + require.EqualValues(t, 5, truncs.computePostTruncLogSize(55)) + // Added -50 and bumped up to 0. + require.EqualValues(t, 0, truncs.computePostTruncLogSize(45)) + // Advances to Index+1. + require.EqualValues(t, 21, truncs.computePostTruncFirstIndex(5)) + require.EqualValues(t, 21, truncs.computePostTruncFirstIndex(20)) + // Does not advance. + require.EqualValues(t, 21, truncs.computePostTruncFirstIndex(21)) + require.EqualValues(t, 30, truncs.computePostTruncFirstIndex(30)) + + // Two pending truncations. + truncs.mu.truncs[1].logDeltaBytes = -70 + truncs.mu.truncs[1].Index = 30 + indexes := []int(nil) + truncs.iterateLocked(func(index int, trunc pendingTruncation) { + require.Greater(t, truncs.capacity(), index) + require.Equal(t, truncs.mu.truncs[index], trunc) + indexes = append(indexes, index) + }) + require.Equal(t, []int{0, 1}, indexes) + require.False(t, truncs.isEmptyLocked()) + require.Equal(t, truncs.mu.truncs[0], truncs.frontLocked()) + // Added -120. + require.EqualValues(t, 5, truncs.computePostTruncLogSize(125)) + // Added -120 and bumped up to 0. + require.EqualValues(t, 0, truncs.computePostTruncLogSize(115)) + // Advances to Index+1 of second entry. 
+ require.EqualValues(t, 31, truncs.computePostTruncFirstIndex(5)) + require.EqualValues(t, 31, truncs.computePostTruncFirstIndex(30)) + // Does not advance. + require.EqualValues(t, 31, truncs.computePostTruncFirstIndex(31)) + require.EqualValues(t, 40, truncs.computePostTruncFirstIndex(40)) + + // Pop first. + last := truncs.mu.truncs[1] + truncs.popLocked() + truncs.iterateLocked(func(index int, trunc pendingTruncation) { + require.Equal(t, 0, index) + require.Equal(t, last, trunc) + }) + require.False(t, truncs.isEmptyLocked()) + require.Equal(t, last, truncs.frontLocked()) + // Pop last. + truncs.popLocked() + require.True(t, truncs.isEmptyLocked()) + truncs.iterateLocked(func(index int, trunc pendingTruncation) { + require.Fail(t, "unexpected element") + }) +} + +type replicaTruncatorTest struct { + rangeID roachpb.RangeID + buf *strings.Builder + stateLoader stateloader.StateLoader + truncState roachpb.RaftTruncatedState + pendingTruncs pendingLogTruncations + sideloadedFreed int64 + sideloadedErr error +} + +var _ replicaForTruncator = &replicaTruncatorTest{} + +func makeReplicaTT(rangeID roachpb.RangeID, buf *strings.Builder) *replicaTruncatorTest { + return &replicaTruncatorTest{ + rangeID: rangeID, + buf: buf, + stateLoader: stateloader.Make(rangeID), + } +} + +func (r *replicaTruncatorTest) getRangeID() roachpb.RangeID { + return r.rangeID +} + +func (r *replicaTruncatorTest) getTruncatedState() roachpb.RaftTruncatedState { + fmt.Fprintf(r.buf, "r%d.getTruncatedState\n", r.rangeID) + return r.truncState +} + +func (r *replicaTruncatorTest) getPendingTruncs() *pendingLogTruncations { + fmt.Fprintf(r.buf, "r%d.getPendingTruncs\n", r.rangeID) + return &r.pendingTruncs +} + +func (r *replicaTruncatorTest) setTruncationDeltaAndTrusted(deltaBytes int64, isDeltaTrusted bool) { + fmt.Fprintf(r.buf, "r%d.setTruncationDeltaAndTrusted(delta:%d, trusted:%t)\n", + r.rangeID, deltaBytes, isDeltaTrusted) +} + +func (r *replicaTruncatorTest) 
sideloadedBytesIfTruncatedFromTo( + _ context.Context, from, to uint64, +) (freed int64, _ error) { + fmt.Fprintf(r.buf, "r%d.sideloadedBytesIfTruncatedFromTo(%d, %d)\n", r.rangeID, from, to) + return r.sideloadedFreed, r.sideloadedErr +} + +func (r *replicaTruncatorTest) getStateLoader() stateloader.StateLoader { + fmt.Fprintf(r.buf, "r%d.getStateLoader\n", r.rangeID) + return r.stateLoader +} + +func (r *replicaTruncatorTest) setTruncatedStateAndSideEffects( + _ context.Context, truncState *roachpb.RaftTruncatedState, expectedFirstIndexPreTruncation uint64, +) (expectedFirstIndexWasAccurate bool) { + expectedFirstIndexWasAccurate = r.truncState.Index+1 == expectedFirstIndexPreTruncation + r.truncState = *truncState + fmt.Fprintf(r.buf, + "r%d.setTruncatedStateAndSideEffects(..., expectedFirstIndex:%d) => trusted:%t\n", + r.rangeID, expectedFirstIndexPreTruncation, expectedFirstIndexWasAccurate) + return expectedFirstIndexWasAccurate +} + +func (r *replicaTruncatorTest) writeRaftStateToEngine( + t *testing.T, eng storage.Engine, truncIndex uint64, lastLogEntry uint64, +) { + require.NoError(t, r.stateLoader.SetRaftTruncatedState(context.Background(), eng, + &roachpb.RaftTruncatedState{Index: truncIndex})) + for i := truncIndex + 1; i < lastLogEntry; i++ { + require.NoError(t, eng.PutUnversioned(r.stateLoader.RaftLogKey(i), []byte("something"))) + } +} + +func (r *replicaTruncatorTest) writeRaftAppliedIndex( + t *testing.T, eng storage.Engine, raftAppliedIndex uint64, +) { + require.NoError(t, r.stateLoader.SetRangeAppliedState(context.Background(), eng, + raftAppliedIndex, 0, 0, &enginepb.MVCCStats{}, nil)) + // Flush to make it satisfy the contract of OnlyReadGuaranteedDurable in + // Pebble. + // TODO(sumeer): by controlling the size of the memtable we can probably + // construct a deterministic test where a flush does not happen, and we can + // test that raftLogTruncator is actually reading only the durable + // RaftAppliedIndex. 
+ require.NoError(t, eng.Flush()) +} + +func (r *replicaTruncatorTest) printEngine(t *testing.T, eng storage.Engine) { + truncState, err := r.stateLoader.LoadRaftTruncatedState(context.Background(), eng) + require.NoError(t, err) + fmt.Fprintf(r.buf, "truncated index: %d\n", truncState.Index) + prefix := r.stateLoader.RaftLogPrefix() + iter := eng.NewMVCCIterator(storage.MVCCKeyIterKind, storage.IterOptions{ + UpperBound: r.stateLoader.RaftLogKey(math.MaxUint64), + }) + defer iter.Close() + iter.SeekGE(storage.MVCCKey{Key: r.stateLoader.RaftLogKey(0)}) + valid, err := iter.Valid() + require.NoError(t, err) + fmt.Fprintf(r.buf, "log entries:") + printPrefixStr := "" + for valid { + key := iter.Key() + _, index, err := encoding.DecodeUint64Ascending(key.Key[len(prefix):]) + require.NoError(t, err) + fmt.Fprintf(r.buf, "%s %d", printPrefixStr, index) + printPrefixStr = "," + iter.Next() + valid, err = iter.Valid() + require.NoError(t, err) + } + fmt.Fprintf(r.buf, "\n") + // It is ok to pretend that a regular read is equivalent to + // OnlyReadGuaranteedDurable for printing in this test, since we flush in + // the code above whenever writing RaftAppliedIndex. 
+ as, err := r.stateLoader.LoadRangeAppliedState(context.Background(), eng) + require.NoError(t, err) + fmt.Fprintf(r.buf, "durable applied index: %d\n", as.RaftAppliedIndex) +} + +func (r *replicaTruncatorTest) printReplicaState() { + r.pendingTruncs.mu.Lock() + defer r.pendingTruncs.mu.Unlock() + fmt.Fprintf(r.buf, "truncIndex: %d\npending:\n", r.truncState.Index) + r.pendingTruncs.iterateLocked(func(index int, trunc pendingTruncation) { + fmt.Fprintf(r.buf, " %+v\n", trunc) + }) +} + +type storeTruncatorTest struct { + eng storage.Engine + buf *strings.Builder + replicas map[roachpb.RangeID]*replicaTruncatorTest +} + +var _ storeForTruncator = &storeTruncatorTest{} + +func makeStoreTT(eng storage.Engine, buf *strings.Builder) *storeTruncatorTest { + return &storeTruncatorTest{ + eng: eng, + buf: buf, + replicas: make(map[roachpb.RangeID]*replicaTruncatorTest), + } +} + +func (s *storeTruncatorTest) getEngine() storage.Engine { + return s.eng +} + +func (s *storeTruncatorTest) acquireReplicaForTruncator( + rangeID roachpb.RangeID, +) replicaForTruncator { + fmt.Fprintf(s.buf, "acquireReplica(%d)\n", rangeID) + rv := s.replicas[rangeID] + if rv == nil { + // Return nil and not an interface holding nil. 
+ return nil + } + return rv +} + +func (s *storeTruncatorTest) releaseReplicaForTruncator(r replicaForTruncator) { + fmt.Fprintf(s.buf, "releaseReplica(%d)\n", r.(*replicaTruncatorTest).rangeID) +} + +func TestRaftLogTruncator(t *testing.T) { + defer leaktest.AfterTest(t)() + defer log.Scope(t).Close(t) + + var buf strings.Builder + flushAndReset := func() string { + str := buf.String() + buf.Reset() + return str + } + eng := storage.NewDefaultInMemForTesting() + defer eng.Close() + store := makeStoreTT(eng, &buf) + truncator := makeRaftLogTruncator(store) + + datadriven.RunTest(t, testutils.TestDataPath(t, "raft_log_truncator"), + func(t *testing.T, d *datadriven.TestData) string { + switch d.Cmd { + case "create-replica": + rangeID := scanRangeID(t, d) + var truncIndex uint64 + d.ScanArgs(t, "trunc-index", &truncIndex) + var lastLogEntry uint64 + d.ScanArgs(t, "last-log-entry", &lastLogEntry) + r := makeReplicaTT(rangeID, &buf) + r.truncState.Index = truncIndex + r.writeRaftStateToEngine(t, eng, truncIndex, lastLogEntry) + store.replicas[rangeID] = r + return flushAndReset() + + case "print-engine-state": + store.replicas[scanRangeID(t, d)].printEngine(t, eng) + return flushAndReset() + + case "add-pending-truncation": + rangeID := scanRangeID(t, d) + var firstIndex, truncIndex uint64 + d.ScanArgs(t, "first-index", &firstIndex) + d.ScanArgs(t, "trunc-index", &truncIndex) + var deltaBytes, sideloadedBytes int + d.ScanArgs(t, "delta-bytes", &deltaBytes) + d.ScanArgs(t, "sideloaded-bytes", &sideloadedBytes) + r := store.replicas[rangeID] + if d.HasArg("sideloaded-err") { + var sideloadedErr bool + d.ScanArgs(t, "sideloaded-err", &sideloadedErr) + if sideloadedErr { + r.sideloadedErr = errors.Errorf("side-loaded err") + } + } + r.sideloadedFreed = int64(sideloadedBytes) + truncator.addPendingTruncation(context.Background(), r, + roachpb.RaftTruncatedState{Index: truncIndex}, firstIndex, int64(deltaBytes)) + printTruncatorState(t, &buf, truncator) + r.sideloadedErr = 
nil + return flushAndReset() + + case "print-replica-state": + store.replicas[scanRangeID(t, d)].printReplicaState() + return flushAndReset() + + case "write-raft-applied-index": + rangeID := scanRangeID(t, d) + var raftAppliedIndex uint64 + d.ScanArgs(t, "raft-applied-index", &raftAppliedIndex) + store.replicas[rangeID].writeRaftAppliedIndex(t, eng, raftAppliedIndex) + return flushAndReset() + + case "add-replica-to-truncator": + // In addition to replicas being added to the truncator via + // add-pending-truncation, we can manually add them to test the + // replica not found etc. paths. + truncator.enqueueRange(scanRangeID(t, d)) + printTruncatorState(t, &buf, truncator) + return flushAndReset() + + case "durability-advanced": + truncator.durabilityAdvanced(context.Background()) + printTruncatorState(t, &buf, truncator) + return flushAndReset() + + default: + return fmt.Sprintf("unknown command: %s", d.Cmd) + } + }) +} + +func scanRangeID(t *testing.T, d *datadriven.TestData) roachpb.RangeID { + var id int + d.ScanArgs(t, "id", &id) + return roachpb.RangeID(id) +} + +func printTruncatorState(t *testing.T, buf *strings.Builder, truncator *raftLogTruncator) { + truncator.mu.Lock() + defer truncator.mu.Unlock() + require.Zero(t, len(truncator.mu.drainRanges)) + ranges := make([]roachpb.RangeID, 0, len(truncator.mu.addRanges)) + for id := range truncator.mu.addRanges { + ranges = append(ranges, id) + } + sort.Slice(ranges, func(i, j int) bool { return ranges[i] < ranges[j] }) + fmt.Fprintf(buf, "truncator ranges:") + prefixStr := " " + for _, id := range ranges { + fmt.Fprintf(buf, "%s%d", prefixStr, id) + prefixStr = ", " + } +} diff --git a/pkg/kv/kvserver/raft_truncator_replica.go b/pkg/kv/kvserver/raft_truncator_replica.go new file mode 100644 index 000000000000..2547a533f277 --- /dev/null +++ b/pkg/kv/kvserver/raft_truncator_replica.go @@ -0,0 +1,78 @@ +// Copyright 2022 The Cockroach Authors. 
+// +// Use of this software is governed by the Business Source License +// included in the file licenses/BSL.txt. +// +// As of the Change Date specified in that file, in accordance with +// the Business Source License, use of this software will be governed +// by the Apache License, Version 2.0, included in the file +// licenses/APL.txt. + +package kvserver + +import ( + "context" + + "github.com/cockroachdb/cockroach/pkg/kv/kvserver/stateloader" + "github.com/cockroachdb/cockroach/pkg/roachpb" +) + +// Implementation of the replicaForTruncator interface. +type raftTruncatorReplica Replica + +var _ replicaForTruncator = &raftTruncatorReplica{} + +func (r *raftTruncatorReplica) getRangeID() roachpb.RangeID { + return r.RangeID +} + +func (r *raftTruncatorReplica) getTruncatedState() roachpb.RaftTruncatedState { + r.mu.Lock() + defer r.mu.Unlock() + // TruncatedState is guaranteed to be non-nil. + return *r.mu.state.TruncatedState +} + +func (r *raftTruncatorReplica) setTruncatedStateAndSideEffects( + ctx context.Context, trunc *roachpb.RaftTruncatedState, expectedFirstIndexPreTruncation uint64, +) (expectedFirstIndexWasAccurate bool) { + _, expectedFirstIndexAccurate := (*Replica)(r).handleTruncatedStateResult( + ctx, trunc, expectedFirstIndexPreTruncation) + return expectedFirstIndexAccurate +} + +func (r *raftTruncatorReplica) setTruncationDeltaAndTrusted(deltaBytes int64, isDeltaTrusted bool) { + r.mu.Lock() + defer r.mu.Unlock() + r.mu.raftLogSize += deltaBytes + r.mu.raftLogLastCheckSize += deltaBytes + // Ensure raftLog{,LastCheck}Size is not negative since it isn't persisted + // between server restarts. 
+ if r.mu.raftLogSize < 0 { + r.mu.raftLogSize = 0 + } + if r.mu.raftLogLastCheckSize < 0 { + r.mu.raftLogLastCheckSize = 0 + } + if !isDeltaTrusted { + r.mu.raftLogSizeTrusted = false + } +} + +func (r *raftTruncatorReplica) getPendingTruncs() *pendingLogTruncations { + return &r.pendingLogTruncations +} + +func (r *raftTruncatorReplica) sideloadedBytesIfTruncatedFromTo( + ctx context.Context, from, to uint64, +) (freed int64, err error) { + freed, _, err = r.raftMu.sideloaded.BytesIfTruncatedFromTo(ctx, from, to) + return freed, err +} + +func (r *raftTruncatorReplica) getStateLoader() stateloader.StateLoader { + // NB: the replicaForTruncator contract says that Replica.raftMu is held for + // the duration of the existence of replicaForTruncator, so we return the + // r.raftMu.stateloader (and not r.mu.stateLoader). + return r.raftMu.stateLoader +} diff --git a/pkg/kv/kvserver/replica.go b/pkg/kv/kvserver/replica.go index d0ddb55950af..65a593eabf67 100644 --- a/pkg/kv/kvserver/replica.go +++ b/pkg/kv/kvserver/replica.go @@ -444,6 +444,12 @@ type Replica struct { // until the first truncation is carried out), but it prevents a large // dormant Raft log from sitting around forever, which has caused problems // in the past. + // + // Note that both raftLogSize and raftLogSizeTrusted do not include the + // effect of pending log truncations (see Replica.pendingLogTruncations). + // Hence, they are fine for metrics etc., but not for deciding whether we + // should create another pending truncation. For the latter, we compute + // the post-pending-truncation size using pendingLogTruncations. raftLogSize int64 // If raftLogSizeTrusted is false, don't trust the above raftLogSize until // it has been recomputed. @@ -635,6 +641,12 @@ type Replica struct { closedTimestampSetter closedTimestampSetterInfo } + // The raft log truncations that are pending. Access is protected by its own + // mutex. 
All implementation details should be considered hidden except to + // the code in raft_log_truncator.go. External code should only use the + // computePostTrunc* methods. + pendingLogTruncations pendingLogTruncations + rangefeedMu struct { syncutil.RWMutex // proc is an instance of a rangefeed Processor that is capable of diff --git a/pkg/kv/kvserver/replica_application_result.go b/pkg/kv/kvserver/replica_application_result.go index ee6496d99f41..7a02007b0362 100644 --- a/pkg/kv/kvserver/replica_application_result.go +++ b/pkg/kv/kvserver/replica_application_result.go @@ -281,9 +281,11 @@ func (r *Replica) handleLeaseResult( } func (r *Replica) handleTruncatedStateResult( - ctx context.Context, t *roachpb.RaftTruncatedState, -) (raftLogDelta int64) { + ctx context.Context, t *roachpb.RaftTruncatedState, expectedFirstIndexPreTruncation uint64, +) (raftLogDelta int64, expectedFirstIndexWasAccurate bool) { r.mu.Lock() + expectedFirstIndexWasAccurate = + r.mu.state.TruncatedState.Index+1 == expectedFirstIndexPreTruncation r.mu.state.TruncatedState = t r.mu.Unlock() @@ -294,6 +296,9 @@ func (r *Replica) handleTruncatedStateResult( // Truncate the sideloaded storage. Note that this is safe only if the new truncated state // is durably on disk (i.e.) synced. This is true at the time of writing but unfortunately // could rot. + // TODO(sumeer): once we remove the legacy caller of + // handleTruncatedStateResult, stop calculating the size of the removed + // files and the remaining files. log.Eventf(ctx, "truncating sideloaded storage up to (and including) index %d", t.Index) size, _, err := r.raftMu.sideloaded.TruncateTo(ctx, t.Index+1) if err != nil { @@ -301,7 +306,7 @@ func (r *Replica) handleTruncatedStateResult( // loud error, but keep humming along. 
log.Errorf(ctx, "while removing sideloaded files during log truncation: %+v", err) } - return -size + return -size, expectedFirstIndexWasAccurate } func (r *Replica) handleGCThresholdResult(ctx context.Context, thresh *hlc.Timestamp) { @@ -368,17 +373,7 @@ func (r *Replica) handleChangeReplicasResult( return true } -func (r *Replica) handleRaftLogDeltaResult(ctx context.Context, delta int64) { - r.mu.Lock() - defer r.mu.Unlock() - r.mu.raftLogSize += delta - r.mu.raftLogLastCheckSize += delta - // Ensure raftLog{,LastCheck}Size is not negative since it isn't persisted - // between server restarts. - if r.mu.raftLogSize < 0 { - r.mu.raftLogSize = 0 - } - if r.mu.raftLogLastCheckSize < 0 { - r.mu.raftLogLastCheckSize = 0 - } +// TODO(sumeer): remove method when all truncation is loosely coupled. +func (r *Replica) handleRaftLogDeltaResult(ctx context.Context, delta int64, isDeltaTrusted bool) { + (*raftTruncatorReplica)(r).setTruncationDeltaAndTrusted(delta, isDeltaTrusted) } diff --git a/pkg/kv/kvserver/replica_application_state_machine.go b/pkg/kv/kvserver/replica_application_state_machine.go index 01583437cad1..833aa72b8c5a 100644 --- a/pkg/kv/kvserver/replica_application_state_machine.go +++ b/pkg/kv/kvserver/replica_application_state_machine.go @@ -749,26 +749,57 @@ func (b *replicaAppBatch) runPreApplyTriggersAfterStagingWriteBatch( } if res.State != nil && res.State.TruncatedState != nil { - if apply, err := handleTruncatedStateBelowRaftPreApply( - ctx, b.state.TruncatedState, res.State.TruncatedState, b.r.raftMu.stateLoader, b.batch, - ); err != nil { - return wrapWithNonDeterministicFailure(err, "unable to handle truncated state") - } else if !apply { - // The truncated state was discarded, so make sure we don't apply - // it to our in-memory state. + var err error + // Typically one should not be checking the cluster version below raft, + // since it can cause state machine divergence. 
However, this check is + // only for deciding how to truncate the raft log, which is not part of + // the state machine. Also, we will eventually eliminate this check by + // only supporting loosely coupled truncation. + looselyCoupledTruncation := isLooselyCoupledRaftLogTruncationEnabled(ctx, b.r.ClusterSettings()) + // In addition to cluster version and cluster settings, we also apply + // immediately if RaftExpectedFirstIndex is not populated (see comment in + // that proto). + // + // In the release following LooselyCoupledRaftLogTruncation, we will + // retire the strongly coupled path. It is possible that some replica + // still has a truncation sitting in a raft log that never populated + // RaftExpectedFirstIndex, which will be interpreted as 0. When applying + // it, the loosely coupled code will mark the log size as untrusted and + // will recompute the size. This has no correctness impact, so we are not + // going to bother with a long-running migration. + apply := !looselyCoupledTruncation || res.RaftExpectedFirstIndex == 0 + if apply { + if apply, err = handleTruncatedStateBelowRaftPreApply( + ctx, b.state.TruncatedState, res.State.TruncatedState, b.r.raftMu.stateLoader, b.batch, + ); err != nil { + return wrapWithNonDeterministicFailure(err, "unable to handle truncated state") + } + } else { + b.r.store.raftTruncator.addPendingTruncation( + ctx, (*raftTruncatorReplica)(b.r), *res.State.TruncatedState, res.RaftExpectedFirstIndex, + res.RaftLogDelta) + } + if !apply { + // The truncated state was discarded, or we are queuing a pending + // truncation, so make sure we don't apply it to our in-memory state. res.State.TruncatedState = nil res.RaftLogDelta = 0 - // TODO(ajwerner): consider moving this code. - // We received a truncation that doesn't apply to us, so we know that - // there's a leaseholder out there with a log that has earlier entries - // than ours. 
That leader also guided our log size computations by - // giving us RaftLogDeltas for past truncations, and this was likely - // off. Mark our Raft log size is not trustworthy so that, assuming - // we step up as leader at some point in the future, we recompute - // our numbers. - b.r.mu.Lock() - b.r.mu.raftLogSizeTrusted = false - b.r.mu.Unlock() + res.RaftExpectedFirstIndex = 0 + if !looselyCoupledTruncation { + // TODO(ajwerner): consider moving this code. + // We received a truncation that doesn't apply to us, so we know that + // there's a leaseholder out there with a log that has earlier entries + // than ours. That leader also guided our log size computations by + // giving us RaftLogDeltas for past truncations, and this was likely + // off. Mark our Raft log size is not trustworthy so that, assuming + // we step up as leader at some point in the future, we recompute + // our numbers. + // TODO(sumeer): this code will be deleted when there is no + // !looselyCoupledTruncation code path. + b.r.mu.Lock() + b.r.mu.raftLogSizeTrusted = false + b.r.mu.Unlock() + } } } @@ -1227,6 +1258,7 @@ func (sm *replicaStateMachine) handleNonTrivialReplicatedEvalResult( log.Fatalf(ctx, "zero-value ReplicatedEvalResult passed to handleNonTrivialReplicatedEvalResult") } + isRaftLogTruncationDeltaTrusted := true if rResult.State != nil { if newLease := rResult.State.Lease; newLease != nil { sm.r.handleLeaseResult(ctx, newLease, rResult.PriorReadSummary) @@ -1234,9 +1266,17 @@ func (sm *replicaStateMachine) handleNonTrivialReplicatedEvalResult( rResult.PriorReadSummary = nil } + // This strongly coupled truncation code will be removed in the release + // following LooselyCoupledRaftLogTruncation. 
if newTruncState := rResult.State.TruncatedState; newTruncState != nil { - rResult.RaftLogDelta += sm.r.handleTruncatedStateResult(ctx, newTruncState) + raftLogDelta, expectedFirstIndexWasAccurate := sm.r.handleTruncatedStateResult( + ctx, newTruncState, rResult.RaftExpectedFirstIndex) + if !expectedFirstIndexWasAccurate && rResult.RaftExpectedFirstIndex != 0 { + isRaftLogTruncationDeltaTrusted = false + } + rResult.RaftLogDelta += raftLogDelta rResult.State.TruncatedState = nil + rResult.RaftExpectedFirstIndex = 0 } if newThresh := rResult.State.GCThreshold; newThresh != nil { @@ -1254,7 +1294,10 @@ func (sm *replicaStateMachine) handleNonTrivialReplicatedEvalResult( } if rResult.RaftLogDelta != 0 { - sm.r.handleRaftLogDeltaResult(ctx, rResult.RaftLogDelta) + // This code path will be taken exactly when the preceding block has + // newTruncState != nil. It is needlessly confusing that these two are not + // in the same place. + sm.r.handleRaftLogDeltaResult(ctx, rResult.RaftLogDelta, isRaftLogTruncationDeltaTrusted) rResult.RaftLogDelta = 0 } diff --git a/pkg/kv/kvserver/replica_application_state_machine_test.go b/pkg/kv/kvserver/replica_application_state_machine_test.go index 3bc612bd6825..c949daa90353 100644 --- a/pkg/kv/kvserver/replica_application_state_machine_test.go +++ b/pkg/kv/kvserver/replica_application_state_machine_test.go @@ -146,3 +146,97 @@ func TestReplicaStateMachineChangeReplicas(t *testing.T) { } }) } + +// TODO(sumeer): when isLooselyCoupledRaftLogTruncationEnabled can return +// true, add a test that queues up a pending truncation and then cause it to +// get enacted by calling raftLogTruncator.durabilityAdvanced. 
+ +func TestReplicaStateMachineRaftLogTruncation(t *testing.T) { + defer leaktest.AfterTest(t)() + defer log.Scope(t).Close(t) + testutils.RunTrueAndFalse(t, "accurate first index", func(t *testing.T, accurate bool) { + tc := testContext{} + ctx := context.Background() + stopper := stop.NewStopper() + defer stopper.Stop(ctx) + tc.Start(ctx, t, stopper) + + // Lock the replica for the entire test. + r := tc.repl + r.raftMu.Lock() + defer r.raftMu.Unlock() + sm := r.getStateMachine() + + // Create a new application batch. + b := sm.NewBatch(false /* ephemeral */).(*replicaAppBatch) + defer b.Close() + + r.mu.Lock() + raftAppliedIndex := r.mu.state.RaftAppliedIndex + truncatedIndex := r.mu.state.TruncatedState.Index + raftLogSize := r.mu.raftLogSize + // Overwrite to be trusted, since we want to check if transitions to false + // or not. + r.mu.raftLogSizeTrusted = true + r.mu.Unlock() + + expectedFirstIndex := truncatedIndex + 1 + if !accurate { + expectedFirstIndex = truncatedIndex + } + // Stage a command that truncates one raft log entry which we pretend has a + // byte size of 1. + cmd := &replicatedCmd{ + ctx: ctx, + ent: &raftpb.Entry{ + Index: raftAppliedIndex + 1, + Type: raftpb.EntryNormal, + }, + decodedRaftEntry: decodedRaftEntry{ + idKey: makeIDKey(), + raftCmd: kvserverpb.RaftCommand{ + ProposerLeaseSequence: r.mu.state.Lease.Sequence, + MaxLeaseIndex: r.mu.state.LeaseAppliedIndex + 1, + ReplicatedEvalResult: kvserverpb.ReplicatedEvalResult{ + State: &kvserverpb.ReplicaState{ + TruncatedState: &roachpb.RaftTruncatedState{ + Index: truncatedIndex + 1, + }, + }, + RaftLogDelta: -1, + RaftExpectedFirstIndex: expectedFirstIndex, + WriteTimestamp: r.mu.state.GCThreshold.Add(1, 0), + }, + }, + }, + } + + checkedCmd, err := b.Stage(cmd.ctx, cmd) + require.NoError(t, err) + + // Apply the batch to the StateMachine. + err = b.ApplyToStateMachine(ctx) + require.NoError(t, err) + + // Apply the side effects of the command to the StateMachine. 
+ _, err = sm.ApplySideEffects(checkedCmd.Ctx(), checkedCmd) + require.NoError(t, err) + func() { + r.mu.Lock() + defer r.mu.Unlock() + require.Equal(t, raftAppliedIndex+1, r.mu.state.RaftAppliedIndex) + require.Equal(t, truncatedIndex+1, r.mu.state.TruncatedState.Index) + expectedSize := raftLogSize - 1 + // We typically have a raftLogSize > 0 (based on inspecting some test + // runs), but we can't be sure. + if expectedSize < 0 { + expectedSize = 0 + } + require.Equal(t, expectedSize, r.mu.raftLogSize) + require.Equal(t, accurate, r.mu.raftLogSizeTrusted) + truncState, err := r.mu.stateLoader.LoadRaftTruncatedState(context.Background(), tc.engine) + require.NoError(t, err) + require.Equal(t, r.mu.state.TruncatedState.Index, truncState.Index) + }() + }) +} diff --git a/pkg/kv/kvserver/replica_metrics.go b/pkg/kv/kvserver/replica_metrics.go index 7844ae8f79f9..7f0e8e63537c 100644 --- a/pkg/kv/kvserver/replica_metrics.go +++ b/pkg/kv/kvserver/replica_metrics.go @@ -280,7 +280,10 @@ func (r *Replica) needsRaftLogTruncationLocked() bool { // operation or even every operation which occurs after the Raft log exceeds // RaftLogQueueStaleSize. The logic below queues the replica for possible // Raft log truncation whenever an additional RaftLogQueueStaleSize bytes - // have been written to the Raft log. + // have been written to the Raft log. Note that it does not matter if some + // of the bytes in raftLogLastCheckSize are already part of pending + // truncations since this comparison is looking at whether the raft log has + // grown sufficiently. 
checkRaftLog := r.mu.raftLogSize-r.mu.raftLogLastCheckSize >= RaftLogQueueStaleSize if checkRaftLog { r.mu.raftLogLastCheckSize = r.mu.raftLogSize diff --git a/pkg/kv/kvserver/replica_raft.go b/pkg/kv/kvserver/replica_raft.go index 1cb46178ec4b..68cf20533c0b 100644 --- a/pkg/kv/kvserver/replica_raft.go +++ b/pkg/kv/kvserver/replica_raft.go @@ -2025,9 +2025,9 @@ func ComputeRaftLogSize( var totalSideloaded int64 if sideloaded != nil { var err error - // Truncating all indexes strictly smaller than zero is a no-op but - // gives us the number of bytes in the storage back. - _, totalSideloaded, err = sideloaded.TruncateTo(ctx, 0) + // The remaining bytes if one were to truncate [0, 0) gives us the total + // number of bytes in sideloaded files. + _, totalSideloaded, err = sideloaded.BytesIfTruncatedFromTo(ctx, 0, 0) if err != nil { return 0, err } diff --git a/pkg/kv/kvserver/replica_sideload.go b/pkg/kv/kvserver/replica_sideload.go index e9b1d25bd406..ce5150d65b22 100644 --- a/pkg/kv/kvserver/replica_sideload.go +++ b/pkg/kv/kvserver/replica_sideload.go @@ -50,6 +50,10 @@ type SideloadStorage interface { // the given one. Returns the number of bytes freed, the number of bytes in // files that remain, or an error. TruncateTo(_ context.Context, index uint64) (freed, retained int64, _ error) + // BytesIfTruncatedFromTo returns the number of bytes that would be freed, + // if one were to truncate [from, to). Additionally, it returns the + // number of bytes that would be retained >= to. + BytesIfTruncatedFromTo(_ context.Context, from, to uint64) (freed, retained int64, _ error) // Returns an absolute path to the file that Get() would return the contents // of. Does not check whether the file actually exists. 
Filename(_ context.Context, index, term uint64) (string, error) diff --git a/pkg/kv/kvserver/replica_sideload_disk.go b/pkg/kv/kvserver/replica_sideload_disk.go index 9298c0f2469e..bcc8b9a2f303 100644 --- a/pkg/kv/kvserver/replica_sideload_disk.go +++ b/pkg/kv/kvserver/replica_sideload_disk.go @@ -212,10 +212,17 @@ func (ss *diskSideloadStorage) Clear(_ context.Context) error { // TruncateTo implements SideloadStorage. func (ss *diskSideloadStorage) TruncateTo( ctx context.Context, firstIndex uint64, +) (bytesFreed, bytesRetained int64, _ error) { + return ss.possiblyTruncateTo(ctx, 0, firstIndex, true /* doTruncate */) +} + +// Helper for truncation or byte calculation for [from, to). +func (ss *diskSideloadStorage) possiblyTruncateTo( + ctx context.Context, from uint64, to uint64, doTruncate bool, ) (bytesFreed, bytesRetained int64, _ error) { deletedAll := true if err := ss.forEach(ctx, func(index uint64, filename string) error { - if index >= firstIndex { + if index >= to { size, err := ss.fileSize(filename) if err != nil { return err @@ -224,7 +231,17 @@ func (ss *diskSideloadStorage) TruncateTo( deletedAll = false return nil } - fileSize, err := ss.purgeFile(ctx, filename) + if index < from { + return nil + } + // index is in [from, to) + var fileSize int64 + var err error + if doTruncate { + fileSize, err = ss.purgeFile(ctx, filename) + } else { + fileSize, err = ss.fileSize(filename) + } if err != nil { return err } @@ -234,7 +251,7 @@ func (ss *diskSideloadStorage) TruncateTo( return 0, 0, err } - if deletedAll { + if deletedAll && doTruncate { // The directory may not exist, or it may exist and have been empty. // Not worth trying to figure out which one, just try to delete. err := ss.eng.Remove(ss.dir) @@ -246,6 +263,13 @@ func (ss *diskSideloadStorage) TruncateTo( return bytesFreed, bytesRetained, nil } +// BytesIfTruncatedFromTo implements SideloadStorage. 
+func (ss *diskSideloadStorage) BytesIfTruncatedFromTo( + ctx context.Context, from uint64, to uint64, +) (freed, retained int64, _ error) { + return ss.possiblyTruncateTo(ctx, from, to, false /* doTruncate */) +} + func (ss *diskSideloadStorage) forEach( ctx context.Context, visit func(index uint64, filename string) error, ) error { diff --git a/pkg/kv/kvserver/replica_sideload_test.go b/pkg/kv/kvserver/replica_sideload_test.go index b6f3801f6f36..9651d312ef5c 100644 --- a/pkg/kv/kvserver/replica_sideload_test.go +++ b/pkg/kv/kvserver/replica_sideload_test.go @@ -242,10 +242,20 @@ func testSideloadingSideloadedStorage(t *testing.T, eng storage.Engine) { assertCreated(true) for n := range payloads { - // Truncate indexes <= payloads[n] (payloads is sorted in increasing order). - if _, _, err := ss.TruncateTo(ctx, payloads[n]); err != nil { + freed, retained, err := ss.BytesIfTruncatedFromTo(ctx, 0, payloads[n]) + require.NoError(t, err) + freedWhatWasRetained, retainedNothing, err := + ss.BytesIfTruncatedFromTo(ctx, payloads[n], math.MaxUint64) + require.NoError(t, err) + require.Zero(t, retainedNothing) + require.Equal(t, freedWhatWasRetained, retained) + // Truncate indexes < payloads[n] (payloads is sorted in increasing order). + freedByTruncateTo, retainedByTruncateTo, err := ss.TruncateTo(ctx, payloads[n]) + if err != nil { t.Fatalf("%d: %+v", n, err) } + require.Equal(t, freedByTruncateTo, freed) + require.Equal(t, retainedByTruncateTo, retained) // Index payloads[n] and above are still there (truncation is exclusive) // at both terms. 
for _, term := range []uint64{lowTerm, highTerm} { @@ -302,8 +312,13 @@ func testSideloadingSideloadedStorage(t *testing.T, eng storage.Engine) { require.NoError(t, ss.Put(ctx, i, highTerm, file(i*highTerm))) } assertCreated(true) - _, _, err = ss.TruncateTo(ctx, math.MaxUint64) + freed, retained, err := ss.BytesIfTruncatedFromTo(ctx, 0, math.MaxUint64) + require.NoError(t, err) + require.Zero(t, retained) + freedByTruncateTo, retainedByTruncateTo, err := ss.TruncateTo(ctx, math.MaxUint64) require.NoError(t, err) + require.Zero(t, retainedByTruncateTo) + require.Equal(t, freedByTruncateTo, freed) // Ensure directory is removed when all records are removed. _, err = eng.Stat(ss.Dir()) require.True(t, oserror.IsNotExist(err), "%v", err) @@ -313,9 +328,16 @@ func testSideloadingSideloadedStorage(t *testing.T, eng storage.Engine) { assertCreated(false) - // Sanity check that we can call TruncateTo without the directory existing. - _, _, err := ss.TruncateTo(ctx, 1) + // Sanity check that we can call BytesIfTruncatedFromTo and TruncateTo + // without the directory existing. + freed, retained, err := ss.BytesIfTruncatedFromTo(ctx, 0, 1) + require.NoError(t, err) + require.Zero(t, freed) + require.Zero(t, retained) + freed, retained, err = ss.TruncateTo(ctx, 1) require.NoError(t, err) + require.Zero(t, freed) + require.Zero(t, retained) assertCreated(false) diff --git a/pkg/kv/kvserver/store.go b/pkg/kv/kvserver/store.go index 757eef1373a3..b5cfabef3603 100644 --- a/pkg/kv/kvserver/store.go +++ b/pkg/kv/kvserver/store.go @@ -701,21 +701,24 @@ increasing over time (see Replica.setTombstoneKey). NOTE: to the best of our knowledge, we don't rely on this invariant. 
*/ type Store struct { - Ident *roachpb.StoreIdent // pointer to catch access before Start() is called - cfg StoreConfig - db *kv.DB - engine storage.Engine // The underlying key-value store - tsCache tscache.Cache // Most recent timestamps for keys / key ranges - allocator Allocator // Makes allocation decisions - replRankings *replicaRankings - storeRebalancer *StoreRebalancer - rangeIDAlloc *idalloc.Allocator // Range ID allocator - mvccGCQueue *mvccGCQueue // MVCC GC queue - mergeQueue *mergeQueue // Range merging queue - splitQueue *splitQueue // Range splitting queue - replicateQueue *replicateQueue // Replication queue - replicaGCQueue *replicaGCQueue // Replica GC queue - raftLogQueue *raftLogQueue // Raft log truncation queue + Ident *roachpb.StoreIdent // pointer to catch access before Start() is called + cfg StoreConfig + db *kv.DB + engine storage.Engine // The underlying key-value store + tsCache tscache.Cache // Most recent timestamps for keys / key ranges + allocator Allocator // Makes allocation decisions + replRankings *replicaRankings + storeRebalancer *StoreRebalancer + rangeIDAlloc *idalloc.Allocator // Range ID allocator + mvccGCQueue *mvccGCQueue // MVCC GC queue + mergeQueue *mergeQueue // Range merging queue + splitQueue *splitQueue // Range splitting queue + replicateQueue *replicateQueue // Replication queue + replicaGCQueue *replicaGCQueue // Replica GC queue + raftLogQueue *raftLogQueue // Raft log truncation queue + // Carries out truncations proposed by the raft log queue, and "replicated" + // via raft, when they are safe. 
+ raftTruncator *raftLogTruncator raftSnapshotQueue *raftSnapshotQueue // Raft repair queue tsMaintenanceQueue *timeSeriesMaintenanceQueue // Time series maintenance queue scanner *replicaScanner // Replica scanner @@ -1162,6 +1165,7 @@ func NewStore( ) } s.replRankings = newReplicaRankings() + s.raftTruncator = makeRaftLogTruncator((*storeForTruncatorImpl)(s)) s.draining.Store(false) s.scheduler = newRaftScheduler(cfg.AmbientCtx, s.metrics, s, storeSchedulerConcurrency) @@ -3486,6 +3490,40 @@ func (s *Store) getRootMemoryMonitorForKV() *mon.BytesMonitor { return s.cfg.KVMemoryMonitor } +// Implementation of the storeForTruncator interface. +type storeForTruncatorImpl Store + +var _ storeForTruncator = &storeForTruncatorImpl{} + +func (s *storeForTruncatorImpl) acquireReplicaForTruncator( + rangeID roachpb.RangeID, +) replicaForTruncator { + r, err := (*Store)(s).GetReplica(rangeID) + if err != nil || r == nil { + // The only error we can see here is roachpb.NewRangeNotFoundError, so we + // can ignore it. + return nil + } + if isAlive := func() bool { + r.mu.Lock() + defer r.mu.Unlock() + return r.mu.destroyStatus.IsAlive() + }(); !isAlive { + return nil + } + r.raftMu.Lock() + return (*raftTruncatorReplica)(r) +} + +func (s *storeForTruncatorImpl) releaseReplicaForTruncator(r replicaForTruncator) { + replica := r.(*raftTruncatorReplica) + replica.raftMu.Unlock() +} + +func (s *storeForTruncatorImpl) getEngine() storage.Engine { + return (*Store)(s).engine +} + // WriteClusterVersion writes the given cluster version to the store-local // cluster version key. We only accept a raw engine to ensure we're persisting // the write durably. 
diff --git a/pkg/kv/kvserver/testdata/raft_log_truncator b/pkg/kv/kvserver/testdata/raft_log_truncator new file mode 100644 index 000000000000..e61b1666061d --- /dev/null +++ b/pkg/kv/kvserver/testdata/raft_log_truncator @@ -0,0 +1,401 @@ +create-replica id=1 trunc-index=20 last-log-entry=30 +---- + +# The replica has log entries [21,29] since we have truncated up to 20. +print-engine-state id=1 +---- +truncated index: 20 +log entries: 21, 22, 23, 24, 25, 26, 27, 28, 29 +durable applied index: 0 + +# Try to add a pending truncation that will not advance the truncated index. +# It is not queued. +add-pending-truncation id=1 first-index=10 trunc-index=20 delta-bytes=-20 sideloaded-bytes=10 +---- +r1.getPendingTruncs +r1.getTruncatedState +truncator ranges: + +# Pending is empty. +print-replica-state id=1 +---- +truncIndex: 20 +pending: + +# Add a pending truncation that overlaps with the truncated index, but will +# advance the truncated state. It is queued. +add-pending-truncation id=1 first-index=16 trunc-index=22 delta-bytes=-20 sideloaded-bytes=10 +---- +r1.getPendingTruncs +r1.getTruncatedState +r1.sideloadedBytesIfTruncatedFromTo(21, 23) +truncator ranges: 1 + +print-replica-state id=1 +---- +truncIndex: 20 +pending: + {RaftTruncatedState:{Index:22 Term:0} expectedFirstIndex:16 logDeltaBytes:-30 isDeltaTrusted:true} + +# The durability advanced, but we haven't updated the RaftAppliedIndex, so the +# pending truncation cannot be enacted. +durability-advanced +---- +acquireReplica(1) +r1.getTruncatedState +r1.getPendingTruncs +r1.getStateLoader +releaseReplica(1) +truncator ranges: 1 + +print-replica-state id=1 +---- +truncIndex: 20 +pending: + {RaftTruncatedState:{Index:22 Term:0} expectedFirstIndex:16 logDeltaBytes:-30 isDeltaTrusted:true} + +# Queue another non-existent replica, to be annoying. +add-replica-to-truncator id=13 +---- +truncator ranges: 1, 13 + +# Create replica 2 that is similar to replica 1. 
+create-replica id=2 trunc-index=20 last-log-entry=35 +---- + +# Add a pending truncation for replica 2 that does not overlap with the +# truncated index. +add-pending-truncation id=2 first-index=21 trunc-index=22 delta-bytes=-20 sideloaded-bytes=10 +---- +r2.getPendingTruncs +r2.getTruncatedState +r2.sideloadedBytesIfTruncatedFromTo(21, 23) +truncator ranges: 1, 2, 13 + +print-replica-state id=2 +---- +truncIndex: 20 +pending: + {RaftTruncatedState:{Index:22 Term:0} expectedFirstIndex:21 logDeltaBytes:-30 isDeltaTrusted:true} + +# Update the RaftAppliedIndex of replica 1 to equal the index of the pending +# truncation. +write-raft-applied-index id=1 raft-applied-index=22 +---- + +# Inform the truncator that durability advanced, which will cause replica 1's +# pending truncation to be enacted. +durability-advanced +---- +acquireReplica(1) +r1.getTruncatedState +r1.getPendingTruncs +r1.getStateLoader +r1.setTruncatedStateAndSideEffects(..., expectedFirstIndex:16) => trusted:false +r1.setTruncationDeltaAndTrusted(delta:-30, trusted:false) +releaseReplica(1) +acquireReplica(2) +r2.getTruncatedState +r2.getPendingTruncs +r2.getStateLoader +releaseReplica(2) +acquireReplica(13) +truncator ranges: 2 + +# Replica 1 is truncated. +print-replica-state id=1 +---- +truncIndex: 22 +pending: + +print-engine-state id=1 +---- +truncated index: 22 +log entries: 23, 24, 25, 26, 27, 28, 29 +durable applied index: 22 + +# Replica 2 is still pending truncation. +print-replica-state id=2 +---- +truncIndex: 20 +pending: + {RaftTruncatedState:{Index:22 Term:0} expectedFirstIndex:21 logDeltaBytes:-30 isDeltaTrusted:true} + +print-engine-state id=2 +---- +truncated index: 20 +log entries: 21, 22, 23, 24, 25, 26, 27, 28, 29, 30, 31, 32, 33, 34 +durable applied index: 0 + +# Update the RaftAppliedIndex of replica 2 to below the index of the pending +# truncation. +write-raft-applied-index id=2 raft-applied-index=21 +---- + +# No truncation. 
+durability-advanced +---- +acquireReplica(2) +r2.getTruncatedState +r2.getPendingTruncs +r2.getStateLoader +releaseReplica(2) +truncator ranges: 2 + +print-replica-state id=2 +---- +truncIndex: 20 +pending: + {RaftTruncatedState:{Index:22 Term:0} expectedFirstIndex:21 logDeltaBytes:-30 isDeltaTrusted:true} + +print-engine-state id=2 +---- +truncated index: 20 +log entries: 21, 22, 23, 24, 25, 26, 27, 28, 29, 30, 31, 32, 33, 34 +durable applied index: 21 + +# Update the RaftAppliedIndex of replica 2 to above the index of the pending +# truncation. +write-raft-applied-index id=2 raft-applied-index=24 +---- + +# Truncation happens, but only up to the pending truncation and not the +# RaftAppliedIndex. +durability-advanced +---- +acquireReplica(2) +r2.getTruncatedState +r2.getPendingTruncs +r2.getStateLoader +r2.setTruncatedStateAndSideEffects(..., expectedFirstIndex:21) => trusted:true +r2.setTruncationDeltaAndTrusted(delta:-30, trusted:true) +releaseReplica(2) +truncator ranges: + +print-replica-state id=2 +---- +truncIndex: 22 +pending: + +print-engine-state id=2 +---- +truncated index: 22 +log entries: 23, 24, 25, 26, 27, 28, 29, 30, 31, 32, 33, 34 +durable applied index: 24 + +# Add a pending truncation for replica 2 that overlaps with the truncated +# index. +add-pending-truncation id=2 first-index=21 trunc-index=24 delta-bytes=-20 sideloaded-bytes=10 +---- +r2.getPendingTruncs +r2.getTruncatedState +r2.sideloadedBytesIfTruncatedFromTo(23, 25) +truncator ranges: 2 + +print-replica-state id=2 +---- +truncIndex: 22 +pending: + {RaftTruncatedState:{Index:24 Term:0} expectedFirstIndex:21 logDeltaBytes:-30 isDeltaTrusted:true} + +# Enact the truncation. Note the isDeltaTrusted is false. 
+durability-advanced +---- +acquireReplica(2) +r2.getTruncatedState +r2.getPendingTruncs +r2.getStateLoader +r2.setTruncatedStateAndSideEffects(..., expectedFirstIndex:21) => trusted:false +r2.setTruncationDeltaAndTrusted(delta:-30, trusted:false) +releaseReplica(2) +truncator ranges: + +print-replica-state id=2 +---- +truncIndex: 24 +pending: + +print-engine-state id=2 +---- +truncated index: 24 +log entries: 25, 26, 27, 28, 29, 30, 31, 32, 33, 34 +durable applied index: 24 + +# Enqueue multiple truncations. +add-pending-truncation id=2 first-index=25 trunc-index=26 delta-bytes=-20 sideloaded-bytes=10 +---- +r2.getPendingTruncs +r2.getTruncatedState +r2.sideloadedBytesIfTruncatedFromTo(25, 27) +truncator ranges: 2 + +add-pending-truncation id=2 first-index=27 trunc-index=28 delta-bytes=-20 sideloaded-bytes=10 +---- +r2.getPendingTruncs +r2.getTruncatedState +r2.sideloadedBytesIfTruncatedFromTo(27, 29) +truncator ranges: 2 + +print-replica-state id=2 +---- +truncIndex: 24 +pending: + {RaftTruncatedState:{Index:26 Term:0} expectedFirstIndex:25 logDeltaBytes:-30 isDeltaTrusted:true} + {RaftTruncatedState:{Index:28 Term:0} expectedFirstIndex:27 logDeltaBytes:-30 isDeltaTrusted:true} + +add-pending-truncation id=2 first-index=28 trunc-index=29 delta-bytes=-20 sideloaded-bytes=10 +---- +r2.getPendingTruncs +r2.getTruncatedState +r2.sideloadedBytesIfTruncatedFromTo(29, 30) +truncator ranges: 2 + +# The last two pending truncations are merged and since they overlap, +# isDeltaTrusted=false. +print-replica-state id=2 +---- +truncIndex: 24 +pending: + {RaftTruncatedState:{Index:26 Term:0} expectedFirstIndex:25 logDeltaBytes:-30 isDeltaTrusted:true} + {RaftTruncatedState:{Index:29 Term:0} expectedFirstIndex:27 logDeltaBytes:-60 isDeltaTrusted:false} + +# Advance RaftAppliedIndex enough to enact the first but not the second. 
+write-raft-applied-index id=2 raft-applied-index=27 +---- + +durability-advanced +---- +acquireReplica(2) +r2.getTruncatedState +r2.getPendingTruncs +r2.getStateLoader +r2.setTruncatedStateAndSideEffects(..., expectedFirstIndex:25) => trusted:true +r2.setTruncationDeltaAndTrusted(delta:-30, trusted:true) +releaseReplica(2) +truncator ranges: 2 + +print-replica-state id=2 +---- +truncIndex: 26 +pending: + {RaftTruncatedState:{Index:29 Term:0} expectedFirstIndex:27 logDeltaBytes:-60 isDeltaTrusted:false} + +print-engine-state id=2 +---- +truncated index: 26 +log entries: 27, 28, 29, 30, 31, 32, 33, 34 +durable applied index: 27 + +# Enqueue another truncation. +add-pending-truncation id=2 first-index=30 trunc-index=31 delta-bytes=-20 sideloaded-bytes=10 +---- +r2.getPendingTruncs +r2.getTruncatedState +r2.sideloadedBytesIfTruncatedFromTo(30, 32) +truncator ranges: 2 + +print-replica-state id=2 +---- +truncIndex: 26 +pending: + {RaftTruncatedState:{Index:29 Term:0} expectedFirstIndex:27 logDeltaBytes:-60 isDeltaTrusted:false} + {RaftTruncatedState:{Index:31 Term:0} expectedFirstIndex:30 logDeltaBytes:-30 isDeltaTrusted:true} + +# Advance RaftAppliedIndex enough to enact both. +write-raft-applied-index id=2 raft-applied-index=31 +---- + +# Note that even though the first indices are properly aligned, one of the +# pending truncations was a merge of two pending truncations which overlapped. +# So even though the calls to setTruncatedStateAndSideEffects return true, the +# isDeltaTrusted in one of the calls to setTruncationDeltaAndTrusted is +# false. 
+durability-advanced +---- +acquireReplica(2) +r2.getTruncatedState +r2.getPendingTruncs +r2.getStateLoader +r2.setTruncatedStateAndSideEffects(..., expectedFirstIndex:27) => trusted:true +r2.setTruncationDeltaAndTrusted(delta:-60, trusted:false) +r2.setTruncatedStateAndSideEffects(..., expectedFirstIndex:30) => trusted:true +r2.setTruncationDeltaAndTrusted(delta:-30, trusted:true) +releaseReplica(2) +truncator ranges: + +print-replica-state id=2 +---- +truncIndex: 31 +pending: + +print-engine-state id=2 +---- +truncated index: 31 +log entries: 32, 33, 34 +durable applied index: 31 + +# Enqueue truncations such that when merging the second and third truncation, +# there is an error in computing side-loaded bytes. +add-pending-truncation id=2 first-index=32 trunc-index=32 delta-bytes=-20 sideloaded-bytes=10 +---- +r2.getPendingTruncs +r2.getTruncatedState +r2.sideloadedBytesIfTruncatedFromTo(32, 33) +truncator ranges: 2 + +add-pending-truncation id=2 first-index=33 trunc-index=33 delta-bytes=-20 sideloaded-bytes=10 +---- +r2.getPendingTruncs +r2.getTruncatedState +r2.sideloadedBytesIfTruncatedFromTo(33, 34) +truncator ranges: 2 + +print-replica-state id=2 +---- +truncIndex: 31 +pending: + {RaftTruncatedState:{Index:32 Term:0} expectedFirstIndex:32 logDeltaBytes:-30 isDeltaTrusted:true} + {RaftTruncatedState:{Index:33 Term:0} expectedFirstIndex:33 logDeltaBytes:-30 isDeltaTrusted:true} + +add-pending-truncation id=2 first-index=34 trunc-index=34 delta-bytes=-20 sideloaded-bytes=10 sideloaded-err=true +---- +r2.getPendingTruncs +r2.getTruncatedState +r2.sideloadedBytesIfTruncatedFromTo(34, 35) +truncator ranges: 2 + +# Because of the error, the delta for the merged truncation is not trusted. 
+print-replica-state id=2 +---- +truncIndex: 31 +pending: + {RaftTruncatedState:{Index:32 Term:0} expectedFirstIndex:32 logDeltaBytes:-30 isDeltaTrusted:true} + {RaftTruncatedState:{Index:34 Term:0} expectedFirstIndex:33 logDeltaBytes:-60 isDeltaTrusted:false} + +# Advance RaftAppliedIndex enough to enact all. +write-raft-applied-index id=2 raft-applied-index=34 +---- + +durability-advanced +---- +acquireReplica(2) +r2.getTruncatedState +r2.getPendingTruncs +r2.getStateLoader +r2.setTruncatedStateAndSideEffects(..., expectedFirstIndex:32) => trusted:true +r2.setTruncationDeltaAndTrusted(delta:-30, trusted:true) +r2.setTruncatedStateAndSideEffects(..., expectedFirstIndex:33) => trusted:true +r2.setTruncationDeltaAndTrusted(delta:-60, trusted:false) +releaseReplica(2) +truncator ranges: + +print-replica-state id=2 +---- +truncIndex: 34 +pending: + +print-engine-state id=2 +---- +truncated index: 34 +log entries: +durable applied index: 34 diff --git a/pkg/roachpb/api.proto b/pkg/roachpb/api.proto index 23ffd29e018a..a8ca0c9de814 100644 --- a/pkg/roachpb/api.proto +++ b/pkg/roachpb/api.proto @@ -1228,6 +1228,19 @@ message TruncateLogRequest { // itself. The range may have changed from the one specified in the header // in the case of a merge. int64 range_id = 3 [(gogoproto.customname) = "RangeID", (gogoproto.casttype) = "RangeID"]; + + // ExpectedFirstIndex is the expected Index of the preceding + // TruncateLogRequest, i.e., we expect that this request will typically be + // truncating entries [ExpectedFirstIndex, Index). + // + // There is no correctness issue if the replica applying this truncation has + // not seen the preceding TruncateLogRequest or has seen one with an Index + // that is not equal to ExpectedFirstIndex. This is an optimization that + // typically allows the potentially expensive computation of the bytes being + // discarded from the raft log to be performed once, at the leaseholder. 
+ // + // Populated starting at cluster version LooselyCoupledRaftLogTruncation. + uint64 expected_first_index = 4; } // TruncateLogResponse is the response to a TruncateLog() operation.