Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

kvserver: introduce RangeAppliedState.RaftAppliedIndexTerm #75675

Merged
merged 1 commit into from
Feb 4, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion docs/generated/settings/settings-for-tenants.txt
Original file line number Diff line number Diff line change
Expand Up @@ -178,4 +178,4 @@ trace.debug.enable boolean false if set, traces for recent requests can be seen
trace.jaeger.agent string the address of a Jaeger agent to receive traces using the Jaeger UDP Thrift protocol, as <host>:<port>. If no port is specified, 6381 will be used.
trace.opentelemetry.collector string address of an OpenTelemetry trace collector to receive traces using the otel gRPC protocol, as <host>:<port>. If no port is specified, 4317 will be used.
trace.zipkin.collector string the address of a Zipkin instance to receive traces, as <host>:<port>. If no port is specified, 9411 will be used.
version version 21.2-54 set the active cluster version in the format '<major>.<minor>'
version version 21.2-58 set the active cluster version in the format '<major>.<minor>'
2 changes: 1 addition & 1 deletion docs/generated/settings/settings.html
Original file line number Diff line number Diff line change
Expand Up @@ -186,6 +186,6 @@
<tr><td><code>trace.jaeger.agent</code></td><td>string</td><td><code></code></td><td>the address of a Jaeger agent to receive traces using the Jaeger UDP Thrift protocol, as <host>:<port>. If no port is specified, 6381 will be used.</td></tr>
<tr><td><code>trace.opentelemetry.collector</code></td><td>string</td><td><code></code></td><td>address of an OpenTelemetry trace collector to receive traces using the otel gRPC protocol, as <host>:<port>. If no port is specified, 4317 will be used.</td></tr>
<tr><td><code>trace.zipkin.collector</code></td><td>string</td><td><code></code></td><td>the address of a Zipkin instance to receive traces, as <host>:<port>. If no port is specified, 9411 will be used.</td></tr>
<tr><td><code>version</code></td><td>version</td><td><code>21.2-54</code></td><td>set the active cluster version in the format '<major>.<minor>'</td></tr>
<tr><td><code>version</code></td><td>version</td><td><code>21.2-58</code></td><td>set the active cluster version in the format '<major>.<minor>'</td></tr>
</tbody>
</table>
16 changes: 16 additions & 0 deletions pkg/clusterversion/cockroach_versions.go
Original file line number Diff line number Diff line change
Expand Up @@ -257,6 +257,13 @@ const (
// RemoveIncompatibleDatabasePrivileges adds the migration which guarantees that
// databases do not have incompatible privileges
RemoveIncompatibleDatabasePrivileges
// AddRaftAppliedIndexTermMigration is a migration that causes each range
// replica to start populating RangeAppliedState.RaftAppliedIndexTerm field.
AddRaftAppliedIndexTermMigration
// PostAddRaftAppliedIndexTermMigration is used for asserting that
// RaftAppliedIndexTerm is populated.
PostAddRaftAppliedIndexTermMigration

// *************************************************
// Step (1): Add new versions here.
// Do not add new versions to a patch release.
Expand Down Expand Up @@ -401,6 +408,15 @@ var versionsSingleton = keyedVersions{
Version: roachpb.Version{Major: 21, Minor: 2, Internal: 54},
},

{
Key: AddRaftAppliedIndexTermMigration,
Version: roachpb.Version{Major: 21, Minor: 2, Internal: 56},
},
{
Key: PostAddRaftAppliedIndexTermMigration,
Version: roachpb.Version{Major: 21, Minor: 2, Internal: 58},
},

// *************************************************
// Step (2): Add new versions here.
// Do not add new versions to a patch release.
Expand Down
6 changes: 4 additions & 2 deletions pkg/clusterversion/key_string.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

16 changes: 15 additions & 1 deletion pkg/kv/kvserver/batcheval/cmd_end_transaction.go
Original file line number Diff line number Diff line change
Expand Up @@ -1011,9 +1011,23 @@ func splitTriggerHelper(
if err != nil {
return enginepb.MVCCStats{}, result.Result{}, errors.Wrap(err, "unable to load replica version")
}
// The RHS should populate RaftAppliedIndexTerm if the LHS is doing so.
// Alternatively, we could be more aggressive and also look at the cluster
// version, but this is simpler -- if the keyspace occupied by the
// original unsplit range has not been migrated yet, by an ongoing
// migration, both LHS and RHS will be migrated later.
rangeAppliedState, err := sl.LoadRangeAppliedState(ctx, batch)
if err != nil {
return enginepb.MVCCStats{}, result.Result{},
errors.Wrap(err, "unable to load range applied state")
}
writeRaftAppliedIndexTerm := false
if rangeAppliedState.RaftAppliedIndexTerm > 0 {
writeRaftAppliedIndexTerm = true
}
*h.AbsPostSplitRight(), err = stateloader.WriteInitialReplicaState(
ctx, batch, *h.AbsPostSplitRight(), split.RightDesc, rightLease,
*gcThreshold, replicaVersion,
*gcThreshold, replicaVersion, writeRaftAppliedIndexTerm,
)
if err != nil {
return enginepb.MVCCStats{}, result.Result{}, errors.Wrap(err, "unable to write initial Replica state")
Expand Down
22 changes: 22 additions & 0 deletions pkg/kv/kvserver/batcheval/cmd_migrate.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/batcheval/result"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/kvserverpb"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/spanset"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/stateloader"
"github.com/cockroachdb/cockroach/pkg/roachpb"
"github.com/cockroachdb/cockroach/pkg/storage"
"github.com/cockroachdb/errors"
Expand Down Expand Up @@ -49,6 +50,8 @@ type migration func(context.Context, storage.ReadWriter, CommandArgs) (result.Re

func init() {
_ = registerMigration // prevent unused warning.
registerMigration(
clusterversion.AddRaftAppliedIndexTermMigration, addRaftAppliedIndexTermMigration)
}

func registerMigration(key clusterversion.Key, migration migration) {
Expand Down Expand Up @@ -89,6 +92,25 @@ func Migrate(
return pd, nil
}

// addRaftAppliedIndexTermMigration migrates the system to start populating
// the RangeAppliedState.RaftAppliedIndexTerm field.
func addRaftAppliedIndexTermMigration(
ctx context.Context, readWriter storage.ReadWriter, cArgs CommandArgs,
) (result.Result, error) {
return result.Result{
Replicated: kvserverpb.ReplicatedEvalResult{
State: &kvserverpb.ReplicaState{
// Signal the migration by sending a term on the new field that we
// want to migrate into. This term is chosen as one that would never
// be used in practice (since raftInitialLogTerm is 10), so we can
// special-case it below raft and start writing the (real) term to the
// AppliedState.
RaftAppliedIndexTerm: stateloader.RaftLogTermSignalForAddRaftAppliedIndexTermMigration,
},
},
}, nil
}

// TestingRegisterMigrationInterceptor is used in tests to register an
// interceptor for a below-raft migration.
//
Expand Down
1 change: 1 addition & 0 deletions pkg/kv/kvserver/batcheval/result/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ go_library(
deps = [
"//pkg/kv/kvserver/concurrency/lock",
"//pkg/kv/kvserver/kvserverpb",
"//pkg/kv/kvserver/stateloader",
"//pkg/roachpb:with-mocks",
"//pkg/util/log",
"@com_github_cockroachdb_errors//:errors",
Expand Down
21 changes: 19 additions & 2 deletions pkg/kv/kvserver/batcheval/result/result.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ import (
"fmt"

"github.com/cockroachdb/cockroach/pkg/kv/kvserver/kvserverpb"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/stateloader"
"github.com/cockroachdb/cockroach/pkg/roachpb"
"github.com/cockroachdb/cockroach/pkg/util/log"
"github.com/cockroachdb/errors"
Expand Down Expand Up @@ -193,14 +194,30 @@ func coalesceBool(lhs *bool, rhs *bool) {
func (p *Result) MergeAndDestroy(q Result) error {
if q.Replicated.State != nil {
if q.Replicated.State.RaftAppliedIndex != 0 {
return errors.AssertionFailedf("must not specify RaftApplyIndex")
return errors.AssertionFailedf("must not specify RaftAppliedIndex")
}
if q.Replicated.State.LeaseAppliedIndex != 0 {
return errors.AssertionFailedf("must not specify RaftApplyIndex")
return errors.AssertionFailedf("must not specify LeaseAppliedIndex")
}
if p.Replicated.State == nil {
p.Replicated.State = &kvserverpb.ReplicaState{}
}
if q.Replicated.State.RaftAppliedIndexTerm != 0 {
if q.Replicated.State.RaftAppliedIndexTerm ==
stateloader.RaftLogTermSignalForAddRaftAppliedIndexTermMigration {
if p.Replicated.State.RaftAppliedIndexTerm != 0 &&
p.Replicated.State.RaftAppliedIndexTerm !=
stateloader.RaftLogTermSignalForAddRaftAppliedIndexTermMigration {
return errors.AssertionFailedf("invalid term value %d",
p.Replicated.State.RaftAppliedIndexTerm)
}
p.Replicated.State.RaftAppliedIndexTerm = q.Replicated.State.RaftAppliedIndexTerm
q.Replicated.State.RaftAppliedIndexTerm = 0
} else {
return errors.AssertionFailedf("invalid term value %d",
q.Replicated.State.RaftAppliedIndexTerm)
}
}
if p.Replicated.State.Desc == nil {
p.Replicated.State.Desc = q.Replicated.State.Desc
} else if q.Replicated.State.Desc != nil {
Expand Down
2 changes: 1 addition & 1 deletion pkg/kv/kvserver/below_raft_protos_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,7 @@ var belowRaftGoldenProtos = map[reflect.Type]fixture{
return enginepb.NewPopulatedRangeAppliedState(r, false)
},
emptySum: 615555020845646359,
populatedSum: 17354515720541950025,
populatedSum: 12125419916111069931,
},
reflect.TypeOf(&raftpb.HardState{}): {
populatedConstructor: func(r *rand.Rand) protoutil.Message {
Expand Down
4 changes: 4 additions & 0 deletions pkg/kv/kvserver/kvserverpb/state.proto
Original file line number Diff line number Diff line change
Expand Up @@ -134,6 +134,10 @@ message ReplicaState {
// "follower reads" at or below this timestamp.
util.hlc.Timestamp raft_closed_timestamp = 13 [(gogoproto.nullable) = false];

// The term corresponding to RaftAppliedIndex. This is derived from
// RangeAppliedState.RaftAppliedIndexTerm.
uint64 raft_applied_index_term = 14;

reserved 8, 9, 10;
}

Expand Down
11 changes: 11 additions & 0 deletions pkg/kv/kvserver/replica.go
Original file line number Diff line number Diff line change
Expand Up @@ -2002,3 +2002,14 @@ func (r *Replica) GetResponseMemoryAccount() *mon.BoundAccount {
func init() {
tracing.RegisterTagRemapping("r", "range")
}

// LockRaftMuForTesting is for use only by tests in other packages that are
// trying to avoid a race with concurrent raft application. For tests in the
// kvserver package, use Replica.{RaftLock,RaftUnlock} that are defined in
// helpers_test.go.
func (r *Replica) LockRaftMuForTesting() (unlockFunc func()) {
r.raftMu.Lock()
return func() {
r.raftMu.Unlock()
}
}
2 changes: 2 additions & 0 deletions pkg/kv/kvserver/replica_application_result.go
Original file line number Diff line number Diff line change
Expand Up @@ -90,6 +90,8 @@ func clearTrivialReplicatedEvalResultFields(r *kvserverpb.ReplicatedEvalResult)
// replica state for this batch.
if haveState := r.State != nil; haveState {
r.State.Stats = nil
// Reset the signal used to execute the AddRaftAppliedIndexTermMigration.
r.State.RaftAppliedIndexTerm = 0
if *r.State == (kvserverpb.ReplicaState{}) {
r.State = nil
}
Expand Down
24 changes: 21 additions & 3 deletions pkg/kv/kvserver/replica_application_state_machine.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/closedts/ctpb"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/kvserverbase"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/kvserverpb"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/stateloader"
"github.com/cockroachdb/cockroach/pkg/roachpb"
"github.com/cockroachdb/cockroach/pkg/storage"
"github.com/cockroachdb/cockroach/pkg/storage/enginepb"
Expand Down Expand Up @@ -849,9 +850,23 @@ func (b *replicaAppBatch) runPreApplyTriggersAfterStagingWriteBatch(
func (b *replicaAppBatch) stageTrivialReplicatedEvalResult(
ctx context.Context, cmd *replicatedCmd,
) {
if raftAppliedIndex := cmd.ent.Index; raftAppliedIndex != 0 {
b.state.RaftAppliedIndex = raftAppliedIndex
raftAppliedIndex := cmd.ent.Index
if raftAppliedIndex == 0 {
log.Fatalf(ctx, "raft entry with index 0")
}
b.state.RaftAppliedIndex = raftAppliedIndex
rs := cmd.decodedRaftEntry.replicatedResult().State
// We are post migration or this replicatedCmd is doing the migration.
if b.state.RaftAppliedIndexTerm > 0 || (rs != nil &&
rs.RaftAppliedIndexTerm == stateloader.RaftLogTermSignalForAddRaftAppliedIndexTermMigration) {
// Once we populate b.state.RaftAppliedIndexTerm it will flow into the
// persisted RangeAppliedState and into the in-memory representation in
// Replica.mu.state. The latter is used to initialize b.state, so future
// calls to this method will see that the migration has already happened
// and will continue to populate the term.
b.state.RaftAppliedIndexTerm = cmd.ent.Term
}

if leaseAppliedIndex := cmd.leaseIndex; leaseAppliedIndex != 0 {
b.state.LeaseAppliedIndex = leaseAppliedIndex
}
Expand Down Expand Up @@ -915,6 +930,9 @@ func (b *replicaAppBatch) ApplyToStateMachine(ctx context.Context) error {
// Update the replica's applied indexes, mvcc stats and closed timestamp.
r.mu.Lock()
r.mu.state.RaftAppliedIndex = b.state.RaftAppliedIndex
// RaftAppliedIndexTerm will be non-zero only when the
// AddRaftAppliedIndexTermMigration has happened.
r.mu.state.RaftAppliedIndexTerm = b.state.RaftAppliedIndexTerm
r.mu.state.LeaseAppliedIndex = b.state.LeaseAppliedIndex

// Sanity check that the RaftClosedTimestamp doesn't go backwards.
Expand Down Expand Up @@ -978,7 +996,7 @@ func (b *replicaAppBatch) addAppliedStateKeyToBatch(ctx context.Context) error {
// lease index along with the mvcc stats, all in one key.
loader := &b.r.raftMu.stateLoader
return loader.SetRangeAppliedState(
ctx, b.batch, b.state.RaftAppliedIndex, b.state.LeaseAppliedIndex,
ctx, b.batch, b.state.RaftAppliedIndex, b.state.LeaseAppliedIndex, b.state.RaftAppliedIndexTerm,
b.state.Stats, &b.state.RaftClosedTimestamp,
)
}
Expand Down
52 changes: 47 additions & 5 deletions pkg/kv/kvserver/replica_raftstorage.go
Original file line number Diff line number Diff line change
Expand Up @@ -386,6 +386,15 @@ func (r *Replica) GetLeaseAppliedIndex() uint64 {
// r.mu is held. Note that the returned snapshot is a placeholder and
// does not contain any of the replica data. The snapshot is actually generated
// (and sent) by the Raft snapshot queue.
//
// More specifically, this method is called by etcd/raft in
// (*raftLog).snapshot. Raft expects that it generates the snapshot (by
// calling Snapshot) and that "sending" the result actually sends the
// snapshot. In CockroachDB, that message is intercepted (at the sender) and
// instead we add the replica (the raft leader) to the raft snapshot queue,
// and when its turn comes we look at the raft state for followers that want a
// snapshot, and then send one. That actual sending path does not call this
// Snapshot method.
func (r *replicaRaftStorage) Snapshot() (raftpb.Snapshot, error) {
r.mu.AssertHeld()
appliedIndex := r.mu.state.RaftAppliedIndex
Expand Down Expand Up @@ -418,11 +427,27 @@ func (r *Replica) GetSnapshot(
// the corresponding Raft command not applied yet).
r.raftMu.Lock()
snap := r.store.engine.NewSnapshot()
r.mu.Lock()
appliedIndex := r.mu.state.RaftAppliedIndex
// Cleared when OutgoingSnapshot closes.
r.addSnapshotLogTruncationConstraintLocked(ctx, snapUUID, appliedIndex, recipientStore)
r.mu.Unlock()
{
r.mu.Lock()
// We will fetch the applied index later again, from snap. The
// appliedIndex fetched here is narrowly used for adding a log truncation
// constraint to prevent log entries > appliedIndex from being removed.
// Note that the appliedIndex maintained in Replica actually lags the one
// in the engine, since replicaAppBatch.ApplyToStateMachine commits the
// engine batch and then acquires Replica.mu to update
// Replica.mu.state.RaftAppliedIndex. The use of a possibly stale value
// here is harmless since using a lower index in this constraint, than the
// actual snapshot index, preserves more from a log truncation
// perspective.
//
// TODO(sumeer): despite the above justification, this is unnecessarily
// complicated. Consider loading the RaftAppliedIndex from the snap for
// this use case.
appliedIndex := r.mu.state.RaftAppliedIndex
// Cleared when OutgoingSnapshot closes.
r.addSnapshotLogTruncationConstraintLocked(ctx, snapUUID, appliedIndex, recipientStore)
r.mu.Unlock()
}
r.raftMu.Unlock()

release := func() {
Expand Down Expand Up @@ -568,6 +593,12 @@ func snapshot(
}

term, err := term(ctx, rsl, snap, rangeID, eCache, state.RaftAppliedIndex)
// If we've migrated to populating RaftAppliedIndexTerm, check that the term
// from the two sources are equal.
if state.RaftAppliedIndexTerm != 0 && term != state.RaftAppliedIndexTerm {
return OutgoingSnapshot{},
errors.AssertionFailedf("unequal terms %d != %d", term, state.RaftAppliedIndexTerm)
}
if err != nil {
return OutgoingSnapshot{}, errors.Wrapf(err, "failed to fetch term of %d", state.RaftAppliedIndex)
}
Expand Down Expand Up @@ -920,6 +951,12 @@ func (r *Replica) applySnapshot(
log.Fatalf(ctx, "snapshot RaftAppliedIndex %d doesn't match its metadata index %d",
state.RaftAppliedIndex, nonemptySnap.Metadata.Index)
}
// If we've migrated to populating RaftAppliedIndexTerm, check that the term
// from the two sources are equal.
if state.RaftAppliedIndexTerm != 0 && state.RaftAppliedIndexTerm != nonemptySnap.Metadata.Term {
log.Fatalf(ctx, "snapshot RaftAppliedIndexTerm %d doesn't match its metadata term %d",
state.RaftAppliedIndexTerm, nonemptySnap.Metadata.Term)
}

// The on-disk state is now committed, but the corresponding in-memory state
// has not yet been updated. Any errors past this point must therefore be
Expand Down Expand Up @@ -975,6 +1012,11 @@ func (r *Replica) applySnapshot(
// feelings about this ever change, we can add a LastIndex field to
// raftpb.SnapshotMetadata.
r.mu.lastIndex = state.RaftAppliedIndex

// TODO(sumeer): We should be able to set this to
// nonemptySnap.Metadata.Term. See
// https://github.com/cockroachdb/cockroach/pull/75675#pullrequestreview-867926687
// for a discussion regarding this.
r.mu.lastTerm = invalidLastTerm
r.mu.raftLogSize = 0
// Update the store stats for the data in the snapshot.
Expand Down
1 change: 1 addition & 0 deletions pkg/kv/kvserver/stateloader/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ go_library(
importpath = "github.com/cockroachdb/cockroach/pkg/kv/kvserver/stateloader",
visibility = ["//visibility:public"],
deps = [
"//pkg/clusterversion",
"//pkg/keys",
"//pkg/kv/kvserver/kvserverpb",
"//pkg/roachpb:with-mocks",
Expand Down
Loading