Skip to content

Commit

Permalink
[WIP] kvserver: introduce RangeAppliedState.RaftAppliedIndexTerm
Browse files Browse the repository at this point in the history
The same field is also introduced in ReplicaState, since ReplicaState
is used in internal data-structures and when sending a state machine
snapshot.

The migration code uses a special unused term value in a
ReplicatedEvalResult to signal to the state machine application
machinery to start populating the term field.

Fixes cockroachdb#75671

Release note: None
  • Loading branch information
sumeerbhola committed Jan 28, 2022
1 parent af4b5db commit 7e77aa3
Show file tree
Hide file tree
Showing 15 changed files with 249 additions and 15 deletions.
2 changes: 1 addition & 1 deletion docs/generated/settings/settings-for-tenants.txt
Original file line number Diff line number Diff line change
Expand Up @@ -178,4 +178,4 @@ trace.debug.enable boolean false if set, traces for recent requests can be seen
trace.jaeger.agent string the address of a Jaeger agent to receive traces using the Jaeger UDP Thrift protocol, as <host>:<port>. If no port is specified, 6381 will be used.
trace.opentelemetry.collector string address of an OpenTelemetry trace collector to receive traces using the otel gRPC protocol, as <host>:<port>. If no port is specified, 4317 will be used.
trace.zipkin.collector string the address of a Zipkin instance to receive traces, as <host>:<port>. If no port is specified, 9411 will be used.
version version 21.2-50 set the active cluster version in the format '<major>.<minor>'
version version 21.2-54 set the active cluster version in the format '<major>.<minor>'
2 changes: 1 addition & 1 deletion docs/generated/settings/settings.html
Original file line number Diff line number Diff line change
Expand Up @@ -186,6 +186,6 @@
<tr><td><code>trace.jaeger.agent</code></td><td>string</td><td><code></code></td><td>the address of a Jaeger agent to receive traces using the Jaeger UDP Thrift protocol, as <host>:<port>. If no port is specified, 6381 will be used.</td></tr>
<tr><td><code>trace.opentelemetry.collector</code></td><td>string</td><td><code></code></td><td>address of an OpenTelemetry trace collector to receive traces using the otel gRPC protocol, as <host>:<port>. If no port is specified, 4317 will be used.</td></tr>
<tr><td><code>trace.zipkin.collector</code></td><td>string</td><td><code></code></td><td>the address of a Zipkin instance to receive traces, as <host>:<port>. If no port is specified, 9411 will be used.</td></tr>
<tr><td><code>version</code></td><td>version</td><td><code>21.2-50</code></td><td>set the active cluster version in the format '<major>.<minor>'</td></tr>
<tr><td><code>version</code></td><td>version</td><td><code>21.2-54</code></td><td>set the active cluster version in the format '<major>.<minor>'</td></tr>
</tbody>
</table>
14 changes: 14 additions & 0 deletions pkg/clusterversion/cockroach_versions.go
Original file line number Diff line number Diff line change
Expand Up @@ -251,6 +251,12 @@ const (
// EnableProtectedTimestampsForTenant enables the use of protected timestamps
// in secondary tenants.
EnableProtectedTimestampsForTenant
// AddRaftAppliedIndexTermMigration is a migration that causes each range
// replica to start populating RangeAppliedState.RaftAppliedIndexTerm field.
AddRaftAppliedIndexTermMigration
// PostAddRaftAppliedIndexTermMigration is used for asserting that
// RaftAppliedIndexTerm is populated.
PostAddRaftAppliedIndexTermMigration

// *************************************************
// Step (1): Add new versions here.
Expand Down Expand Up @@ -387,6 +393,14 @@ var versionsSingleton = keyedVersions{
Key: EnableProtectedTimestampsForTenant,
Version: roachpb.Version{Major: 21, Minor: 2, Internal: 50},
},
{
Key: AddRaftAppliedIndexTermMigration,
Version: roachpb.Version{Major: 21, Minor: 2, Internal: 52},
},
{
Key: PostAddRaftAppliedIndexTermMigration,
Version: roachpb.Version{Major: 21, Minor: 2, Internal: 54},
},

// *************************************************
// Step (2): Add new versions here.
Expand Down
12 changes: 11 additions & 1 deletion pkg/kv/kvserver/batcheval/cmd_end_transaction.go
Original file line number Diff line number Diff line change
Expand Up @@ -1011,9 +1011,19 @@ func splitTriggerHelper(
if err != nil {
return enginepb.MVCCStats{}, result.Result{}, errors.Wrap(err, "unable to load replica version")
}
// Mirror the LHS: the RHS populates RaftAppliedIndexTerm only when the LHS's
// persisted RangeAppliedState already carries a non-zero term.
rangeAppliedState, err := sl.LoadRangeAppliedState(ctx, batch)
if err != nil {
	return enginepb.MVCCStats{}, result.Result{},
		errors.Wrap(err, "unable to load range applied state")
}
writeRaftAppliedIndexTerm := rangeAppliedState.RaftAppliedIndexTerm > 0
*h.AbsPostSplitRight(), err = stateloader.WriteInitialReplicaState(
ctx, batch, *h.AbsPostSplitRight(), split.RightDesc, rightLease,
*gcThreshold, replicaVersion,
*gcThreshold, replicaVersion, writeRaftAppliedIndexTerm,
)
if err != nil {
return enginepb.MVCCStats{}, result.Result{}, errors.Wrap(err, "unable to write initial Replica state")
Expand Down
20 changes: 20 additions & 0 deletions pkg/kv/kvserver/batcheval/cmd_migrate.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/batcheval/result"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/kvserverpb"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/spanset"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/stateloader"
"github.com/cockroachdb/cockroach/pkg/roachpb"
"github.com/cockroachdb/cockroach/pkg/storage"
"github.com/cockroachdb/errors"
Expand Down Expand Up @@ -49,6 +50,8 @@ type migration func(context.Context, storage.ReadWriter, CommandArgs) (result.Re

// init registers the migrations defined in this file, keyed by cluster
// version.
func init() {
// Keep a reference to registerMigration so that it does not trigger an
// unused warning even if no migrations are registered below.
_ = registerMigration // prevent unused warning.
// Associate the AddRaftAppliedIndexTermMigration cluster version key with
// the function that implements that migration.
registerMigration(
clusterversion.AddRaftAppliedIndexTermMigration, addRaftAppliedIndexTermMigration)
}

func registerMigration(key clusterversion.Key, migration migration) {
Expand Down Expand Up @@ -89,6 +92,23 @@ func Migrate(
return pd, nil
}

// addRaftAppliedIndexTermMigration is the migration registered for
// clusterversion.AddRaftAppliedIndexTermMigration. It performs no reads or
// writes itself; it only returns a replicated result whose
// State.RaftAppliedIndexTerm carries the sentinel
// stateloader.RaftLogTermSignalForAddRaftAppliedIndexTermMigration, which
// signals that RangeAppliedState.RaftAppliedIndexTerm should start being
// populated.
func addRaftAppliedIndexTermMigration(
	ctx context.Context, readWriter storage.ReadWriter, cArgs CommandArgs,
) (result.Result, error) {
	// NB: The sentinel is not a real term value. The term cannot be known at
	// proposal time, since we don't know when this will be added as a raft
	// log entry.
	signal := &kvserverpb.ReplicaState{
		RaftAppliedIndexTerm: stateloader.RaftLogTermSignalForAddRaftAppliedIndexTermMigration,
	}

	var res result.Result
	res.Replicated.State = signal
	return res, nil
}

// TestingRegisterMigrationInterceptor is used in tests to register an
// interceptor for a below-raft migration.
//
Expand Down
7 changes: 5 additions & 2 deletions pkg/kv/kvserver/batcheval/result/result.go
Original file line number Diff line number Diff line change
Expand Up @@ -193,10 +193,13 @@ func coalesceBool(lhs *bool, rhs *bool) {
func (p *Result) MergeAndDestroy(q Result) error {
if q.Replicated.State != nil {
if q.Replicated.State.RaftAppliedIndex != 0 {
return errors.AssertionFailedf("must not specify RaftApplyIndex")
return errors.AssertionFailedf("must not specify RaftAppliedIndex")
}
if q.Replicated.State.RaftAppliedIndexTerm != 0 {
return errors.AssertionFailedf("must not specify RaftAppliedIndexTerm")
}
if q.Replicated.State.LeaseAppliedIndex != 0 {
return errors.AssertionFailedf("must not specify RaftApplyIndex")
return errors.AssertionFailedf("must not specify LeaseAppliedIndex")
}
if p.Replicated.State == nil {
p.Replicated.State = &kvserverpb.ReplicaState{}
Expand Down
4 changes: 4 additions & 0 deletions pkg/kv/kvserver/kvserverpb/state.proto
Original file line number Diff line number Diff line change
Expand Up @@ -134,6 +134,10 @@ message ReplicaState {
// "follower reads" at or below this timestamp.
util.hlc.Timestamp raft_closed_timestamp = 13 [(gogoproto.nullable) = false];

// The term corresponding to RaftAppliedIndex. This is derived from
// RangeAppliedState.RaftAppliedIndexTerm.
uint64 raft_applied_index_term = 14;

reserved 8, 9, 10;
}

Expand Down
17 changes: 16 additions & 1 deletion pkg/kv/kvserver/replica_application_state_machine.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/closedts/ctpb"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/kvserverbase"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/kvserverpb"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/stateloader"
"github.com/cockroachdb/cockroach/pkg/roachpb"
"github.com/cockroachdb/cockroach/pkg/storage"
"github.com/cockroachdb/cockroach/pkg/storage/enginepb"
Expand Down Expand Up @@ -849,8 +850,20 @@ func (b *replicaAppBatch) runPreApplyTriggersAfterStagingWriteBatch(
func (b *replicaAppBatch) stageTrivialReplicatedEvalResult(
ctx context.Context, cmd *replicatedCmd,
) {
// TODO(sumeer): when will raftAppliedIndex be 0?
if raftAppliedIndex := cmd.ent.Index; raftAppliedIndex != 0 {
b.state.RaftAppliedIndex = raftAppliedIndex
rs := cmd.decodedRaftEntry.replicatedResult().State
// We are post migration or this replicatedCmd is doing the migration.
if b.state.RaftAppliedIndexTerm > 0 || (rs != nil &&
rs.RaftAppliedIndexTerm == stateloader.RaftLogTermSignalForAddRaftAppliedIndexTermMigration) {
// Once we populate b.state.RaftAppliedIndexTerm it will flow into the
// persisted RangeAppliedState and into the in-memory representation in
// Replica.mu.state. The latter is used to initialize b.state, so future
// calls to this method will see that the migration has already happened
// and will continue to populate the term.
b.state.RaftAppliedIndexTerm = cmd.ent.Term
}
}
if leaseAppliedIndex := cmd.leaseIndex; leaseAppliedIndex != 0 {
b.state.LeaseAppliedIndex = leaseAppliedIndex
Expand Down Expand Up @@ -915,6 +928,8 @@ func (b *replicaAppBatch) ApplyToStateMachine(ctx context.Context) error {
// Update the replica's applied indexes, mvcc stats and closed timestamp.
r.mu.Lock()
r.mu.state.RaftAppliedIndex = b.state.RaftAppliedIndex
// This will be non-zero only when the migration has happened.
r.mu.state.RaftAppliedIndexTerm = b.state.RaftAppliedIndexTerm
r.mu.state.LeaseAppliedIndex = b.state.LeaseAppliedIndex

// Sanity check that the RaftClosedTimestamp doesn't go backwards.
Expand Down Expand Up @@ -978,7 +993,7 @@ func (b *replicaAppBatch) addAppliedStateKeyToBatch(ctx context.Context) error {
// lease index along with the mvcc stats, all in one key.
loader := &b.r.raftMu.stateLoader
return loader.SetRangeAppliedState(
ctx, b.batch, b.state.RaftAppliedIndex, b.state.LeaseAppliedIndex,
ctx, b.batch, b.state.RaftAppliedIndex, b.state.LeaseAppliedIndex, b.state.RaftAppliedIndexTerm,
b.state.Stats, &b.state.RaftClosedTimestamp,
)
}
Expand Down
23 changes: 23 additions & 0 deletions pkg/kv/kvserver/replica_raftstorage.go
Original file line number Diff line number Diff line change
Expand Up @@ -386,6 +386,7 @@ func (r *Replica) GetLeaseAppliedIndex() uint64 {
// r.mu is held. Note that the returned snapshot is a placeholder and
// does not contain any of the replica data. The snapshot is actually generated
// (and sent) by the Raft snapshot queue.
// TODO(sumeer): is this method really used?
func (r *replicaRaftStorage) Snapshot() (raftpb.Snapshot, error) {
r.mu.AssertHeld()
appliedIndex := r.mu.state.RaftAppliedIndex
Expand Down Expand Up @@ -419,6 +420,15 @@ func (r *Replica) GetSnapshot(
r.raftMu.Lock()
snap := r.store.engine.NewSnapshot()
r.mu.Lock()
// We will fetch the appliedIndex again from snap. We require that the
// appliedIndex cached in Replica is consistent with that in the engine.
//
// TODO(sumeer): what ensures the in-memory state is not lagging the engine
// given that replicaAppBatch.ApplyToStateMachine commits the engine batch
// and then acquires Replica.mu to update Replica.mu.state.RaftAppliedIndex?
// Or do we not care whether these two values are consistent given that
// appliedIndex is just serving as an upper bound on what can be truncated
// in the log, so it's completely best-effort?
appliedIndex := r.mu.state.RaftAppliedIndex
// Cleared when OutgoingSnapshot closes.
r.addSnapshotLogTruncationConstraintLocked(ctx, snapUUID, appliedIndex, recipientStore)
Expand Down Expand Up @@ -568,6 +578,12 @@ func snapshot(
}

term, err := term(ctx, rsl, snap, rangeID, eCache, state.RaftAppliedIndex)
// Check the lookup error before inspecting term: on failure the returned term
// is not meaningful, and comparing it first would replace the real error with
// a misleading "unequal terms" error.
if err != nil {
	return OutgoingSnapshot{}, errors.Wrapf(err, "failed to fetch term of %d", state.RaftAppliedIndex)
}
// If we've migrated to populating RaftAppliedIndexTerm, check that the terms
// from the two sources are equal.
if state.RaftAppliedIndexTerm != 0 && term != state.RaftAppliedIndexTerm {
	return OutgoingSnapshot{},
		errors.Errorf("unequal terms %d != %d", term, state.RaftAppliedIndexTerm)
}
Expand Down Expand Up @@ -920,6 +936,12 @@ func (r *Replica) applySnapshot(
log.Fatalf(ctx, "snapshot RaftAppliedIndex %d doesn't match its metadata index %d",
state.RaftAppliedIndex, nonemptySnap.Metadata.Index)
}
// If we've migrated to populating RaftAppliedIndexTerm, check that the terms
// from the two sources are equal.
if state.RaftAppliedIndexTerm != 0 && state.RaftAppliedIndexTerm != nonemptySnap.Metadata.Term {
log.Fatalf(ctx, "snapshot RaftAppliedIndexTerm %d doesn't match its metadata term %d",
state.RaftAppliedIndexTerm, nonemptySnap.Metadata.Term)
}

// The on-disk state is now committed, but the corresponding in-memory state
// has not yet been updated. Any errors past this point must therefore be
Expand Down Expand Up @@ -975,6 +997,7 @@ func (r *Replica) applySnapshot(
// feelings about this ever change, we can add a LastIndex field to
// raftpb.SnapshotMetadata.
r.mu.lastIndex = state.RaftAppliedIndex
// TODO(sumeer): why can't this be set to nonemptySnap.Metadata.Term?
r.mu.lastTerm = invalidLastTerm
r.mu.raftLogSize = 0
// Update the store stats for the data in the snapshot.
Expand Down
18 changes: 17 additions & 1 deletion pkg/kv/kvserver/stateloader/initial.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,15 @@ import (
const (
raftInitialLogIndex = 10
raftInitialLogTerm = 5

// RaftLogTermSignalForAddRaftAppliedIndexTermMigration is never persisted
// in the state machine or in HardState. It is only used in
// AddRaftAppliedIndexTermMigration to signal to the below raft code that
// the migration should happen when applying the raft log entry that
// contains ReplicatedEvalResult.State.RaftAppliedIndexTerm equal to this
// value. It is less than raftInitialLogTerm since that ensures it will
// never be used under normal operation.
RaftLogTermSignalForAddRaftAppliedIndexTermMigration = 3
)

// WriteInitialReplicaState sets up a new Range, but without writing an
Expand All @@ -46,6 +55,7 @@ func WriteInitialReplicaState(
lease roachpb.Lease,
gcThreshold hlc.Timestamp,
replicaVersion roachpb.Version,
writeRaftAppliedIndexTerm bool,
) (enginepb.MVCCStats, error) {
rsl := Make(desc.RangeID)
var s kvserverpb.ReplicaState
Expand All @@ -54,6 +64,9 @@ func WriteInitialReplicaState(
Index: raftInitialLogIndex,
}
s.RaftAppliedIndex = s.TruncatedState.Index
if writeRaftAppliedIndexTerm {
s.RaftAppliedIndexTerm = s.TruncatedState.Term
}
s.Desc = &roachpb.RangeDescriptor{
RangeID: desc.RangeID,
}
Expand Down Expand Up @@ -102,9 +115,12 @@ func WriteInitialRangeState(
initialGCThreshold := hlc.Timestamp{}
initialMS := enginepb.MVCCStats{}

// TODO: is WriteInitialRangeState only used for a new cluster, so
// we can assume that we should set writeRaftAppliedIndexTerm to true?
// What prevents older versions from joining this cluster?
if _, err := WriteInitialReplicaState(
ctx, readWriter, initialMS, desc, initialLease, initialGCThreshold,
replicaVersion,
replicaVersion, true, /* writeRaftAppliedIndexTerm */
); err != nil {
return err
}
Expand Down
20 changes: 12 additions & 8 deletions pkg/kv/kvserver/stateloader/stateloader.go
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,7 @@ func (rsl StateLoader) Load(
return kvserverpb.ReplicaState{}, err
}
s.RaftAppliedIndex = as.RaftAppliedIndex
s.RaftAppliedIndexTerm = as.RaftAppliedIndexTerm
s.LeaseAppliedIndex = as.LeaseAppliedIndex
ms := as.RangeStats.ToStats()
s.Stats = &ms
Expand Down Expand Up @@ -133,8 +134,9 @@ func (rsl StateLoader) Save(
return enginepb.MVCCStats{}, err
}
}
rai, lai, ct := state.RaftAppliedIndex, state.LeaseAppliedIndex, &state.RaftClosedTimestamp
if err := rsl.SetRangeAppliedState(ctx, readWriter, rai, lai, ms, ct); err != nil {
rai, lai, rait, ct := state.RaftAppliedIndex, state.LeaseAppliedIndex, state.RaftAppliedIndexTerm,
&state.RaftClosedTimestamp
if err := rsl.SetRangeAppliedState(ctx, readWriter, rai, lai, rait, ms, ct); err != nil {
return enginepb.MVCCStats{}, err
}
return *ms, nil
Expand Down Expand Up @@ -194,14 +196,15 @@ func (rsl StateLoader) LoadMVCCStats(
func (rsl StateLoader) SetRangeAppliedState(
ctx context.Context,
readWriter storage.ReadWriter,
appliedIndex, leaseAppliedIndex uint64,
appliedIndex, leaseAppliedIndex, appliedIndexTerm uint64,
newMS *enginepb.MVCCStats,
raftClosedTimestamp *hlc.Timestamp,
) error {
as := enginepb.RangeAppliedState{
RaftAppliedIndex: appliedIndex,
LeaseAppliedIndex: leaseAppliedIndex,
RangeStats: newMS.ToPersistentStats(),
RaftAppliedIndex: appliedIndex,
LeaseAppliedIndex: leaseAppliedIndex,
RangeStats: newMS.ToPersistentStats(),
RaftAppliedIndexTerm: appliedIndexTerm,
}
if raftClosedTimestamp != nil && !raftClosedTimestamp.IsEmpty() {
as.RaftClosedTimestamp = raftClosedTimestamp
Expand All @@ -223,7 +226,8 @@ func (rsl StateLoader) SetMVCCStats(
return err
}
return rsl.SetRangeAppliedState(
ctx, readWriter, as.RaftAppliedIndex, as.LeaseAppliedIndex, newMS, as.RaftClosedTimestamp)
ctx, readWriter, as.RaftAppliedIndex, as.LeaseAppliedIndex, as.RaftAppliedIndexTerm, newMS,
as.RaftClosedTimestamp)
}

// SetClosedTimestamp overwrites the closed timestamp.
Expand All @@ -235,7 +239,7 @@ func (rsl StateLoader) SetClosedTimestamp(
return err
}
return rsl.SetRangeAppliedState(
ctx, readWriter, as.RaftAppliedIndex, as.LeaseAppliedIndex,
ctx, readWriter, as.RaftAppliedIndex, as.LeaseAppliedIndex, as.RaftAppliedIndexTerm,
as.RangeStats.ToStatsPtr(), closedTS)
}

Expand Down
7 changes: 7 additions & 0 deletions pkg/kv/kvserver/store_snapshot.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ import (
"io"
"time"

"github.com/cockroachdb/cockroach/pkg/clusterversion"
"github.com/cockroachdb/cockroach/pkg/keys"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/raftentry"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/rditer"
Expand Down Expand Up @@ -903,6 +904,11 @@ func SendEmptySnapshot(
return err
}

// TODO: is SendEmptySnapshot only used for a new cluster, so
// we can assume that we should set writeAppliedIndexTerm to true?
// It doesn't seem right to use a cluster version check since if this
// node's view of the version is stale it can use false.
writeAppliedIndexTerm := st.Version.IsActive(ctx, clusterversion.AddRaftAppliedIndexTermMigration)
ms, err = stateloader.WriteInitialReplicaState(
ctx,
eng,
Expand All @@ -911,6 +917,7 @@ func SendEmptySnapshot(
roachpb.Lease{},
hlc.Timestamp{}, // gcThreshold
st.Version.ActiveVersionOrEmpty(ctx).Version,
writeAppliedIndexTerm,
)
if err != nil {
return err
Expand Down
10 changes: 10 additions & 0 deletions pkg/migration/migrations/migrations.go
Original file line number Diff line number Diff line change
Expand Up @@ -97,6 +97,16 @@ var migrations = []migration.Migration{
NoPrecondition,
grantOptionMigration,
),
migration.NewSystemMigration(
"populate RangeAppliedState.RaftAppliedIndexTerm for all ranges",
toCV(clusterversion.AddRaftAppliedIndexTermMigration),
raftAppliedIndexTermMigration,
),
migration.NewSystemMigration(
"purge all replicas not populating RangeAppliedState.RaftAppliedIndexTerm",
toCV(clusterversion.PostAddRaftAppliedIndexTermMigration),
postRaftAppliedIndexTermMigration,
),
}

func init() {
Expand Down
Loading

0 comments on commit 7e77aa3

Please sign in to comment.