Skip to content

Commit

Permalink
kvserver: introduce RangeAppliedState.RaftAppliedIndexTerm
Browse files Browse the repository at this point in the history
The same field is also introduced in ReplicaState, since ReplicaState
is used in internal data-structures and when sending a state machine
snapshot.

The migration code uses a special unused term value in a
ReplicatedEvalResult to signal to the state machine application
machinery to start populating the term field.

Fixes #75671

Release note: None
  • Loading branch information
sumeerbhola committed Feb 2, 2022
1 parent 7bd7ec0 commit 22f3f04
Show file tree
Hide file tree
Showing 21 changed files with 564 additions and 23 deletions.
2 changes: 1 addition & 1 deletion docs/generated/settings/settings-for-tenants.txt
Original file line number Diff line number Diff line change
Expand Up @@ -178,4 +178,4 @@ trace.debug.enable boolean false if set, traces for recent requests can be seen
trace.jaeger.agent string the address of a Jaeger agent to receive traces using the Jaeger UDP Thrift protocol, as <host>:<port>. If no port is specified, 6381 will be used.
trace.opentelemetry.collector string address of an OpenTelemetry trace collector to receive traces using the otel gRPC protocol, as <host>:<port>. If no port is specified, 4317 will be used.
trace.zipkin.collector string the address of a Zipkin instance to receive traces, as <host>:<port>. If no port is specified, 9411 will be used.
version version 21.2-50 set the active cluster version in the format '<major>.<minor>'
version version 21.2-54 set the active cluster version in the format '<major>.<minor>'
2 changes: 1 addition & 1 deletion docs/generated/settings/settings.html
Original file line number Diff line number Diff line change
Expand Up @@ -186,6 +186,6 @@
<tr><td><code>trace.jaeger.agent</code></td><td>string</td><td><code></code></td><td>the address of a Jaeger agent to receive traces using the Jaeger UDP Thrift protocol, as <host>:<port>. If no port is specified, 6381 will be used.</td></tr>
<tr><td><code>trace.opentelemetry.collector</code></td><td>string</td><td><code></code></td><td>address of an OpenTelemetry trace collector to receive traces using the otel gRPC protocol, as <host>:<port>. If no port is specified, 4317 will be used.</td></tr>
<tr><td><code>trace.zipkin.collector</code></td><td>string</td><td><code></code></td><td>the address of a Zipkin instance to receive traces, as <host>:<port>. If no port is specified, 9411 will be used.</td></tr>
<tr><td><code>version</code></td><td>version</td><td><code>21.2-50</code></td><td>set the active cluster version in the format '<major>.<minor>'</td></tr>
<tr><td><code>version</code></td><td>version</td><td><code>21.2-54</code></td><td>set the active cluster version in the format '<major>.<minor>'</td></tr>
</tbody>
</table>
14 changes: 14 additions & 0 deletions pkg/clusterversion/cockroach_versions.go
Original file line number Diff line number Diff line change
Expand Up @@ -251,6 +251,12 @@ const (
// EnableProtectedTimestampsForTenant enables the use of protected timestamps
// in secondary tenants.
EnableProtectedTimestampsForTenant
// AddRaftAppliedIndexTermMigration is a migration that causes each range
// replica to start populating RangeAppliedState.RaftAppliedIndexTerm field.
AddRaftAppliedIndexTermMigration
// PostAddRaftAppliedIndexTermMigration is used for asserting that
// RaftAppliedIndexTerm is populated.
PostAddRaftAppliedIndexTermMigration

// *************************************************
// Step (1): Add new versions here.
Expand Down Expand Up @@ -387,6 +393,14 @@ var versionsSingleton = keyedVersions{
Key: EnableProtectedTimestampsForTenant,
Version: roachpb.Version{Major: 21, Minor: 2, Internal: 50},
},
{
Key: AddRaftAppliedIndexTermMigration,
Version: roachpb.Version{Major: 21, Minor: 2, Internal: 52},
},
{
Key: PostAddRaftAppliedIndexTermMigration,
Version: roachpb.Version{Major: 21, Minor: 2, Internal: 54},
},

// *************************************************
// Step (2): Add new versions here.
Expand Down
16 changes: 15 additions & 1 deletion pkg/kv/kvserver/batcheval/cmd_end_transaction.go
Original file line number Diff line number Diff line change
Expand Up @@ -1011,9 +1011,23 @@ func splitTriggerHelper(
if err != nil {
return enginepb.MVCCStats{}, result.Result{}, errors.Wrap(err, "unable to load replica version")
}
// The RHS should populate RaftAppliedIndexTerm if the LHS is doing so.
// Alternatively, we could be more aggressive and also look at the cluster
// version, but this is simpler -- if the keyspace occupied by the
// original unsplit range has not been migrated yet, by an ongoing
// migration, both LHS and RHS will be migrated later.
rangeAppliedState, err := sl.LoadRangeAppliedState(ctx, batch)
if err != nil {
return enginepb.MVCCStats{}, result.Result{},
errors.Wrap(err, "unable to load range applied state")
}
writeRaftAppliedIndexTerm := false
if rangeAppliedState.RaftAppliedIndexTerm > 0 {
writeRaftAppliedIndexTerm = true
}
*h.AbsPostSplitRight(), err = stateloader.WriteInitialReplicaState(
ctx, batch, *h.AbsPostSplitRight(), split.RightDesc, rightLease,
*gcThreshold, replicaVersion,
*gcThreshold, replicaVersion, writeRaftAppliedIndexTerm,
)
if err != nil {
return enginepb.MVCCStats{}, result.Result{}, errors.Wrap(err, "unable to write initial Replica state")
Expand Down
22 changes: 22 additions & 0 deletions pkg/kv/kvserver/batcheval/cmd_migrate.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/batcheval/result"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/kvserverpb"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/spanset"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/stateloader"
"github.com/cockroachdb/cockroach/pkg/roachpb"
"github.com/cockroachdb/cockroach/pkg/storage"
"github.com/cockroachdb/errors"
Expand Down Expand Up @@ -49,6 +50,8 @@ type migration func(context.Context, storage.ReadWriter, CommandArgs) (result.Re

func init() {
_ = registerMigration // prevent unused warning.
registerMigration(
clusterversion.AddRaftAppliedIndexTermMigration, addRaftAppliedIndexTermMigration)
}

func registerMigration(key clusterversion.Key, migration migration) {
Expand Down Expand Up @@ -89,6 +92,25 @@ func Migrate(
return pd, nil
}

// addRaftAppliedIndexTermMigration migrates the system to start populating
// the RangeAppliedState.RaftAppliedIndexTerm field.
func addRaftAppliedIndexTermMigration(
ctx context.Context, readWriter storage.ReadWriter, cArgs CommandArgs,
) (result.Result, error) {
return result.Result{
Replicated: kvserverpb.ReplicatedEvalResult{
State: &kvserverpb.ReplicaState{
// Signal the migration by sending a term on the new field that we
// want to migrate into. This term is chosen as one that would never
// be used in practice (since raftInitialLogTerm is 10), so we can
// special-case it below raft and start writing the (real) term to the
// AppliedState.
RaftAppliedIndexTerm: stateloader.RaftLogTermSignalForAddRaftAppliedIndexTermMigration,
},
},
}, nil
}

// TestingRegisterMigrationInterceptor is used in tests to register an
// interceptor for a below-raft migration.
//
Expand Down
1 change: 1 addition & 0 deletions pkg/kv/kvserver/batcheval/result/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ go_library(
deps = [
"//pkg/kv/kvserver/concurrency/lock",
"//pkg/kv/kvserver/kvserverpb",
"//pkg/kv/kvserver/stateloader",
"//pkg/roachpb:with-mocks",
"//pkg/util/log",
"@com_github_cockroachdb_errors//:errors",
Expand Down
21 changes: 19 additions & 2 deletions pkg/kv/kvserver/batcheval/result/result.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ import (
"fmt"

"github.com/cockroachdb/cockroach/pkg/kv/kvserver/kvserverpb"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/stateloader"
"github.com/cockroachdb/cockroach/pkg/roachpb"
"github.com/cockroachdb/cockroach/pkg/util/log"
"github.com/cockroachdb/errors"
Expand Down Expand Up @@ -193,14 +194,30 @@ func coalesceBool(lhs *bool, rhs *bool) {
func (p *Result) MergeAndDestroy(q Result) error {
if q.Replicated.State != nil {
if q.Replicated.State.RaftAppliedIndex != 0 {
return errors.AssertionFailedf("must not specify RaftApplyIndex")
return errors.AssertionFailedf("must not specify RaftAppliedIndex")
}
if q.Replicated.State.LeaseAppliedIndex != 0 {
return errors.AssertionFailedf("must not specify RaftApplyIndex")
return errors.AssertionFailedf("must not specify LeaseAppliedIndex")
}
if p.Replicated.State == nil {
p.Replicated.State = &kvserverpb.ReplicaState{}
}
if q.Replicated.State.RaftAppliedIndexTerm != 0 {
if q.Replicated.State.RaftAppliedIndexTerm ==
stateloader.RaftLogTermSignalForAddRaftAppliedIndexTermMigration {
if p.Replicated.State.RaftAppliedIndexTerm != 0 &&
p.Replicated.State.RaftAppliedIndexTerm !=
stateloader.RaftLogTermSignalForAddRaftAppliedIndexTermMigration {
return errors.AssertionFailedf("invalid term value %d",
p.Replicated.State.RaftAppliedIndexTerm)
}
p.Replicated.State.RaftAppliedIndexTerm = q.Replicated.State.RaftAppliedIndexTerm
q.Replicated.State.RaftAppliedIndexTerm = 0
} else {
return errors.AssertionFailedf("invalid term value %d",
q.Replicated.State.RaftAppliedIndexTerm)
}
}
if p.Replicated.State.Desc == nil {
p.Replicated.State.Desc = q.Replicated.State.Desc
} else if q.Replicated.State.Desc != nil {
Expand Down
2 changes: 1 addition & 1 deletion pkg/kv/kvserver/below_raft_protos_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,7 @@ var belowRaftGoldenProtos = map[reflect.Type]fixture{
return enginepb.NewPopulatedRangeAppliedState(r, false)
},
emptySum: 615555020845646359,
populatedSum: 17354515720541950025,
populatedSum: 12125419916111069931,
},
reflect.TypeOf(&raftpb.HardState{}): {
populatedConstructor: func(r *rand.Rand) protoutil.Message {
Expand Down
4 changes: 4 additions & 0 deletions pkg/kv/kvserver/kvserverpb/state.proto
Original file line number Diff line number Diff line change
Expand Up @@ -134,6 +134,10 @@ message ReplicaState {
// "follower reads" at or below this timestamp.
util.hlc.Timestamp raft_closed_timestamp = 13 [(gogoproto.nullable) = false];

// The term corresponding to RaftAppliedIndex. This is derived from
// RangeAppliedState.RaftAppliedIndexTerm.
uint64 raft_applied_index_term = 14;

reserved 8, 9, 10;
}

Expand Down
2 changes: 2 additions & 0 deletions pkg/kv/kvserver/replica_application_result.go
Original file line number Diff line number Diff line change
Expand Up @@ -90,6 +90,8 @@ func clearTrivialReplicatedEvalResultFields(r *kvserverpb.ReplicatedEvalResult)
// replica state for this batch.
if haveState := r.State != nil; haveState {
r.State.Stats = nil
// Reset the signal used to execute the AddRaftAppliedIndexTermMigration.
r.State.RaftAppliedIndexTerm = 0
if *r.State == (kvserverpb.ReplicaState{}) {
r.State = nil
}
Expand Down
24 changes: 21 additions & 3 deletions pkg/kv/kvserver/replica_application_state_machine.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/closedts/ctpb"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/kvserverbase"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/kvserverpb"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/stateloader"
"github.com/cockroachdb/cockroach/pkg/roachpb"
"github.com/cockroachdb/cockroach/pkg/storage"
"github.com/cockroachdb/cockroach/pkg/storage/enginepb"
Expand Down Expand Up @@ -849,9 +850,23 @@ func (b *replicaAppBatch) runPreApplyTriggersAfterStagingWriteBatch(
func (b *replicaAppBatch) stageTrivialReplicatedEvalResult(
ctx context.Context, cmd *replicatedCmd,
) {
if raftAppliedIndex := cmd.ent.Index; raftAppliedIndex != 0 {
b.state.RaftAppliedIndex = raftAppliedIndex
raftAppliedIndex := cmd.ent.Index
if raftAppliedIndex == 0 {
log.Fatalf(ctx, "raft entry with index 0")
}
b.state.RaftAppliedIndex = raftAppliedIndex
rs := cmd.decodedRaftEntry.replicatedResult().State
// We are post migration or this replicatedCmd is doing the migration.
if b.state.RaftAppliedIndexTerm > 0 || (rs != nil &&
rs.RaftAppliedIndexTerm == stateloader.RaftLogTermSignalForAddRaftAppliedIndexTermMigration) {
// Once we populate b.state.RaftAppliedIndexTerm it will flow into the
// persisted RangeAppliedState and into the in-memory representation in
// Replica.mu.state. The latter is used to initialize b.state, so future
// calls to this method will see that the migration has already happened
// and will continue to populate the term.
b.state.RaftAppliedIndexTerm = cmd.ent.Term
}

if leaseAppliedIndex := cmd.leaseIndex; leaseAppliedIndex != 0 {
b.state.LeaseAppliedIndex = leaseAppliedIndex
}
Expand Down Expand Up @@ -915,6 +930,9 @@ func (b *replicaAppBatch) ApplyToStateMachine(ctx context.Context) error {
// Update the replica's applied indexes, mvcc stats and closed timestamp.
r.mu.Lock()
r.mu.state.RaftAppliedIndex = b.state.RaftAppliedIndex
// RaftAppliedIndexTerm will be non-zero only when the
// AddRaftAppliedIndexTermMigration has happened.
r.mu.state.RaftAppliedIndexTerm = b.state.RaftAppliedIndexTerm
r.mu.state.LeaseAppliedIndex = b.state.LeaseAppliedIndex

// Sanity check that the RaftClosedTimestamp doesn't go backwards.
Expand Down Expand Up @@ -978,7 +996,7 @@ func (b *replicaAppBatch) addAppliedStateKeyToBatch(ctx context.Context) error {
// lease index along with the mvcc stats, all in one key.
loader := &b.r.raftMu.stateLoader
return loader.SetRangeAppliedState(
ctx, b.batch, b.state.RaftAppliedIndex, b.state.LeaseAppliedIndex,
ctx, b.batch, b.state.RaftAppliedIndex, b.state.LeaseAppliedIndex, b.state.RaftAppliedIndexTerm,
b.state.Stats, &b.state.RaftClosedTimestamp,
)
}
Expand Down
52 changes: 47 additions & 5 deletions pkg/kv/kvserver/replica_raftstorage.go
Original file line number Diff line number Diff line change
Expand Up @@ -386,6 +386,15 @@ func (r *Replica) GetLeaseAppliedIndex() uint64 {
// r.mu is held. Note that the returned snapshot is a placeholder and
// does not contain any of the replica data. The snapshot is actually generated
// (and sent) by the Raft snapshot queue.
//
// More specifically, this method is called by etcd/raft in
// (*raftLog).snapshot. Raft expects that it generates the snapshot (by
// calling Snapshot) and that "sending" the result actually sends the
// snapshot. In CockroachDB, that message is intercepted (at the sender) and
// instead we add the replica (the raft leader) to the raft snapshot queue,
// and when its turn comes we look at the raft state for followers that want a
// snapshot, and then send one. That actual sending path does not call this
// Snapshot method.
func (r *replicaRaftStorage) Snapshot() (raftpb.Snapshot, error) {
r.mu.AssertHeld()
appliedIndex := r.mu.state.RaftAppliedIndex
Expand Down Expand Up @@ -418,11 +427,27 @@ func (r *Replica) GetSnapshot(
// the corresponding Raft command not applied yet).
r.raftMu.Lock()
snap := r.store.engine.NewSnapshot()
r.mu.Lock()
appliedIndex := r.mu.state.RaftAppliedIndex
// Cleared when OutgoingSnapshot closes.
r.addSnapshotLogTruncationConstraintLocked(ctx, snapUUID, appliedIndex, recipientStore)
r.mu.Unlock()
{
r.mu.Lock()
// We will fetch the applied index later again, from snap. The
// appliedIndex fetched here is narrowly used for adding a log truncation
// constraint to prevent log entries > appliedIndex from being removed.
// Note that the appliedIndex maintained in Replica actually lags the one
// in the engine, since replicaAppBatch.ApplyToStateMachine commits the
// engine batch and then acquires Replica.mu to update
// Replica.mu.state.RaftAppliedIndex. The use of a possibly stale value
// here is harmless since using a lower index in this constraint, than the
// actual snapshot index, preserves more from a log truncation
// perspective.
//
// TODO(sumeer): despite the above justification, this is unnecessarily
// complicated. Consider loading the RaftAppliedIndex from the snap for
// this use case.
appliedIndex := r.mu.state.RaftAppliedIndex
// Cleared when OutgoingSnapshot closes.
r.addSnapshotLogTruncationConstraintLocked(ctx, snapUUID, appliedIndex, recipientStore)
r.mu.Unlock()
}
r.raftMu.Unlock()

release := func() {
Expand Down Expand Up @@ -568,6 +593,12 @@ func snapshot(
}

term, err := term(ctx, rsl, snap, rangeID, eCache, state.RaftAppliedIndex)
// If we've migrated to populating RaftAppliedIndexTerm, check that the term
// from the two sources are equal.
if state.RaftAppliedIndexTerm != 0 && term != state.RaftAppliedIndexTerm {
return OutgoingSnapshot{},
errors.AssertionFailedf("unequal terms %d != %d", term, state.RaftAppliedIndexTerm)
}
if err != nil {
return OutgoingSnapshot{}, errors.Wrapf(err, "failed to fetch term of %d", state.RaftAppliedIndex)
}
Expand Down Expand Up @@ -920,6 +951,12 @@ func (r *Replica) applySnapshot(
log.Fatalf(ctx, "snapshot RaftAppliedIndex %d doesn't match its metadata index %d",
state.RaftAppliedIndex, nonemptySnap.Metadata.Index)
}
// If we've migrated to populating RaftAppliedIndexTerm, check that the term
// from the two sources are equal.
if state.RaftAppliedIndexTerm != 0 && state.RaftAppliedIndexTerm != nonemptySnap.Metadata.Term {
log.Fatalf(ctx, "snapshot RaftAppliedIndexTerm %d doesn't match its metadata term %d",
state.RaftAppliedIndexTerm, nonemptySnap.Metadata.Term)
}

// The on-disk state is now committed, but the corresponding in-memory state
// has not yet been updated. Any errors past this point must therefore be
Expand Down Expand Up @@ -975,6 +1012,11 @@ func (r *Replica) applySnapshot(
// feelings about this ever change, we can add a LastIndex field to
// raftpb.SnapshotMetadata.
r.mu.lastIndex = state.RaftAppliedIndex

// TODO(sumeer): We should be able to set this to
// nonemptySnap.Metadata.Term. See
// https://github.com/cockroachdb/cockroach/pull/75675#pullrequestreview-867926687
// for a discussion regarding this.
r.mu.lastTerm = invalidLastTerm
r.mu.raftLogSize = 0
// Update the store stats for the data in the snapshot.
Expand Down
1 change: 1 addition & 0 deletions pkg/kv/kvserver/stateloader/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ go_library(
importpath = "github.com/cockroachdb/cockroach/pkg/kv/kvserver/stateloader",
visibility = ["//visibility:public"],
deps = [
"//pkg/clusterversion",
"//pkg/keys",
"//pkg/kv/kvserver/kvserverpb",
"//pkg/roachpb:with-mocks",
Expand Down
19 changes: 18 additions & 1 deletion pkg/kv/kvserver/stateloader/initial.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ package stateloader
import (
"context"

"github.com/cockroachdb/cockroach/pkg/clusterversion"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/kvserverpb"
"github.com/cockroachdb/cockroach/pkg/roachpb"
"github.com/cockroachdb/cockroach/pkg/storage"
Expand All @@ -29,6 +30,15 @@ import (
const (
raftInitialLogIndex = 10
raftInitialLogTerm = 5

// RaftLogTermSignalForAddRaftAppliedIndexTermMigration is never persisted
// in the state machine or in HardState. It is only used in
// AddRaftAppliedIndexTermMigration to signal to the below raft code that
// the migration should happen when applying the raft log entry that
// contains ReplicatedEvalResult.State.RaftAppliedIndexTerm equal to this
// value. It is less than raftInitialLogTerm since that ensures it will
// never be used under normal operation.
RaftLogTermSignalForAddRaftAppliedIndexTermMigration = 3
)

// WriteInitialReplicaState sets up a new Range, but without writing an
Expand All @@ -46,6 +56,7 @@ func WriteInitialReplicaState(
lease roachpb.Lease,
gcThreshold hlc.Timestamp,
replicaVersion roachpb.Version,
writeRaftAppliedIndexTerm bool,
) (enginepb.MVCCStats, error) {
rsl := Make(desc.RangeID)
var s kvserverpb.ReplicaState
Expand All @@ -54,6 +65,9 @@ func WriteInitialReplicaState(
Index: raftInitialLogIndex,
}
s.RaftAppliedIndex = s.TruncatedState.Index
if writeRaftAppliedIndexTerm {
s.RaftAppliedIndexTerm = s.TruncatedState.Term
}
s.Desc = &roachpb.RangeDescriptor{
RangeID: desc.RangeID,
}
Expand Down Expand Up @@ -102,9 +116,12 @@ func WriteInitialRangeState(
initialGCThreshold := hlc.Timestamp{}
initialMS := enginepb.MVCCStats{}

writeRaftAppliedIndexTerm :=
clusterversion.ClusterVersion{Version: replicaVersion}.IsActiveVersion(
clusterversion.ByKey(clusterversion.AddRaftAppliedIndexTermMigration))
if _, err := WriteInitialReplicaState(
ctx, readWriter, initialMS, desc, initialLease, initialGCThreshold,
replicaVersion,
replicaVersion, writeRaftAppliedIndexTerm,
); err != nil {
return err
}
Expand Down
Loading

0 comments on commit 22f3f04

Please sign in to comment.