diff --git a/docs/generated/settings/settings-for-tenants.txt b/docs/generated/settings/settings-for-tenants.txt index bd9b53e9c3d9..c883aa6f14c7 100644 --- a/docs/generated/settings/settings-for-tenants.txt +++ b/docs/generated/settings/settings-for-tenants.txt @@ -178,4 +178,4 @@ trace.debug.enable boolean false if set, traces for recent requests can be seen trace.jaeger.agent string the address of a Jaeger agent to receive traces using the Jaeger UDP Thrift protocol, as :. If no port is specified, 6381 will be used. trace.opentelemetry.collector string address of an OpenTelemetry trace collector to receive traces using the otel gRPC protocol, as :. If no port is specified, 4317 will be used. trace.zipkin.collector string the address of a Zipkin instance to receive traces, as :. If no port is specified, 9411 will be used. -version version 21.2-50 set the active cluster version in the format '.' +version version 21.2-54 set the active cluster version in the format '.' diff --git a/docs/generated/settings/settings.html b/docs/generated/settings/settings.html index 822c8cbd171f..bc25fd2bf193 100644 --- a/docs/generated/settings/settings.html +++ b/docs/generated/settings/settings.html @@ -186,6 +186,6 @@ trace.jaeger.agentstringthe address of a Jaeger agent to receive traces using the Jaeger UDP Thrift protocol, as :. If no port is specified, 6381 will be used. trace.opentelemetry.collectorstringaddress of an OpenTelemetry trace collector to receive traces using the otel gRPC protocol, as :. If no port is specified, 4317 will be used. trace.zipkin.collectorstringthe address of a Zipkin instance to receive traces, as :. If no port is specified, 9411 will be used. -versionversion21.2-50set the active cluster version in the format '.' +versionversion21.2-54set the active cluster version in the format '.' diff --git a/pkg/clusterversion/cockroach_versions.go b/pkg/clusterversion/cockroach_versions.go index 1cc066ae2a2f..bcc87f399fc3 100644 --- a/pkg/clusterversion/cockroach_versions.go +++ b/pkg/clusterversion/cockroach_versions.go @@ -251,6 +251,12 @@ const ( // EnableProtectedTimestampsForTenant enables the use of protected timestamps // in secondary tenants. EnableProtectedTimestampsForTenant + // AddRaftAppliedIndexTermMigration is a migration that causes each range + // replica to start populating RangeAppliedState.RaftAppliedIndexTerm field. + AddRaftAppliedIndexTermMigration + // PostAddRaftAppliedIndexTermMigration is used for asserting that + // RaftAppliedIndexTerm is populated. + PostAddRaftAppliedIndexTermMigration // ************************************************* // Step (1): Add new versions here. @@ -387,6 +393,14 @@ var versionsSingleton = keyedVersions{ Key: EnableProtectedTimestampsForTenant, Version: roachpb.Version{Major: 21, Minor: 2, Internal: 50}, }, + { + Key: AddRaftAppliedIndexTermMigration, + Version: roachpb.Version{Major: 21, Minor: 2, Internal: 52}, + }, + { + Key: PostAddRaftAppliedIndexTermMigration, + Version: roachpb.Version{Major: 21, Minor: 2, Internal: 54}, + }, // ************************************************* // Step (2): Add new versions here. 
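Editor's note on the two version gates added above: AddRaftAppliedIndexTermMigration (21.2-52) is the below-raft migration that starts populating the new field, and PostAddRaftAppliedIndexTermMigration (21.2-54) is the follow-up used to assert that all replicas now populate it. As a minimal illustrative sketch (not part of this diff), a caller holding a cluster.Settings handle can gate new behavior on the first key the same way store_snapshot.go does later in this change; the package and helper names here are hypothetical:

package versiongate // hypothetical package, for illustration only

import (
    "context"

    "github.com/cockroachdb/cockroach/pkg/clusterversion"
    "github.com/cockroachdb/cockroach/pkg/settings/cluster"
)

// shouldWriteRaftAppliedIndexTerm reports whether this node believes the
// AddRaftAppliedIndexTermMigration version gate is active, i.e. whether newly
// written replica state may populate RangeAppliedState.RaftAppliedIndexTerm.
func shouldWriteRaftAppliedIndexTerm(ctx context.Context, st *cluster.Settings) bool {
    return st.Version.IsActive(ctx, clusterversion.AddRaftAppliedIndexTermMigration)
}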
diff --git a/pkg/kv/kvserver/batcheval/cmd_end_transaction.go b/pkg/kv/kvserver/batcheval/cmd_end_transaction.go index 83c8698f0f60..64d177b0744d 100644 --- a/pkg/kv/kvserver/batcheval/cmd_end_transaction.go +++ b/pkg/kv/kvserver/batcheval/cmd_end_transaction.go @@ -1011,9 +1011,19 @@ func splitTriggerHelper( if err != nil { return enginepb.MVCCStats{}, result.Result{}, errors.Wrap(err, "unable to load replica version") } + // The RHS should populate RaftAppliedIndexTerm if the LHS is doing so. + rangeAppliedState, err := sl.LoadRangeAppliedState(ctx, batch) + if err != nil { + return enginepb.MVCCStats{}, result.Result{}, + errors.Wrap(err, "unable to load range applied state") + } + writeRaftAppliedIndexTerm := false + if rangeAppliedState.RaftAppliedIndexTerm > 0 { + writeRaftAppliedIndexTerm = true + } *h.AbsPostSplitRight(), err = stateloader.WriteInitialReplicaState( ctx, batch, *h.AbsPostSplitRight(), split.RightDesc, rightLease, - *gcThreshold, replicaVersion, + *gcThreshold, replicaVersion, writeRaftAppliedIndexTerm, ) if err != nil { return enginepb.MVCCStats{}, result.Result{}, errors.Wrap(err, "unable to write initial Replica state") diff --git a/pkg/kv/kvserver/batcheval/cmd_migrate.go b/pkg/kv/kvserver/batcheval/cmd_migrate.go index 12de72f787f5..ba0710b0c84b 100644 --- a/pkg/kv/kvserver/batcheval/cmd_migrate.go +++ b/pkg/kv/kvserver/batcheval/cmd_migrate.go @@ -18,6 +18,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/kv/kvserver/batcheval/result" "github.com/cockroachdb/cockroach/pkg/kv/kvserver/kvserverpb" "github.com/cockroachdb/cockroach/pkg/kv/kvserver/spanset" + "github.com/cockroachdb/cockroach/pkg/kv/kvserver/stateloader" "github.com/cockroachdb/cockroach/pkg/roachpb" "github.com/cockroachdb/cockroach/pkg/storage" "github.com/cockroachdb/errors" @@ -49,6 +50,8 @@ type migration func(context.Context, storage.ReadWriter, CommandArgs) (result.Re func init() { _ = registerMigration // prevent unused warning. + registerMigration( + clusterversion.AddRaftAppliedIndexTermMigration, addRaftAppliedIndexTermMigration) } func registerMigration(key clusterversion.Key, migration migration) { @@ -89,6 +92,23 @@ func Migrate( return pd, nil } +// addRaftAppliedIndexTermMigration migrates the system to start populating +// the RangeAppliedState.RaftAppliedIndexTerm field. +func addRaftAppliedIndexTermMigration( + ctx context.Context, readWriter storage.ReadWriter, cArgs CommandArgs, +) (result.Result, error) { + return result.Result{ + Replicated: kvserverpb.ReplicatedEvalResult{ + State: &kvserverpb.ReplicaState{ + // NB: This is not a real term value. We can not know the term at + // proposal time since we don't know when this will be added as a raft + // log entry. + RaftAppliedIndexTerm: stateloader.RaftLogTermSignalForAddRaftAppliedIndexTermMigration, + }, + }, + }, nil +} + // TestingRegisterMigrationInterceptor is used in tests to register an // interceptor for a below-raft migration. 
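Editor's note: the migration command registered above cannot know the real term at proposal time, so it ships the sentinel RaftLogTermSignalForAddRaftAppliedIndexTermMigration and lets the apply loop substitute the applied entry's actual term. The sketch below is a standalone restatement of that decision, distilled from the replica_application_state_machine.go change later in this diff; the helper name termToRecord and the local sentinel constant are made up for illustration:

package example // illustrative only, not part of this diff

import "github.com/cockroachdb/cockroach/pkg/kv/kvserver/kvserverpb"

// raftLogTermSignal mirrors stateloader.RaftLogTermSignalForAddRaftAppliedIndexTermMigration
// introduced later in this diff.
const raftLogTermSignal = 3

// termToRecord returns the term to persist alongside the applied index: the
// entry's real term once the replica has already migrated (curTerm > 0) or when
// the applied command is the migration itself (it carries the sentinel), and 0
// otherwise so that pre-migration replicas keep writing an unchanged proto.
func termToRecord(curTerm uint64, cmdState *kvserverpb.ReplicaState, entryTerm uint64) uint64 {
    migrated := curTerm > 0
    isMigrationCmd := cmdState != nil && cmdState.RaftAppliedIndexTerm == raftLogTermSignal
    if migrated || isMigrationCmd {
        return entryTerm
    }
    return 0
}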
// diff --git a/pkg/kv/kvserver/batcheval/result/result.go b/pkg/kv/kvserver/batcheval/result/result.go index 23a632f996cc..c82e31ca7d9c 100644 --- a/pkg/kv/kvserver/batcheval/result/result.go +++ b/pkg/kv/kvserver/batcheval/result/result.go @@ -193,10 +193,13 @@ func coalesceBool(lhs *bool, rhs *bool) { func (p *Result) MergeAndDestroy(q Result) error { if q.Replicated.State != nil { if q.Replicated.State.RaftAppliedIndex != 0 { - return errors.AssertionFailedf("must not specify RaftApplyIndex") + return errors.AssertionFailedf("must not specify RaftAppliedIndex") + } + if q.Replicated.State.RaftAppliedIndexTerm != 0 { + return errors.AssertionFailedf("must not specify RaftAppliedIndexTerm") } if q.Replicated.State.LeaseAppliedIndex != 0 { - return errors.AssertionFailedf("must not specify RaftApplyIndex") + return errors.AssertionFailedf("must not specify LeaseAppliedIndex") } if p.Replicated.State == nil { p.Replicated.State = &kvserverpb.ReplicaState{} diff --git a/pkg/kv/kvserver/kvserverpb/state.proto b/pkg/kv/kvserver/kvserverpb/state.proto index 08034e04dfb3..ac6cdb577f70 100644 --- a/pkg/kv/kvserver/kvserverpb/state.proto +++ b/pkg/kv/kvserver/kvserverpb/state.proto @@ -134,6 +134,10 @@ message ReplicaState { // "follower reads" at or below this timestamp. util.hlc.Timestamp raft_closed_timestamp = 13 [(gogoproto.nullable) = false]; + // The term corresponding to RaftAppliedIndex. This is derived from + // RangeAppliedState.RaftAppliedIndexTerm. + uint64 raft_applied_index_term = 14; + reserved 8, 9, 10; } diff --git a/pkg/kv/kvserver/replica_application_state_machine.go b/pkg/kv/kvserver/replica_application_state_machine.go index bc3b77e19f6c..2ba3f2b8281b 100644 --- a/pkg/kv/kvserver/replica_application_state_machine.go +++ b/pkg/kv/kvserver/replica_application_state_machine.go @@ -19,6 +19,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/kv/kvserver/closedts/ctpb" "github.com/cockroachdb/cockroach/pkg/kv/kvserver/kvserverbase" "github.com/cockroachdb/cockroach/pkg/kv/kvserver/kvserverpb" + "github.com/cockroachdb/cockroach/pkg/kv/kvserver/stateloader" "github.com/cockroachdb/cockroach/pkg/roachpb" "github.com/cockroachdb/cockroach/pkg/storage" "github.com/cockroachdb/cockroach/pkg/storage/enginepb" @@ -849,8 +850,20 @@ func (b *replicaAppBatch) runPreApplyTriggersAfterStagingWriteBatch( func (b *replicaAppBatch) stageTrivialReplicatedEvalResult( ctx context.Context, cmd *replicatedCmd, ) { + // TODO(sumeer): when will raftAppliedIndex be 0? if raftAppliedIndex := cmd.ent.Index; raftAppliedIndex != 0 { b.state.RaftAppliedIndex = raftAppliedIndex + rs := cmd.decodedRaftEntry.replicatedResult().State + // We are post migration or this replicatedCmd is doing the migration. + if b.state.RaftAppliedIndexTerm > 0 || (rs != nil && + rs.RaftAppliedIndexTerm == stateloader.RaftLogTermSignalForAddRaftAppliedIndexTermMigration) { + // Once we populate b.state.RaftAppliedIndexTerm it will flow into the + // persisted RangeAppliedState and into the in-memory representation in + // Replica.mu.state. The latter is used to initialize b.state, so future + // calls to this method will see that the migration has already happened + // and will continue to populate the term. 
+ b.state.RaftAppliedIndexTerm = cmd.ent.Term + } } if leaseAppliedIndex := cmd.leaseIndex; leaseAppliedIndex != 0 { b.state.LeaseAppliedIndex = leaseAppliedIndex @@ -915,6 +928,8 @@ func (b *replicaAppBatch) ApplyToStateMachine(ctx context.Context) error { // Update the replica's applied indexes, mvcc stats and closed timestamp. r.mu.Lock() r.mu.state.RaftAppliedIndex = b.state.RaftAppliedIndex + // This will be non-zero only when the migration has happened. + r.mu.state.RaftAppliedIndexTerm = b.state.RaftAppliedIndexTerm r.mu.state.LeaseAppliedIndex = b.state.LeaseAppliedIndex // Sanity check that the RaftClosedTimestamp doesn't go backwards. @@ -978,7 +993,7 @@ func (b *replicaAppBatch) addAppliedStateKeyToBatch(ctx context.Context) error { // lease index along with the mvcc stats, all in one key. loader := &b.r.raftMu.stateLoader return loader.SetRangeAppliedState( - ctx, b.batch, b.state.RaftAppliedIndex, b.state.LeaseAppliedIndex, + ctx, b.batch, b.state.RaftAppliedIndex, b.state.LeaseAppliedIndex, b.state.RaftAppliedIndexTerm, b.state.Stats, &b.state.RaftClosedTimestamp, ) } diff --git a/pkg/kv/kvserver/replica_raftstorage.go b/pkg/kv/kvserver/replica_raftstorage.go index ea5711c7bd14..0046457196d1 100644 --- a/pkg/kv/kvserver/replica_raftstorage.go +++ b/pkg/kv/kvserver/replica_raftstorage.go @@ -386,6 +386,7 @@ func (r *Replica) GetLeaseAppliedIndex() uint64 { // r.mu is held. Note that the returned snapshot is a placeholder and // does not contain any of the replica data. The snapshot is actually generated // (and sent) by the Raft snapshot queue. +// TODO(sumeer): is this method really used? func (r *replicaRaftStorage) Snapshot() (raftpb.Snapshot, error) { r.mu.AssertHeld() appliedIndex := r.mu.state.RaftAppliedIndex @@ -419,6 +420,15 @@ func (r *Replica) GetSnapshot( r.raftMu.Lock() snap := r.store.engine.NewSnapshot() r.mu.Lock() + // We will fetch the appliedIndex again from snap. We require that the + // appliedIndex cached in Replica is consistent with that in the engine. + // + // TODO(sumeer): what ensures the in-memory state is not lagging the engine + // given that replicaAppBatch.ApplyToStateMachine commits the engine batch + // and then acquires Replica.mu to update Replica.mu.state.RaftAppliedIndex? + // Or do we not care whether these two values are consistent given that + // appliedIndex is just serving as an upper bound on what can be truncated + // in the log, so it's completely best-effort? appliedIndex := r.mu.state.RaftAppliedIndex // Cleared when OutgoingSnapshot closes. r.addSnapshotLogTruncationConstraintLocked(ctx, snapUUID, appliedIndex, recipientStore) @@ -568,6 +578,12 @@ func snapshot( } term, err := term(ctx, rsl, snap, rangeID, eCache, state.RaftAppliedIndex) if err != nil { return OutgoingSnapshot{}, errors.Wrapf(err, "failed to fetch term of %d", state.RaftAppliedIndex) } + // If we've migrated to populating RaftAppliedIndexTerm, check that the terms + // from the two sources are equal. + if state.RaftAppliedIndexTerm != 0 && term != state.RaftAppliedIndexTerm { + return OutgoingSnapshot{}, + errors.Errorf("unequal terms %d != %d", term, state.RaftAppliedIndexTerm) + } @@ -920,6 +936,12 @@ func (r *Replica) applySnapshot( log.Fatalf(ctx, "snapshot RaftAppliedIndex %d doesn't match its metadata index %d", state.RaftAppliedIndex, nonemptySnap.Metadata.Index) } + // If we've migrated to populating RaftAppliedIndexTerm, check that the terms + // from the two sources are equal.
+ if state.RaftAppliedIndexTerm != 0 && state.RaftAppliedIndexTerm != nonemptySnap.Metadata.Term { + log.Fatalf(ctx, "snapshot RaftAppliedIndexTerm %d doesn't match its metadata term %d", + state.RaftAppliedIndexTerm, nonemptySnap.Metadata.Term) + } // The on-disk state is now committed, but the corresponding in-memory state // has not yet been updated. Any errors past this point must therefore be @@ -975,6 +997,7 @@ func (r *Replica) applySnapshot( // feelings about this ever change, we can add a LastIndex field to // raftpb.SnapshotMetadata. r.mu.lastIndex = state.RaftAppliedIndex + // TODO(sumeer): why can't this be set to nonemptySnap.Metadata.Term? r.mu.lastTerm = invalidLastTerm r.mu.raftLogSize = 0 // Update the store stats for the data in the snapshot. diff --git a/pkg/kv/kvserver/stateloader/initial.go b/pkg/kv/kvserver/stateloader/initial.go index f1688147a310..1ee77cac584d 100644 --- a/pkg/kv/kvserver/stateloader/initial.go +++ b/pkg/kv/kvserver/stateloader/initial.go @@ -29,6 +29,15 @@ import ( const ( raftInitialLogIndex = 10 raftInitialLogTerm = 5 + + // RaftLogTermSignalForAddRaftAppliedIndexTermMigration is never persisted + // in the state machine or in HardState. It is only used in + // AddRaftAppliedIndexTermMigration to signal to the below raft code that + // the migration should happen when applying the raft log entry that + // contains ReplicatedEvalResult.State.RaftAppliedIndexTerm equal to this + // value. It is less than raftInitialLogTerm since that ensures it will + // never be used under normal operation. + RaftLogTermSignalForAddRaftAppliedIndexTermMigration = 3 ) // WriteInitialReplicaState sets up a new Range, but without writing an @@ -46,6 +55,7 @@ func WriteInitialReplicaState( lease roachpb.Lease, gcThreshold hlc.Timestamp, replicaVersion roachpb.Version, + writeRaftAppliedIndexTerm bool, ) (enginepb.MVCCStats, error) { rsl := Make(desc.RangeID) var s kvserverpb.ReplicaState @@ -54,6 +64,9 @@ func WriteInitialReplicaState( Index: raftInitialLogIndex, } s.RaftAppliedIndex = s.TruncatedState.Index + if writeRaftAppliedIndexTerm { + s.RaftAppliedIndexTerm = s.TruncatedState.Term + } s.Desc = &roachpb.RangeDescriptor{ RangeID: desc.RangeID, } @@ -102,9 +115,12 @@ func WriteInitialRangeState( initialGCThreshold := hlc.Timestamp{} initialMS := enginepb.MVCCStats{} + // TODO: is WriteInitialRangeState only used for a new cluster, so + // we can assume that we should set writeAppliedIndexTerm to true? + // What prevents older versions from joining this cluster? 
if _, err := WriteInitialReplicaState( ctx, readWriter, initialMS, desc, initialLease, initialGCThreshold, - replicaVersion, + replicaVersion, true, /* writeRaftAppliedIndexTerm */ ); err != nil { return err } diff --git a/pkg/kv/kvserver/stateloader/stateloader.go b/pkg/kv/kvserver/stateloader/stateloader.go index 2ab3273f5320..088eab7c364b 100644 --- a/pkg/kv/kvserver/stateloader/stateloader.go +++ b/pkg/kv/kvserver/stateloader/stateloader.go @@ -78,6 +78,7 @@ func (rsl StateLoader) Load( return kvserverpb.ReplicaState{}, err } s.RaftAppliedIndex = as.RaftAppliedIndex + s.RaftAppliedIndexTerm = as.RaftAppliedIndexTerm s.LeaseAppliedIndex = as.LeaseAppliedIndex ms := as.RangeStats.ToStats() s.Stats = &ms @@ -133,8 +134,9 @@ func (rsl StateLoader) Save( return enginepb.MVCCStats{}, err } } - rai, lai, ct := state.RaftAppliedIndex, state.LeaseAppliedIndex, &state.RaftClosedTimestamp - if err := rsl.SetRangeAppliedState(ctx, readWriter, rai, lai, ms, ct); err != nil { + rai, lai, rait, ct := state.RaftAppliedIndex, state.LeaseAppliedIndex, state.RaftAppliedIndexTerm, + &state.RaftClosedTimestamp + if err := rsl.SetRangeAppliedState(ctx, readWriter, rai, lai, rait, ms, ct); err != nil { return enginepb.MVCCStats{}, err } return *ms, nil @@ -194,14 +196,15 @@ func (rsl StateLoader) LoadMVCCStats( func (rsl StateLoader) SetRangeAppliedState( ctx context.Context, readWriter storage.ReadWriter, - appliedIndex, leaseAppliedIndex uint64, + appliedIndex, leaseAppliedIndex, appliedIndexTerm uint64, newMS *enginepb.MVCCStats, raftClosedTimestamp *hlc.Timestamp, ) error { as := enginepb.RangeAppliedState{ - RaftAppliedIndex: appliedIndex, - LeaseAppliedIndex: leaseAppliedIndex, - RangeStats: newMS.ToPersistentStats(), + RaftAppliedIndex: appliedIndex, + LeaseAppliedIndex: leaseAppliedIndex, + RangeStats: newMS.ToPersistentStats(), + RaftAppliedIndexTerm: appliedIndexTerm, } if raftClosedTimestamp != nil && !raftClosedTimestamp.IsEmpty() { as.RaftClosedTimestamp = raftClosedTimestamp @@ -223,7 +226,8 @@ func (rsl StateLoader) SetMVCCStats( return err } return rsl.SetRangeAppliedState( - ctx, readWriter, as.RaftAppliedIndex, as.LeaseAppliedIndex, newMS, as.RaftClosedTimestamp) + ctx, readWriter, as.RaftAppliedIndex, as.LeaseAppliedIndex, as.RaftAppliedIndexTerm, newMS, + as.RaftClosedTimestamp) } // SetClosedTimestamp overwrites the closed timestamp. @@ -235,7 +239,7 @@ func (rsl StateLoader) SetClosedTimestamp( return err } return rsl.SetRangeAppliedState( - ctx, readWriter, as.RaftAppliedIndex, as.LeaseAppliedIndex, + ctx, readWriter, as.RaftAppliedIndex, as.LeaseAppliedIndex, as.RaftAppliedIndexTerm, as.RangeStats.ToStatsPtr(), closedTS) } diff --git a/pkg/kv/kvserver/store_snapshot.go b/pkg/kv/kvserver/store_snapshot.go index e5ed6dd91677..022915b13498 100644 --- a/pkg/kv/kvserver/store_snapshot.go +++ b/pkg/kv/kvserver/store_snapshot.go @@ -15,6 +15,7 @@ import ( "io" "time" + "github.com/cockroachdb/cockroach/pkg/clusterversion" "github.com/cockroachdb/cockroach/pkg/keys" "github.com/cockroachdb/cockroach/pkg/kv/kvserver/raftentry" "github.com/cockroachdb/cockroach/pkg/kv/kvserver/rditer" @@ -903,6 +904,11 @@ func SendEmptySnapshot( return err } + // TODO: is SendEmptySnapshot only used for a new cluster, so + // we can assume that we should set writeAppliedIndexTerm to true? + // It doesn't seem right to use a cluster version check since if this + // node's view of the version is stale it can use false. 
+ writeAppliedIndexTerm := st.Version.IsActive(ctx, clusterversion.AddRaftAppliedIndexTermMigration) ms, err = stateloader.WriteInitialReplicaState( ctx, eng, @@ -911,6 +917,7 @@ func SendEmptySnapshot( roachpb.Lease{}, hlc.Timestamp{}, // gcThreshold st.Version.ActiveVersionOrEmpty(ctx).Version, + writeAppliedIndexTerm, ) if err != nil { return err diff --git a/pkg/migration/migrations/migrations.go b/pkg/migration/migrations/migrations.go index 832180f7dd9e..7459fb420325 100644 --- a/pkg/migration/migrations/migrations.go +++ b/pkg/migration/migrations/migrations.go @@ -97,6 +97,16 @@ var migrations = []migration.Migration{ NoPrecondition, grantOptionMigration, ), + migration.NewSystemMigration( + "populate RangeAppliedState.RaftAppliedIndexTerm for all ranges", + toCV(clusterversion.AddRaftAppliedIndexTermMigration), + raftAppliedIndexTermMigration, + ), + migration.NewSystemMigration( + "purge all replicas not populating RangeAppliedState.RaftAppliedIndexTerm", + toCV(clusterversion.PostAddRaftAppliedIndexTermMigration), + postRaftAppliedIndexTermMigration, + ), } func init() { diff --git a/pkg/migration/migrations/raft_applied_index_term.go b/pkg/migration/migrations/raft_applied_index_term.go new file mode 100644 index 000000000000..7b6ea09134b1 --- /dev/null +++ b/pkg/migration/migrations/raft_applied_index_term.go @@ -0,0 +1,100 @@ +// Copyright 2022 The Cockroach Authors. +// +// Use of this software is governed by the Business Source License +// included in the file licenses/BSL.txt. +// +// As of the Change Date specified in that file, in accordance with +// the Business Source License, use of this software will be governed +// by the Apache License, Version 2.0, included in the file +// licenses/APL.txt. + +package migrations + +import ( + "bytes" + "context" + "fmt" + + "github.com/cockroachdb/cockroach/pkg/clusterversion" + "github.com/cockroachdb/cockroach/pkg/jobs" + "github.com/cockroachdb/cockroach/pkg/keys" + "github.com/cockroachdb/cockroach/pkg/migration" + "github.com/cockroachdb/cockroach/pkg/roachpb" + "github.com/cockroachdb/cockroach/pkg/server/serverpb" + "github.com/cockroachdb/cockroach/pkg/util/log" +) + +// defaultPageSize controls how many ranges are paged in by default when +// iterating through all ranges in a cluster during any given migration. We +// pulled this number out of thin air(-ish). Let's consider a cluster with 50k +// ranges, with each range taking ~200ms. We're being somewhat conservative with +// the duration, but in a wide-area cluster with large hops between the manager +// and the replicas, it could be true. Here's how long it'll take for various +// block sizes: +// +// page size of 1 ~ 2h 46m +// page size of 50 ~ 3m 20s +// page size of 200 ~ 50s +const defaultPageSize = 200 + +func raftAppliedIndexTermMigration( + ctx context.Context, cv clusterversion.ClusterVersion, deps migration.SystemDeps, _ *jobs.Job, +) error { + var batchIdx, numMigratedRanges int + init := func() { batchIdx, numMigratedRanges = 1, 0 } + if err := deps.Cluster.IterateRangeDescriptors(ctx, defaultPageSize, init, func(descriptors ...roachpb.RangeDescriptor) error { + for _, desc := range descriptors { + // NB: This is a bit of a wart. We want to reach the first range, + // but we can't address the (local) StartKey. However, keys.LocalMax + // is on r1, so we'll just use that instead to target r1. 
+ start, end := desc.StartKey, desc.EndKey + if bytes.Compare(desc.StartKey, keys.LocalMax) < 0 { + start, _ = keys.Addr(keys.LocalMax) + } + if err := deps.DB.Migrate(ctx, start, end, cv.Version); err != nil { + return err + } + } + + // TODO(irfansharif): Instead of logging this to the debug log, we + // should insert these into a `system.migrations` table for external + // observability. + numMigratedRanges += len(descriptors) + log.Infof(ctx, "[batch %d/??] migrated %d ranges", batchIdx, numMigratedRanges) + batchIdx++ + + return nil + }); err != nil { + return err + } + + log.Infof(ctx, "[batch %d/%d] migrated %d ranges", batchIdx, batchIdx, numMigratedRanges) + + // Make sure that all stores have synced. Given we're a below-raft + // migration, this ensures that the applied state is flushed to disk. + req := &serverpb.SyncAllEnginesRequest{} + op := "flush-stores" + return deps.Cluster.ForEveryNode(ctx, op, func(ctx context.Context, client serverpb.MigrationClient) error { + _, err := client.SyncAllEngines(ctx, req) + return err + }) +} + +func postRaftAppliedIndexTermMigration( + ctx context.Context, cv clusterversion.ClusterVersion, deps migration.SystemDeps, _ *jobs.Job, +) error { + // TODO(sumeer): this is copied from postTruncatedStateMigration. In + // comparison, postSeparatedIntentsMigration iterates over ranges and issues + // a noop below-raft migration. I am not clear on why there is a difference. + // Get this clarified. + + // Purge all replicas that haven't been migrated to use the unreplicated + // truncated state and the range applied state. + truncStateVersion := clusterversion.ByKey(clusterversion.AddRaftAppliedIndexTermMigration) + req := &serverpb.PurgeOutdatedReplicasRequest{Version: &truncStateVersion} + op := fmt.Sprintf("purge-outdated-replicas=%s", req.Version) + return deps.Cluster.ForEveryNode(ctx, op, func(ctx context.Context, client serverpb.MigrationClient) error { + _, err := client.PurgeOutdatedReplicas(ctx, req) + return err + }) +} diff --git a/pkg/storage/enginepb/mvcc3.proto b/pkg/storage/enginepb/mvcc3.proto index 8cddd3dff4a6..49e9955e7e8a 100644 --- a/pkg/storage/enginepb/mvcc3.proto +++ b/pkg/storage/enginepb/mvcc3.proto @@ -224,6 +224,14 @@ message RangeAppliedState { // migration). In 21.1 we cannot write empty timestamp to disk because that // looks like an inconsistency to the consistency-checker. util.hlc.Timestamp raft_closed_timestamp = 4; + + // raft_applied_index_term is the term corresponding to raft_applied_index. + // The serialized proto will not contain this field until code starts + // setting it to a value > 0. This is desirable since we don't want a mixed + // version cluster to have divergent replica state simply because we have + // introduced this field. An explicit migration will cause this field to + // start being populated. + uint64 raft_applied_index_term = 5; } // MVCCWriteValueOp corresponds to a value being written outside of a
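Editor's note: taken together, the raftAppliedIndexTermMigration pass issues a below-raft Migrate over every range and then flushes all stores, after which every range's persisted RangeAppliedState should carry raft_applied_index_term; the Post step then purges replicas that predate the migration. The sketch below is a hypothetical sanity check of that invariant, reusing LoadRangeAppliedState as it is used earlier in this diff; the function and package names are made up and this is not actual CockroachDB code:

package example // illustrative only, not part of this diff

import (
    "context"
    "fmt"

    "github.com/cockroachdb/cockroach/pkg/kv/kvserver/stateloader"
    "github.com/cockroachdb/cockroach/pkg/roachpb"
    "github.com/cockroachdb/cockroach/pkg/storage"
)

// checkRaftAppliedIndexTerm loads a replica's RangeAppliedState and verifies
// that the migrated field is populated. This is roughly the assertion that
// PostAddRaftAppliedIndexTermMigration is meant to make safe to rely on.
func checkRaftAppliedIndexTerm(
    ctx context.Context, reader storage.Reader, rangeID roachpb.RangeID,
) error {
    as, err := stateloader.Make(rangeID).LoadRangeAppliedState(ctx, reader)
    if err != nil {
        return err
    }
    if as.RaftAppliedIndexTerm == 0 {
        return fmt.Errorf("r%d: RaftAppliedIndexTerm not populated after migration", rangeID)
    }
    return nil
}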