From 22f3f040afd0715c979a57ce6e3e6be00846b43c Mon Sep 17 00:00:00 2001 From: sumeerbhola Date: Thu, 27 Jan 2022 17:51:03 -0500 Subject: [PATCH] kvserver: introduce RangeAppliedState.RaftAppliedIndexTerm The same field is also introduced in ReplicaState, since ReplicaState is used in internal data-structures and when sending a state machine snapshot. The migration code uses a special unused term value in a ReplicatedEvalResult to signal to the state machine application machinery to start populating the term field. Fixes #75671 Release note: None --- .../settings/settings-for-tenants.txt | 2 +- docs/generated/settings/settings.html | 2 +- pkg/clusterversion/cockroach_versions.go | 14 + .../kvserver/batcheval/cmd_end_transaction.go | 16 +- pkg/kv/kvserver/batcheval/cmd_migrate.go | 22 ++ pkg/kv/kvserver/batcheval/result/BUILD.bazel | 1 + pkg/kv/kvserver/batcheval/result/result.go | 21 +- pkg/kv/kvserver/below_raft_protos_test.go | 2 +- pkg/kv/kvserver/kvserverpb/state.proto | 4 + pkg/kv/kvserver/replica_application_result.go | 2 + .../replica_application_state_machine.go | 24 +- pkg/kv/kvserver/replica_raftstorage.go | 52 +++- pkg/kv/kvserver/stateloader/BUILD.bazel | 1 + pkg/kv/kvserver/stateloader/initial.go | 19 +- pkg/kv/kvserver/stateloader/stateloader.go | 20 +- pkg/kv/kvserver/store_snapshot.go | 14 + pkg/migration/migrations/BUILD.bazel | 3 + pkg/migration/migrations/migrations.go | 10 + .../migrations/raft_applied_index_term.go | 100 +++++++ .../raft_applied_index_term_external_test.go | 249 ++++++++++++++++++ pkg/storage/enginepb/mvcc3.proto | 9 + 21 files changed, 564 insertions(+), 23 deletions(-) create mode 100644 pkg/migration/migrations/raft_applied_index_term.go create mode 100644 pkg/migration/migrations/raft_applied_index_term_external_test.go diff --git a/docs/generated/settings/settings-for-tenants.txt b/docs/generated/settings/settings-for-tenants.txt index 409b4d010477..985356d20568 100644 --- a/docs/generated/settings/settings-for-tenants.txt +++ b/docs/generated/settings/settings-for-tenants.txt @@ -178,4 +178,4 @@ trace.debug.enable boolean false if set, traces for recent requests can be seen trace.jaeger.agent string the address of a Jaeger agent to receive traces using the Jaeger UDP Thrift protocol, as :. If no port is specified, 6381 will be used. trace.opentelemetry.collector string address of an OpenTelemetry trace collector to receive traces using the otel gRPC protocol, as :. If no port is specified, 4317 will be used. trace.zipkin.collector string the address of a Zipkin instance to receive traces, as :. If no port is specified, 9411 will be used. -version version 21.2-50 set the active cluster version in the format '.' +version version 21.2-54 set the active cluster version in the format '.' diff --git a/docs/generated/settings/settings.html b/docs/generated/settings/settings.html index f63361501995..e8b71cb5db45 100644 --- a/docs/generated/settings/settings.html +++ b/docs/generated/settings/settings.html @@ -186,6 +186,6 @@ trace.jaeger.agentstringthe address of a Jaeger agent to receive traces using the Jaeger UDP Thrift protocol, as :. If no port is specified, 6381 will be used. trace.opentelemetry.collectorstringaddress of an OpenTelemetry trace collector to receive traces using the otel gRPC protocol, as :. If no port is specified, 4317 will be used. trace.zipkin.collectorstringthe address of a Zipkin instance to receive traces, as :. If no port is specified, 9411 will be used. -versionversion21.2-50set the active cluster version in the format '.' 
+versionversion21.2-54set the active cluster version in the format '.' diff --git a/pkg/clusterversion/cockroach_versions.go b/pkg/clusterversion/cockroach_versions.go index 1cc066ae2a2f..bcc87f399fc3 100644 --- a/pkg/clusterversion/cockroach_versions.go +++ b/pkg/clusterversion/cockroach_versions.go @@ -251,6 +251,12 @@ const ( // EnableProtectedTimestampsForTenant enables the use of protected timestamps // in secondary tenants. EnableProtectedTimestampsForTenant + // AddRaftAppliedIndexTermMigration is a migration that causes each range + // replica to start populating RangeAppliedState.RaftAppliedIndexTerm field. + AddRaftAppliedIndexTermMigration + // PostAddRaftAppliedIndexTermMigration is used for asserting that + // RaftAppliedIndexTerm is populated. + PostAddRaftAppliedIndexTermMigration // ************************************************* // Step (1): Add new versions here. @@ -387,6 +393,14 @@ var versionsSingleton = keyedVersions{ Key: EnableProtectedTimestampsForTenant, Version: roachpb.Version{Major: 21, Minor: 2, Internal: 50}, }, + { + Key: AddRaftAppliedIndexTermMigration, + Version: roachpb.Version{Major: 21, Minor: 2, Internal: 52}, + }, + { + Key: PostAddRaftAppliedIndexTermMigration, + Version: roachpb.Version{Major: 21, Minor: 2, Internal: 54}, + }, // ************************************************* // Step (2): Add new versions here. diff --git a/pkg/kv/kvserver/batcheval/cmd_end_transaction.go b/pkg/kv/kvserver/batcheval/cmd_end_transaction.go index 83c8698f0f60..f287f81d5867 100644 --- a/pkg/kv/kvserver/batcheval/cmd_end_transaction.go +++ b/pkg/kv/kvserver/batcheval/cmd_end_transaction.go @@ -1011,9 +1011,23 @@ func splitTriggerHelper( if err != nil { return enginepb.MVCCStats{}, result.Result{}, errors.Wrap(err, "unable to load replica version") } + // The RHS should populate RaftAppliedIndexTerm if the LHS is doing so. + // Alternatively, we could be more aggressive and also look at the cluster + // version, but this is simpler -- if the keyspace occupied by the + // original unsplit range has not been migrated yet, by an ongoing + // migration, both LHS and RHS will be migrated later. 
+ rangeAppliedState, err := sl.LoadRangeAppliedState(ctx, batch) + if err != nil { + return enginepb.MVCCStats{}, result.Result{}, + errors.Wrap(err, "unable to load range applied state") + } + writeRaftAppliedIndexTerm := false + if rangeAppliedState.RaftAppliedIndexTerm > 0 { + writeRaftAppliedIndexTerm = true + } *h.AbsPostSplitRight(), err = stateloader.WriteInitialReplicaState( ctx, batch, *h.AbsPostSplitRight(), split.RightDesc, rightLease, - *gcThreshold, replicaVersion, + *gcThreshold, replicaVersion, writeRaftAppliedIndexTerm, ) if err != nil { return enginepb.MVCCStats{}, result.Result{}, errors.Wrap(err, "unable to write initial Replica state") diff --git a/pkg/kv/kvserver/batcheval/cmd_migrate.go b/pkg/kv/kvserver/batcheval/cmd_migrate.go index 12de72f787f5..8bf16b05b1e4 100644 --- a/pkg/kv/kvserver/batcheval/cmd_migrate.go +++ b/pkg/kv/kvserver/batcheval/cmd_migrate.go @@ -18,6 +18,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/kv/kvserver/batcheval/result" "github.com/cockroachdb/cockroach/pkg/kv/kvserver/kvserverpb" "github.com/cockroachdb/cockroach/pkg/kv/kvserver/spanset" + "github.com/cockroachdb/cockroach/pkg/kv/kvserver/stateloader" "github.com/cockroachdb/cockroach/pkg/roachpb" "github.com/cockroachdb/cockroach/pkg/storage" "github.com/cockroachdb/errors" @@ -49,6 +50,8 @@ type migration func(context.Context, storage.ReadWriter, CommandArgs) (result.Re func init() { _ = registerMigration // prevent unused warning. + registerMigration( + clusterversion.AddRaftAppliedIndexTermMigration, addRaftAppliedIndexTermMigration) } func registerMigration(key clusterversion.Key, migration migration) { @@ -89,6 +92,25 @@ func Migrate( return pd, nil } +// addRaftAppliedIndexTermMigration migrates the system to start populating +// the RangeAppliedState.RaftAppliedIndexTerm field. +func addRaftAppliedIndexTermMigration( + ctx context.Context, readWriter storage.ReadWriter, cArgs CommandArgs, +) (result.Result, error) { + return result.Result{ + Replicated: kvserverpb.ReplicatedEvalResult{ + State: &kvserverpb.ReplicaState{ + // Signal the migration by sending a term on the new field that we + // want to migrate into. This term is chosen as one that would never + // be used in practice (since raftInitialLogTerm is 10), so we can + // special-case it below raft and start writing the (real) term to the + // AppliedState. + RaftAppliedIndexTerm: stateloader.RaftLogTermSignalForAddRaftAppliedIndexTermMigration, + }, + }, + }, nil +} + // TestingRegisterMigrationInterceptor is used in tests to register an // interceptor for a below-raft migration. 
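// Illustrative aside (not part of the patch): the sentinel term only ever travels
// through in-memory plumbing. The Migrate command's ReplicatedEvalResult carries
// it, result.MergeAndDestroy forwards it, stageTrivialReplicatedEvalResult
// consumes it below raft, and clearTrivialReplicatedEvalResultFields resets it so
// it is never persisted. Under those assumptions, the consumer-side logic reduces
// to roughly the following sketch (hypothetical helper name):
//
//	// maybePopulateTerm writes the real term of each applied entry once the
//	// migration signal has been seen, or if the term is already being populated.
//	func maybePopulateTerm(cur *uint64, sawSignal bool, entryTerm uint64) {
//		if *cur > 0 || sawSignal {
//			*cur = entryTerm
//		}
//	}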
// diff --git a/pkg/kv/kvserver/batcheval/result/BUILD.bazel b/pkg/kv/kvserver/batcheval/result/BUILD.bazel index 220a291e0c54..15cd1a9259f7 100644 --- a/pkg/kv/kvserver/batcheval/result/BUILD.bazel +++ b/pkg/kv/kvserver/batcheval/result/BUILD.bazel @@ -12,6 +12,7 @@ go_library( deps = [ "//pkg/kv/kvserver/concurrency/lock", "//pkg/kv/kvserver/kvserverpb", + "//pkg/kv/kvserver/stateloader", "//pkg/roachpb:with-mocks", "//pkg/util/log", "@com_github_cockroachdb_errors//:errors", diff --git a/pkg/kv/kvserver/batcheval/result/result.go b/pkg/kv/kvserver/batcheval/result/result.go index 23a632f996cc..c08f8e1fd1e9 100644 --- a/pkg/kv/kvserver/batcheval/result/result.go +++ b/pkg/kv/kvserver/batcheval/result/result.go @@ -15,6 +15,7 @@ import ( "fmt" "github.com/cockroachdb/cockroach/pkg/kv/kvserver/kvserverpb" + "github.com/cockroachdb/cockroach/pkg/kv/kvserver/stateloader" "github.com/cockroachdb/cockroach/pkg/roachpb" "github.com/cockroachdb/cockroach/pkg/util/log" "github.com/cockroachdb/errors" @@ -193,14 +194,30 @@ func coalesceBool(lhs *bool, rhs *bool) { func (p *Result) MergeAndDestroy(q Result) error { if q.Replicated.State != nil { if q.Replicated.State.RaftAppliedIndex != 0 { - return errors.AssertionFailedf("must not specify RaftApplyIndex") + return errors.AssertionFailedf("must not specify RaftAppliedIndex") } if q.Replicated.State.LeaseAppliedIndex != 0 { - return errors.AssertionFailedf("must not specify RaftApplyIndex") + return errors.AssertionFailedf("must not specify LeaseAppliedIndex") } if p.Replicated.State == nil { p.Replicated.State = &kvserverpb.ReplicaState{} } + if q.Replicated.State.RaftAppliedIndexTerm != 0 { + if q.Replicated.State.RaftAppliedIndexTerm == + stateloader.RaftLogTermSignalForAddRaftAppliedIndexTermMigration { + if p.Replicated.State.RaftAppliedIndexTerm != 0 && + p.Replicated.State.RaftAppliedIndexTerm != + stateloader.RaftLogTermSignalForAddRaftAppliedIndexTermMigration { + return errors.AssertionFailedf("invalid term value %d", + p.Replicated.State.RaftAppliedIndexTerm) + } + p.Replicated.State.RaftAppliedIndexTerm = q.Replicated.State.RaftAppliedIndexTerm + q.Replicated.State.RaftAppliedIndexTerm = 0 + } else { + return errors.AssertionFailedf("invalid term value %d", + q.Replicated.State.RaftAppliedIndexTerm) + } + } if p.Replicated.State.Desc == nil { p.Replicated.State.Desc = q.Replicated.State.Desc } else if q.Replicated.State.Desc != nil { diff --git a/pkg/kv/kvserver/below_raft_protos_test.go b/pkg/kv/kvserver/below_raft_protos_test.go index ab2f072d9c26..5be8e1f95b0b 100644 --- a/pkg/kv/kvserver/below_raft_protos_test.go +++ b/pkg/kv/kvserver/below_raft_protos_test.go @@ -78,7 +78,7 @@ var belowRaftGoldenProtos = map[reflect.Type]fixture{ return enginepb.NewPopulatedRangeAppliedState(r, false) }, emptySum: 615555020845646359, - populatedSum: 17354515720541950025, + populatedSum: 12125419916111069931, }, reflect.TypeOf(&raftpb.HardState{}): { populatedConstructor: func(r *rand.Rand) protoutil.Message { diff --git a/pkg/kv/kvserver/kvserverpb/state.proto b/pkg/kv/kvserver/kvserverpb/state.proto index 08034e04dfb3..ac6cdb577f70 100644 --- a/pkg/kv/kvserver/kvserverpb/state.proto +++ b/pkg/kv/kvserver/kvserverpb/state.proto @@ -134,6 +134,10 @@ message ReplicaState { // "follower reads" at or below this timestamp. util.hlc.Timestamp raft_closed_timestamp = 13 [(gogoproto.nullable) = false]; + // The term corresponding to RaftAppliedIndex. This is derived from + // RangeAppliedState.RaftAppliedIndexTerm. 
+ uint64 raft_applied_index_term = 14; + reserved 8, 9, 10; } diff --git a/pkg/kv/kvserver/replica_application_result.go b/pkg/kv/kvserver/replica_application_result.go index 68efb0dc756a..ee6496d99f41 100644 --- a/pkg/kv/kvserver/replica_application_result.go +++ b/pkg/kv/kvserver/replica_application_result.go @@ -90,6 +90,8 @@ func clearTrivialReplicatedEvalResultFields(r *kvserverpb.ReplicatedEvalResult) // replica state for this batch. if haveState := r.State != nil; haveState { r.State.Stats = nil + // Reset the signal used to execute the AddRaftAppliedIndexTermMigration. + r.State.RaftAppliedIndexTerm = 0 if *r.State == (kvserverpb.ReplicaState{}) { r.State = nil } diff --git a/pkg/kv/kvserver/replica_application_state_machine.go b/pkg/kv/kvserver/replica_application_state_machine.go index bc3b77e19f6c..16e6e8a104a2 100644 --- a/pkg/kv/kvserver/replica_application_state_machine.go +++ b/pkg/kv/kvserver/replica_application_state_machine.go @@ -19,6 +19,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/kv/kvserver/closedts/ctpb" "github.com/cockroachdb/cockroach/pkg/kv/kvserver/kvserverbase" "github.com/cockroachdb/cockroach/pkg/kv/kvserver/kvserverpb" + "github.com/cockroachdb/cockroach/pkg/kv/kvserver/stateloader" "github.com/cockroachdb/cockroach/pkg/roachpb" "github.com/cockroachdb/cockroach/pkg/storage" "github.com/cockroachdb/cockroach/pkg/storage/enginepb" @@ -849,9 +850,23 @@ func (b *replicaAppBatch) runPreApplyTriggersAfterStagingWriteBatch( func (b *replicaAppBatch) stageTrivialReplicatedEvalResult( ctx context.Context, cmd *replicatedCmd, ) { - if raftAppliedIndex := cmd.ent.Index; raftAppliedIndex != 0 { - b.state.RaftAppliedIndex = raftAppliedIndex + raftAppliedIndex := cmd.ent.Index + if raftAppliedIndex == 0 { + log.Fatalf(ctx, "raft entry with index 0") + } + b.state.RaftAppliedIndex = raftAppliedIndex + rs := cmd.decodedRaftEntry.replicatedResult().State + // We are post migration or this replicatedCmd is doing the migration. + if b.state.RaftAppliedIndexTerm > 0 || (rs != nil && + rs.RaftAppliedIndexTerm == stateloader.RaftLogTermSignalForAddRaftAppliedIndexTermMigration) { + // Once we populate b.state.RaftAppliedIndexTerm it will flow into the + // persisted RangeAppliedState and into the in-memory representation in + // Replica.mu.state. The latter is used to initialize b.state, so future + // calls to this method will see that the migration has already happened + // and will continue to populate the term. + b.state.RaftAppliedIndexTerm = cmd.ent.Term } + if leaseAppliedIndex := cmd.leaseIndex; leaseAppliedIndex != 0 { b.state.LeaseAppliedIndex = leaseAppliedIndex } @@ -915,6 +930,9 @@ func (b *replicaAppBatch) ApplyToStateMachine(ctx context.Context) error { // Update the replica's applied indexes, mvcc stats and closed timestamp. r.mu.Lock() r.mu.state.RaftAppliedIndex = b.state.RaftAppliedIndex + // RaftAppliedIndexTerm will be non-zero only when the + // AddRaftAppliedIndexTermMigration has happened. + r.mu.state.RaftAppliedIndexTerm = b.state.RaftAppliedIndexTerm r.mu.state.LeaseAppliedIndex = b.state.LeaseAppliedIndex // Sanity check that the RaftClosedTimestamp doesn't go backwards. @@ -978,7 +996,7 @@ func (b *replicaAppBatch) addAppliedStateKeyToBatch(ctx context.Context) error { // lease index along with the mvcc stats, all in one key. 
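// Note that b.state.RaftAppliedIndexTerm stays zero until the migration signal
// has been applied, and a zero value is simply omitted from the encoded
// RangeAppliedState (see the field comment in mvcc3.proto), so replicas that have
// not migrated yet keep writing the same applied-state value as before this change.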
loader := &b.r.raftMu.stateLoader return loader.SetRangeAppliedState( - ctx, b.batch, b.state.RaftAppliedIndex, b.state.LeaseAppliedIndex, + ctx, b.batch, b.state.RaftAppliedIndex, b.state.LeaseAppliedIndex, b.state.RaftAppliedIndexTerm, b.state.Stats, &b.state.RaftClosedTimestamp, ) } diff --git a/pkg/kv/kvserver/replica_raftstorage.go b/pkg/kv/kvserver/replica_raftstorage.go index ea5711c7bd14..7eac6fcbebc7 100644 --- a/pkg/kv/kvserver/replica_raftstorage.go +++ b/pkg/kv/kvserver/replica_raftstorage.go @@ -386,6 +386,15 @@ func (r *Replica) GetLeaseAppliedIndex() uint64 { // r.mu is held. Note that the returned snapshot is a placeholder and // does not contain any of the replica data. The snapshot is actually generated // (and sent) by the Raft snapshot queue. +// +// More specifically, this method is called by etcd/raft in +// (*raftLog).snapshot. Raft expects that it generates the snapshot (by +// calling Snapshot) and that "sending" the result actually sends the +// snapshot. In CockroachDB, that message is intercepted (at the sender) and +// instead we add the replica (the raft leader) to the raft snapshot queue, +// and when its turn comes we look at the raft state for followers that want a +// snapshot, and then send one. That actual sending path does not call this +// Snapshot method. func (r *replicaRaftStorage) Snapshot() (raftpb.Snapshot, error) { r.mu.AssertHeld() appliedIndex := r.mu.state.RaftAppliedIndex @@ -418,11 +427,27 @@ func (r *Replica) GetSnapshot( // the corresponding Raft command not applied yet). r.raftMu.Lock() snap := r.store.engine.NewSnapshot() - r.mu.Lock() - appliedIndex := r.mu.state.RaftAppliedIndex - // Cleared when OutgoingSnapshot closes. - r.addSnapshotLogTruncationConstraintLocked(ctx, snapUUID, appliedIndex, recipientStore) - r.mu.Unlock() + { + r.mu.Lock() + // We will fetch the applied index later again, from snap. The + // appliedIndex fetched here is narrowly used for adding a log truncation + // constraint to prevent log entries > appliedIndex from being removed. + // Note that the appliedIndex maintained in Replica actually lags the one + // in the engine, since replicaAppBatch.ApplyToStateMachine commits the + // engine batch and then acquires Replica.mu to update + // Replica.mu.state.RaftAppliedIndex. The use of a possibly stale value + // here is harmless since using a lower index in this constraint, than the + // actual snapshot index, preserves more from a log truncation + // perspective. + // + // TODO(sumeer): despite the above justification, this is unnecessarily + // complicated. Consider loading the RaftAppliedIndex from the snap for + // this use case. + appliedIndex := r.mu.state.RaftAppliedIndex + // Cleared when OutgoingSnapshot closes. + r.addSnapshotLogTruncationConstraintLocked(ctx, snapUUID, appliedIndex, recipientStore) + r.mu.Unlock() + } r.raftMu.Unlock() release := func() { @@ -568,6 +593,12 @@ func snapshot( } term, err := term(ctx, rsl, snap, rangeID, eCache, state.RaftAppliedIndex) + // If we've migrated to populating RaftAppliedIndexTerm, check that the term + // from the two sources are equal. 
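// That is, once populated, the term persisted in the applied state must match the
// term that term() recovers for state.RaftAppliedIndex from the raft log and entry
// cache; a mismatch would mean the log and the applied state disagree about which
// entry was applied last.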
+ if state.RaftAppliedIndexTerm != 0 && term != state.RaftAppliedIndexTerm { + return OutgoingSnapshot{}, + errors.AssertionFailedf("unequal terms %d != %d", term, state.RaftAppliedIndexTerm) + } if err != nil { return OutgoingSnapshot{}, errors.Wrapf(err, "failed to fetch term of %d", state.RaftAppliedIndex) } @@ -920,6 +951,12 @@ func (r *Replica) applySnapshot( log.Fatalf(ctx, "snapshot RaftAppliedIndex %d doesn't match its metadata index %d", state.RaftAppliedIndex, nonemptySnap.Metadata.Index) } + // If we've migrated to populating RaftAppliedIndexTerm, check that the term + // from the two sources are equal. + if state.RaftAppliedIndexTerm != 0 && state.RaftAppliedIndexTerm != nonemptySnap.Metadata.Term { + log.Fatalf(ctx, "snapshot RaftAppliedIndexTerm %d doesn't match its metadata term %d", + state.RaftAppliedIndexTerm, nonemptySnap.Metadata.Term) + } // The on-disk state is now committed, but the corresponding in-memory state // has not yet been updated. Any errors past this point must therefore be @@ -975,6 +1012,11 @@ func (r *Replica) applySnapshot( // feelings about this ever change, we can add a LastIndex field to // raftpb.SnapshotMetadata. r.mu.lastIndex = state.RaftAppliedIndex + + // TODO(sumeer): We should be able to set this to + // nonemptySnap.Metadata.Term. See + // https://github.com/cockroachdb/cockroach/pull/75675#pullrequestreview-867926687 + // for a discussion regarding this. r.mu.lastTerm = invalidLastTerm r.mu.raftLogSize = 0 // Update the store stats for the data in the snapshot. diff --git a/pkg/kv/kvserver/stateloader/BUILD.bazel b/pkg/kv/kvserver/stateloader/BUILD.bazel index 58f31fb1154a..d4f6b2da853b 100644 --- a/pkg/kv/kvserver/stateloader/BUILD.bazel +++ b/pkg/kv/kvserver/stateloader/BUILD.bazel @@ -9,6 +9,7 @@ go_library( importpath = "github.com/cockroachdb/cockroach/pkg/kv/kvserver/stateloader", visibility = ["//visibility:public"], deps = [ + "//pkg/clusterversion", "//pkg/keys", "//pkg/kv/kvserver/kvserverpb", "//pkg/roachpb:with-mocks", diff --git a/pkg/kv/kvserver/stateloader/initial.go b/pkg/kv/kvserver/stateloader/initial.go index f1688147a310..9d4b58d74691 100644 --- a/pkg/kv/kvserver/stateloader/initial.go +++ b/pkg/kv/kvserver/stateloader/initial.go @@ -13,6 +13,7 @@ package stateloader import ( "context" + "github.com/cockroachdb/cockroach/pkg/clusterversion" "github.com/cockroachdb/cockroach/pkg/kv/kvserver/kvserverpb" "github.com/cockroachdb/cockroach/pkg/roachpb" "github.com/cockroachdb/cockroach/pkg/storage" @@ -29,6 +30,15 @@ import ( const ( raftInitialLogIndex = 10 raftInitialLogTerm = 5 + + // RaftLogTermSignalForAddRaftAppliedIndexTermMigration is never persisted + // in the state machine or in HardState. It is only used in + // AddRaftAppliedIndexTermMigration to signal to the below raft code that + // the migration should happen when applying the raft log entry that + // contains ReplicatedEvalResult.State.RaftAppliedIndexTerm equal to this + // value. It is less than raftInitialLogTerm since that ensures it will + // never be used under normal operation. 
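// (Every range's log starts out at raftInitialLogTerm, which is 5 here, and raft
// terms never decrease, so any real RaftAppliedIndexTerm is always >= 5; a value
// below that can therefore only ever mean "migration signal".)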
+ RaftLogTermSignalForAddRaftAppliedIndexTermMigration = 3 ) // WriteInitialReplicaState sets up a new Range, but without writing an @@ -46,6 +56,7 @@ func WriteInitialReplicaState( lease roachpb.Lease, gcThreshold hlc.Timestamp, replicaVersion roachpb.Version, + writeRaftAppliedIndexTerm bool, ) (enginepb.MVCCStats, error) { rsl := Make(desc.RangeID) var s kvserverpb.ReplicaState @@ -54,6 +65,9 @@ func WriteInitialReplicaState( Index: raftInitialLogIndex, } s.RaftAppliedIndex = s.TruncatedState.Index + if writeRaftAppliedIndexTerm { + s.RaftAppliedIndexTerm = s.TruncatedState.Term + } s.Desc = &roachpb.RangeDescriptor{ RangeID: desc.RangeID, } @@ -102,9 +116,12 @@ func WriteInitialRangeState( initialGCThreshold := hlc.Timestamp{} initialMS := enginepb.MVCCStats{} + writeRaftAppliedIndexTerm := + clusterversion.ClusterVersion{Version: replicaVersion}.IsActiveVersion( + clusterversion.ByKey(clusterversion.AddRaftAppliedIndexTermMigration)) if _, err := WriteInitialReplicaState( ctx, readWriter, initialMS, desc, initialLease, initialGCThreshold, - replicaVersion, + replicaVersion, writeRaftAppliedIndexTerm, ); err != nil { return err } diff --git a/pkg/kv/kvserver/stateloader/stateloader.go b/pkg/kv/kvserver/stateloader/stateloader.go index 2ab3273f5320..088eab7c364b 100644 --- a/pkg/kv/kvserver/stateloader/stateloader.go +++ b/pkg/kv/kvserver/stateloader/stateloader.go @@ -78,6 +78,7 @@ func (rsl StateLoader) Load( return kvserverpb.ReplicaState{}, err } s.RaftAppliedIndex = as.RaftAppliedIndex + s.RaftAppliedIndexTerm = as.RaftAppliedIndexTerm s.LeaseAppliedIndex = as.LeaseAppliedIndex ms := as.RangeStats.ToStats() s.Stats = &ms @@ -133,8 +134,9 @@ func (rsl StateLoader) Save( return enginepb.MVCCStats{}, err } } - rai, lai, ct := state.RaftAppliedIndex, state.LeaseAppliedIndex, &state.RaftClosedTimestamp - if err := rsl.SetRangeAppliedState(ctx, readWriter, rai, lai, ms, ct); err != nil { + rai, lai, rait, ct := state.RaftAppliedIndex, state.LeaseAppliedIndex, state.RaftAppliedIndexTerm, + &state.RaftClosedTimestamp + if err := rsl.SetRangeAppliedState(ctx, readWriter, rai, lai, rait, ms, ct); err != nil { return enginepb.MVCCStats{}, err } return *ms, nil @@ -194,14 +196,15 @@ func (rsl StateLoader) LoadMVCCStats( func (rsl StateLoader) SetRangeAppliedState( ctx context.Context, readWriter storage.ReadWriter, - appliedIndex, leaseAppliedIndex uint64, + appliedIndex, leaseAppliedIndex, appliedIndexTerm uint64, newMS *enginepb.MVCCStats, raftClosedTimestamp *hlc.Timestamp, ) error { as := enginepb.RangeAppliedState{ - RaftAppliedIndex: appliedIndex, - LeaseAppliedIndex: leaseAppliedIndex, - RangeStats: newMS.ToPersistentStats(), + RaftAppliedIndex: appliedIndex, + LeaseAppliedIndex: leaseAppliedIndex, + RangeStats: newMS.ToPersistentStats(), + RaftAppliedIndexTerm: appliedIndexTerm, } if raftClosedTimestamp != nil && !raftClosedTimestamp.IsEmpty() { as.RaftClosedTimestamp = raftClosedTimestamp @@ -223,7 +226,8 @@ func (rsl StateLoader) SetMVCCStats( return err } return rsl.SetRangeAppliedState( - ctx, readWriter, as.RaftAppliedIndex, as.LeaseAppliedIndex, newMS, as.RaftClosedTimestamp) + ctx, readWriter, as.RaftAppliedIndex, as.LeaseAppliedIndex, as.RaftAppliedIndexTerm, newMS, + as.RaftClosedTimestamp) } // SetClosedTimestamp overwrites the closed timestamp. 
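// Illustrative usage of the widened SetRangeAppliedState signature (a sketch under
// stated assumptions, not code from this patch; only stateloader.Make,
// SetRangeAppliedState, and LoadRangeAppliedState are APIs touched above, the
// surrounding variables are hypothetical):
//
//	sl := stateloader.Make(rangeID)
//	if err := sl.SetRangeAppliedState(
//		ctx, readWriter, appliedIndex, leaseAppliedIndex, appliedIndexTerm, &ms, nil,
//	); err != nil {
//		return err
//	}
//	as, err := sl.LoadRangeAppliedState(ctx, readWriter)
//	// On success, as.RaftAppliedIndexTerm == appliedIndexTerm.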
@@ -235,7 +239,7 @@ func (rsl StateLoader) SetClosedTimestamp( return err } return rsl.SetRangeAppliedState( - ctx, readWriter, as.RaftAppliedIndex, as.LeaseAppliedIndex, + ctx, readWriter, as.RaftAppliedIndex, as.LeaseAppliedIndex, as.RaftAppliedIndexTerm, as.RangeStats.ToStatsPtr(), closedTS) } diff --git a/pkg/kv/kvserver/store_snapshot.go b/pkg/kv/kvserver/store_snapshot.go index e5ed6dd91677..51d2f23b6142 100644 --- a/pkg/kv/kvserver/store_snapshot.go +++ b/pkg/kv/kvserver/store_snapshot.go @@ -15,6 +15,7 @@ import ( "io" "time" + "github.com/cockroachdb/cockroach/pkg/clusterversion" "github.com/cockroachdb/cockroach/pkg/keys" "github.com/cockroachdb/cockroach/pkg/kv/kvserver/raftentry" "github.com/cockroachdb/cockroach/pkg/kv/kvserver/rditer" @@ -903,6 +904,18 @@ func SendEmptySnapshot( return err } + // SendEmptySnapshot is only used by the cockroach debug reset-quorum tool. + // It is experimental and unlikely to be used in cluster versions that are + // older than AddRaftAppliedIndexTermMigration. We do not want the cluster + // version to fully dictate the value of the writeAppliedIndexTerm + // parameter, since if this node's view of the version is stale we could + // regress to a state before the migration. Instead, we return an error if + // the cluster version is old. + writeAppliedIndexTerm := st.Version.IsActive(ctx, clusterversion.AddRaftAppliedIndexTermMigration) + if !writeAppliedIndexTerm { + return errors.Errorf("cluster version is too old %s", + st.Version.ActiveVersionOrEmpty(ctx)) + } ms, err = stateloader.WriteInitialReplicaState( ctx, eng, @@ -911,6 +924,7 @@ func SendEmptySnapshot( roachpb.Lease{}, hlc.Timestamp{}, // gcThreshold st.Version.ActiveVersionOrEmpty(ctx).Version, + writeAppliedIndexTerm, ) if err != nil { return err diff --git a/pkg/migration/migrations/BUILD.bazel b/pkg/migration/migrations/BUILD.bazel index 515502093562..a059a10bf61d 100644 --- a/pkg/migration/migrations/BUILD.bazel +++ b/pkg/migration/migrations/BUILD.bazel @@ -12,6 +12,7 @@ go_library( "migrate_span_configs.go", "migrations.go", "public_schema_migration.go", + "raft_applied_index_term.go", "schema_changes.go", "seed_tenant_span_configs.go", ], @@ -66,6 +67,7 @@ go_test( "main_test.go", "migrate_span_configs_test.go", "public_schema_migration_external_test.go", + "raft_applied_index_term_external_test.go", ], data = glob(["testdata/**"]), embed = [":migrations"], @@ -77,6 +79,7 @@ go_test( "//pkg/kv", "//pkg/kv/kvclient/rangefeed/rangefeedcache", "//pkg/kv/kvserver", + "//pkg/kv/kvserver/stateloader", "//pkg/roachpb:with-mocks", "//pkg/security", "//pkg/security/securitytest", diff --git a/pkg/migration/migrations/migrations.go b/pkg/migration/migrations/migrations.go index 832180f7dd9e..7459fb420325 100644 --- a/pkg/migration/migrations/migrations.go +++ b/pkg/migration/migrations/migrations.go @@ -97,6 +97,16 @@ var migrations = []migration.Migration{ NoPrecondition, grantOptionMigration, ), + migration.NewSystemMigration( + "populate RangeAppliedState.RaftAppliedIndexTerm for all ranges", + toCV(clusterversion.AddRaftAppliedIndexTermMigration), + raftAppliedIndexTermMigration, + ), + migration.NewSystemMigration( + "purge all replicas not populating RangeAppliedState.RaftAppliedIndexTerm", + toCV(clusterversion.PostAddRaftAppliedIndexTermMigration), + postRaftAppliedIndexTermMigration, + ), } func init() { diff --git a/pkg/migration/migrations/raft_applied_index_term.go b/pkg/migration/migrations/raft_applied_index_term.go new file mode 100644 index 000000000000..7b6ea09134b1 
--- /dev/null +++ b/pkg/migration/migrations/raft_applied_index_term.go @@ -0,0 +1,100 @@ +// Copyright 2022 The Cockroach Authors. +// +// Use of this software is governed by the Business Source License +// included in the file licenses/BSL.txt. +// +// As of the Change Date specified in that file, in accordance with +// the Business Source License, use of this software will be governed +// by the Apache License, Version 2.0, included in the file +// licenses/APL.txt. + +package migrations + +import ( + "bytes" + "context" + "fmt" + + "github.com/cockroachdb/cockroach/pkg/clusterversion" + "github.com/cockroachdb/cockroach/pkg/jobs" + "github.com/cockroachdb/cockroach/pkg/keys" + "github.com/cockroachdb/cockroach/pkg/migration" + "github.com/cockroachdb/cockroach/pkg/roachpb" + "github.com/cockroachdb/cockroach/pkg/server/serverpb" + "github.com/cockroachdb/cockroach/pkg/util/log" +) + +// defaultPageSize controls how many ranges are paged in by default when +// iterating through all ranges in a cluster during any given migration. We +// pulled this number out of thin air(-ish). Let's consider a cluster with 50k +// ranges, with each range taking ~200ms. We're being somewhat conservative with +// the duration, but in a wide-area cluster with large hops between the manager +// and the replicas, it could be true. Here's how long it'll take for various +// block sizes: +// +// page size of 1 ~ 2h 46m +// page size of 50 ~ 3m 20s +// page size of 200 ~ 50s +const defaultPageSize = 200 + +func raftAppliedIndexTermMigration( + ctx context.Context, cv clusterversion.ClusterVersion, deps migration.SystemDeps, _ *jobs.Job, +) error { + var batchIdx, numMigratedRanges int + init := func() { batchIdx, numMigratedRanges = 1, 0 } + if err := deps.Cluster.IterateRangeDescriptors(ctx, defaultPageSize, init, func(descriptors ...roachpb.RangeDescriptor) error { + for _, desc := range descriptors { + // NB: This is a bit of a wart. We want to reach the first range, + // but we can't address the (local) StartKey. However, keys.LocalMax + // is on r1, so we'll just use that instead to target r1. + start, end := desc.StartKey, desc.EndKey + if bytes.Compare(desc.StartKey, keys.LocalMax) < 0 { + start, _ = keys.Addr(keys.LocalMax) + } + if err := deps.DB.Migrate(ctx, start, end, cv.Version); err != nil { + return err + } + } + + // TODO(irfansharif): Instead of logging this to the debug log, we + // should insert these into a `system.migrations` table for external + // observability. + numMigratedRanges += len(descriptors) + log.Infof(ctx, "[batch %d/??] migrated %d ranges", batchIdx, numMigratedRanges) + batchIdx++ + + return nil + }); err != nil { + return err + } + + log.Infof(ctx, "[batch %d/%d] migrated %d ranges", batchIdx, batchIdx, numMigratedRanges) + + // Make sure that all stores have synced. Given we're a below-raft + // migrations, this ensures that the applied state is flushed to disk. + req := &serverpb.SyncAllEnginesRequest{} + op := "flush-stores" + return deps.Cluster.ForEveryNode(ctx, op, func(ctx context.Context, client serverpb.MigrationClient) error { + _, err := client.SyncAllEngines(ctx, req) + return err + }) +} + +func postRaftAppliedIndexTermMigration( + ctx context.Context, cv clusterversion.ClusterVersion, deps migration.SystemDeps, _ *jobs.Job, +) error { + // TODO(sumeer): this is copied from postTruncatedStateMigration. In + // comparison, postSeparatedIntentsMigration iterated over ranges and issues + // a noop below-raft migration. I am not clear on why there is a difference. 
+ // Get this clarified. + + // Purge all replicas that haven't been migrated to use the unreplicated + // truncated state and the range applied state. + truncStateVersion := clusterversion.ByKey(clusterversion.AddRaftAppliedIndexTermMigration) + req := &serverpb.PurgeOutdatedReplicasRequest{Version: &truncStateVersion} + op := fmt.Sprintf("purge-outdated-replicas=%s", req.Version) + return deps.Cluster.ForEveryNode(ctx, op, func(ctx context.Context, client serverpb.MigrationClient) error { + _, err := client.PurgeOutdatedReplicas(ctx, req) + return err + }) +} diff --git a/pkg/migration/migrations/raft_applied_index_term_external_test.go b/pkg/migration/migrations/raft_applied_index_term_external_test.go new file mode 100644 index 000000000000..cc58d961f713 --- /dev/null +++ b/pkg/migration/migrations/raft_applied_index_term_external_test.go @@ -0,0 +1,249 @@ +// Copyright 2022 The Cockroach Authors. +// +// Use of this software is governed by the Business Source License +// included in the file licenses/BSL.txt. +// +// As of the Change Date specified in that file, in accordance with +// the Business Source License, use of this software will be governed +// by the Apache License, Version 2.0, included in the file +// licenses/APL.txt. + +package migrations_test + +import ( + "context" + "testing" + + "github.com/cockroachdb/cockroach/pkg/base" + "github.com/cockroachdb/cockroach/pkg/clusterversion" + "github.com/cockroachdb/cockroach/pkg/keys" + "github.com/cockroachdb/cockroach/pkg/kv/kvserver" + "github.com/cockroachdb/cockroach/pkg/kv/kvserver/stateloader" + "github.com/cockroachdb/cockroach/pkg/roachpb" + "github.com/cockroachdb/cockroach/pkg/server" + "github.com/cockroachdb/cockroach/pkg/settings/cluster" + "github.com/cockroachdb/cockroach/pkg/testutils" + "github.com/cockroachdb/cockroach/pkg/testutils/testcluster" + "github.com/cockroachdb/cockroach/pkg/util/leaktest" + "github.com/cockroachdb/errors" + "github.com/stretchr/testify/require" +) + +func forAllReplicas(tc *testcluster.TestCluster, f func(*kvserver.Replica) error) error { + for i := 0; i < tc.NumServers(); i++ { + err := tc.Server(i).GetStores().(*kvserver.Stores).VisitStores(func(s *kvserver.Store) error { + var err error + s.VisitReplicas(func(repl *kvserver.Replica) bool { + err = f(repl) + return err == nil + }) + return err + }) + if err != nil { + return err + } + } + return nil +} + +func getReplicaCounts( + ctx context.Context, t *testing.T, tc *testcluster.TestCluster, +) (totalReplicas int, replicasWithTerm int) { + t.Helper() + require.NoError(t, forAllReplicas(tc, func(repl *kvserver.Replica) error { + totalReplicas++ + sl := stateloader.Make(repl.RangeID) + appliedState, err := sl.LoadRangeAppliedState(ctx, repl.Engine()) + if err != nil { + return err + } + // Basic sanity checking of the replica. + if appliedState.RaftAppliedIndex == 0 { + return errors.Errorf("expected initialized replica") + } + appliedIndexTermInReplicaStruct := repl.State(ctx).RaftAppliedIndexTerm + // The in-memory ReplicaState maintained in Replica must be consistent + // with that we are placing in RangeAppliedState. 
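// (repl.State returns a copy of the replica's in-memory state; stateloader.Load
// populates its RaftAppliedIndexTerm from the applied state at startup, and
// ApplyToStateMachine keeps it in sync as entries are applied, so the two values
// compared below should always match.)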
+ require.Equal(t, appliedState.RaftAppliedIndexTerm, appliedIndexTermInReplicaStruct) + truncState, err := sl.LoadRaftTruncatedState(ctx, repl.Engine()) + if err != nil { + return err + } + if truncState.Index > appliedState.RaftAppliedIndex { + return errors.Errorf("truncated state index %d > applied index %d", + truncState.Index, appliedState.RaftAppliedIndex) + } + // We could simply call repl.GetTerm at this point, but we do some more + // sanity checking. + var appliedIndexTerm uint64 + if truncState.Index == appliedState.RaftAppliedIndex { + appliedIndexTerm = truncState.Term + } else { + lastIndex, err := sl.LoadLastIndex(ctx, repl.Engine()) + if err != nil { + return err + } + if lastIndex < appliedState.RaftAppliedIndex { + return errors.Errorf("last index in raft log %d < applied index %d", + lastIndex, appliedState.RaftAppliedIndex) + } + if appliedIndexTerm, err = repl.GetTerm(appliedState.RaftAppliedIndex); err != nil { + return err + } + } + // Now we can decide whether term is being populated or not, or being + // incorrectly populated. + if appliedState.RaftAppliedIndexTerm == 0 { + // Not populated + } else if appliedState.RaftAppliedIndexTerm == appliedIndexTerm { + replicasWithTerm++ + } else { + return errors.Errorf("expected term %d but found %d", appliedIndexTerm, + appliedState.RaftAppliedIndexTerm) + } + return nil + })) + return totalReplicas, replicasWithTerm +} + +func TestRaftAppliedIndexTermMigration(t *testing.T) { + defer leaktest.AfterTest(t)() + ctx := context.Background() + + binaryVersion := clusterversion.ByKey(clusterversion.PostAddRaftAppliedIndexTermMigration) + bootstrapVersion := clusterversion.ByKey(clusterversion.AddRaftAppliedIndexTermMigration - 1) + makeArgs := func() (args base.TestServerArgs) { + args.Settings = cluster.MakeTestingClusterSettingsWithVersions( + binaryVersion, bootstrapVersion, false, + ) + args.Knobs.Server = &server.TestingKnobs{ + // Start at the version immediately preceding the migration. + BinaryVersionOverride: bootstrapVersion, + // We want to exercise manual control over the upgrade process. + DisableAutomaticVersionUpgrade: 1, + } + return args + } + + tc := testcluster.StartTestCluster(t, 3, base.TestClusterArgs{ + ServerArgsPerNode: map[int]base.TestServerArgs{ + 0: makeArgs(), + 1: makeArgs(), + 2: makeArgs(), + }, + }) + defer tc.Stopper().Stop(ctx) + + totalReplicas, replicasWithTerm := getReplicaCounts(ctx, t, tc) + require.Equal(t, 0, replicasWithTerm) + require.Less(t, 0, totalReplicas) + t.Logf("totalReplicas: %d, replicasWithTerm: %d", totalReplicas, replicasWithTerm) + var scratchKey roachpb.Key + { + // Do a split. + scratchKey = tc.ScratchRange(t) + desc, err := tc.LookupRange(scratchKey) + require.NoError(t, err) + span := desc.KeySpan() + t.Logf("splitting range: %s", span.String()) + splitKey := append(span.Key, '0', '0', '0') + leftDesc, rightDesc, err := tc.SplitRange(roachpb.Key(splitKey)) + require.NoError(t, err) + t.Logf("split range into %s and %s", leftDesc.KeySpan(), rightDesc.KeySpan()) + total, withTerm := getReplicaCounts(ctx, t, tc) + require.Less(t, totalReplicas, total) + require.Equal(t, 0, withTerm) + totalReplicas, replicasWithTerm = total, withTerm + t.Logf("after split: totalReplicas: %d, replicasWithTerm: %d", + totalReplicas, replicasWithTerm) + } + // Do the migration. 
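// Bumping the version cluster setting drives the migration manager, which runs
// raftAppliedIndexTermMigration (a Migrate request against every range) followed
// by the post-migration purge, both registered in migrations.go.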
+ _, err := tc.Conns[0].ExecContext( + ctx, `SET CLUSTER SETTING version = $1`, binaryVersion.String(), + ) + require.NoError(t, err) + + testutils.SucceedsSoon(t, func() error { + return forAllReplicas(tc, func(repl *kvserver.Replica) error { + // TODO(sumeer): when using PostAddRaftAppliedIndexTermMigration, which + // is 21.2-54, this never completes even though all 3 nodes have log + // entries saying: + // [n1,bump-cluster-version] 3480 active cluster version setting is now 21.2-54 (up from 21.2-53(fence)) + if repl.Version().Less(clusterversion.ByKey( + clusterversion.AddRaftAppliedIndexTermMigration)) { + return errors.Newf("unexpected version %s", repl.Version()) + } + return nil + }) + }) + totalReplicas, replicasWithTerm = getReplicaCounts(ctx, t, tc) + require.Equal(t, totalReplicas, replicasWithTerm) + require.Less(t, 0, totalReplicas) + t.Logf("totalReplicas: %d, replicasWithTerm: %d", totalReplicas, replicasWithTerm) + // Do another split + { + desc, err := tc.LookupRange(scratchKey) + require.NoError(t, err) + span := desc.KeySpan() + t.Logf("splitting range: %s", span.String()) + splitKey := append(span.Key, '0', '0') + leftDesc, rightDesc, err := tc.SplitRange(roachpb.Key(splitKey)) + require.NoError(t, err) + t.Logf("split range into %s and %s", leftDesc.KeySpan(), rightDesc.KeySpan()) + total, withTerm := getReplicaCounts(ctx, t, tc) + require.Less(t, totalReplicas, total) + require.Equal(t, total, withTerm) + totalReplicas, replicasWithTerm = total, withTerm + t.Logf("after second split: totalReplicas: %d, replicasWithTerm: %d", + totalReplicas, replicasWithTerm) + } + // Check that the term is maintained when a write happens. + kvDB := tc.Server(0).DB() + rScratchKey, err := keys.Addr(scratchKey) + require.NoError(t, err) + repl := tc.GetRaftLeader(t, rScratchKey) + rs1 := repl.State(ctx).ReplicaState + require.NoError(t, kvDB.Put(ctx, scratchKey, 10)) + testutils.SucceedsSoon(t, func() error { + rs2 := repl.State(ctx).ReplicaState + if rs1.RaftAppliedIndex == rs2.RaftAppliedIndex { + return errors.Errorf("waiting for application") + } + return nil + }) + rs2 := repl.State(ctx).ReplicaState + require.Less(t, rs1.RaftAppliedIndex, rs2.RaftAppliedIndex) + require.Equal(t, rs1.RaftAppliedIndexTerm, rs2.RaftAppliedIndexTerm) +} + +func TestLatestClusterDoesNotNeedRaftAppliedIndexTermMigration(t *testing.T) { + defer leaktest.AfterTest(t)() + ctx := context.Background() + + binaryVersion := clusterversion.ByKey(clusterversion.PostAddRaftAppliedIndexTermMigration) + makeArgs := func() (args base.TestServerArgs) { + args.Settings = cluster.MakeTestingClusterSettingsWithVersions( + binaryVersion, binaryVersion, false, + ) + args.Knobs.Server = &server.TestingKnobs{ + BinaryVersionOverride: binaryVersion, + // We want to exercise manual control over the upgrade process. 
+ DisableAutomaticVersionUpgrade: 1, + } + return args + } + + tc := testcluster.StartTestCluster(t, 3, base.TestClusterArgs{ + ServerArgsPerNode: map[int]base.TestServerArgs{ + 0: makeArgs(), + 1: makeArgs(), + 2: makeArgs(), + }, + }) + defer tc.Stopper().Stop(ctx) + + totalReplicas, replicasWithTerm := getReplicaCounts(ctx, t, tc) + require.Equal(t, totalReplicas, replicasWithTerm) + require.Less(t, 0, totalReplicas) + t.Logf("totalReplicas: %d, replicasWithTerm: %d", totalReplicas, replicasWithTerm) +} diff --git a/pkg/storage/enginepb/mvcc3.proto b/pkg/storage/enginepb/mvcc3.proto index 8cddd3dff4a6..a528febf764f 100644 --- a/pkg/storage/enginepb/mvcc3.proto +++ b/pkg/storage/enginepb/mvcc3.proto @@ -224,6 +224,15 @@ message RangeAppliedState { // migration). In 21.1 we cannot write empty timestamp to disk because that // looks like an inconsistency to the consistency-checker. util.hlc.Timestamp raft_closed_timestamp = 4; + + // raft_applied_index_term is the term corresponding to raft_applied_index. + // The serialized proto will not contain this field until code starts + // setting it to a value > 0. This is desirable since we don't want a mixed + // version cluster to have divergent replica state simply because we have + // introduced this field. An explicit migration, + // AddRaftAppliedIndexTermMigration, will cause this field to start being + // populated. + uint64 raft_applied_index_term = 5; } // MVCCWriteValueOp corresponds to a value being written outside of a