diff --git a/pkg/kv/kvserver/kvstorage/BUILD.bazel b/pkg/kv/kvserver/kvstorage/BUILD.bazel index 110f33cf9416..45084528bd1c 100644 --- a/pkg/kv/kvserver/kvstorage/BUILD.bazel +++ b/pkg/kv/kvserver/kvstorage/BUILD.bazel @@ -6,6 +6,7 @@ go_library( srcs = [ "doc.go", "init.go", + "snapshot.go", ], importpath = "github.com/cockroachdb/cockroach/pkg/kv/kvserver/kvstorage", visibility = ["//visibility:public"], @@ -20,6 +21,8 @@ go_library( "//pkg/util/log", "//pkg/util/protoutil", "@com_github_cockroachdb_errors//:errors", + "@io_etcd_go_raft_v3//:raft", + "@io_etcd_go_raft_v3//raftpb", ], ) diff --git a/pkg/kv/kvserver/kvstorage/snapshot.go b/pkg/kv/kvserver/kvstorage/snapshot.go new file mode 100644 index 000000000000..0726f293be1b --- /dev/null +++ b/pkg/kv/kvserver/kvstorage/snapshot.go @@ -0,0 +1,82 @@ +// Copyright 2022 The Cockroach Authors. +// +// Use of this software is governed by the Business Source License +// included in the file licenses/BSL.txt. +// +// As of the Change Date specified in that file, in accordance with +// the Business Source License, use of this software will be governed +// by the Apache License, Version 2.0, included in the file +// licenses/APL.txt. + +package kvstorage + +import ( + "context" + + "github.com/cockroachdb/cockroach/pkg/keys" + "github.com/cockroachdb/cockroach/pkg/kv/kvserver/logstore" + "github.com/cockroachdb/cockroach/pkg/roachpb" + "github.com/cockroachdb/cockroach/pkg/storage" + "github.com/cockroachdb/errors" + "go.etcd.io/raft/v3" + "go.etcd.io/raft/v3/raftpb" +) + +// PrepareLogStoreSnapshotSST prepares the SST that should be ingested to apply +// the snapshot with the given metadata. +// +// The full snapshot contains additional SSTs reflecting the state machine which +// is not reflected here. It must be ingested first. +// +// If a crash occurs between ingesting the statemachine SST(s) and the log store SSTs, +// start-up reconciliation will re-ingest. 
+func PrepareLogStoreSnapshotSST( + ctx context.Context, + id storage.FullReplicaID, + hs raftpb.HardState, + snap raftpb.SnapshotMetadata, + logStoreSSTWriter *storage.SSTWriter, +) error { + if raft.IsEmptyHardState(hs) { + // Raft will never provide an empty HardState if it is providing a + // nonempty snapshot because we discard snapshots that do not increase + // the commit index. + return errors.AssertionFailedf("found empty HardState for non-empty Snapshot %+v", snap) + } + + // Clearing the unreplicated state. + // + // NB: We do not expect to see range keys in the unreplicated state, so + // we don't drop a range tombstone across the range key space. + unreplicatedPrefixKey := keys.MakeRangeIDUnreplicatedPrefix(id.RangeID) + unreplicatedStart := unreplicatedPrefixKey + unreplicatedEnd := unreplicatedPrefixKey.PrefixEnd() + if err := logStoreSSTWriter.ClearRawRange( + unreplicatedStart, unreplicatedEnd, true /* pointKeys */, false, /* rangeKeys */ + ); err != nil { + return errors.Wrapf(err, "error clearing range of unreplicated SST writer") + } + + sl := logstore.NewStateLoader(id.RangeID) + // Update HardState. + if err := sl.SetHardState(ctx, logStoreSSTWriter, hs); err != nil { + return errors.Wrapf(err, "unable to write HardState to unreplicated SST writer") + } + // We've cleared all the raft state above, so we are forced to write the + // RaftReplicaID again here.
+ if err := sl.SetRaftReplicaID( + ctx, logStoreSSTWriter, id.ReplicaID); err != nil { + return errors.Wrapf(err, "unable to write RaftReplicaID to unreplicated SST writer") + } + if err := sl.SetRaftTruncatedState( + ctx, logStoreSSTWriter, + &roachpb.RaftTruncatedState{ + Index: snap.Index, + Term: snap.Term, + }, + ); err != nil { + return errors.Wrapf(err, "unable to write TruncatedState to unreplicated SST writer") + } + + return nil +} diff --git a/pkg/kv/kvserver/logstore/stateloader.go b/pkg/kv/kvserver/logstore/stateloader.go index 3d05daaebae7..dadb16849c5b 100644 --- a/pkg/kv/kvserver/logstore/stateloader.go +++ b/pkg/kv/kvserver/logstore/stateloader.go @@ -39,6 +39,8 @@ import ( // TODO(pavelkalinnikov): understand the split between logstore and raftlog // packages, reshuffle or merge them, including this StateLoader. type StateLoader struct { + // TODO(tbg): this shouldn't be embedded here, because now we have all of the + // StateMachine keygen methods exported on StateLoader. keys.RangeIDPrefixBuf }
diff --git a/pkg/kv/kvserver/replica_raftstorage.go b/pkg/kv/kvserver/replica_raftstorage.go index 00d53520413f..eae7eae76da8 100644 --- a/pkg/kv/kvserver/replica_raftstorage.go +++ b/pkg/kv/kvserver/replica_raftstorage.go @@ -16,6 +16,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/keys" "github.com/cockroachdb/cockroach/pkg/kv/kvserver/kvserverpb" + "github.com/cockroachdb/cockroach/pkg/kv/kvserver/kvstorage" "github.com/cockroachdb/cockroach/pkg/kv/kvserver/logstore" "github.com/cockroachdb/cockroach/pkg/kv/kvserver/rditer" "github.com/cockroachdb/cockroach/pkg/kv/kvserver/readsummary" @@ -537,13 +538,6 @@ func (r *Replica) applySnapshot( } }() - if raft.IsEmptyHardState(hs) { - // Raft will never provide an empty HardState if it is providing a - // nonempty snapshot because we discard snapshots that do not increase - // the commit index. - log.Fatalf(ctx, "found empty HardState for non-empty Snapshot %+v", nonemptySnap) - } - var stats struct { // Time to process subsumed replicas. subsumedReplicas time.Time @@ -564,60 +558,42 @@ func (r *Replica) applySnapshot( log.Infof(ctx, "applied %s (%s)", inSnap, logDetails) }(timeutil.Now()) - unreplicatedSSTFile := &storage.MemFile{} - unreplicatedSST := storage.MakeIngestionSSTWriter( - ctx, r.ClusterSettings(), unreplicatedSSTFile, + logstoreSSTMemFile := &storage.MemFile{} + logStoreSSTWriter := storage.MakeIngestionSSTWriter( + ctx, r.ClusterSettings(), logstoreSSTMemFile, ) - defer unreplicatedSST.Close() - - // Clearing the unreplicated state. - // - // NB: We do not expect to see range keys in the unreplicated state, so - // we don't drop a range tombstone across the range key space. 
- unreplicatedPrefixKey := keys.MakeRangeIDUnreplicatedPrefix(r.RangeID) - unreplicatedStart := unreplicatedPrefixKey - unreplicatedEnd := unreplicatedPrefixKey.PrefixEnd() - if err = unreplicatedSST.ClearRawRange( - unreplicatedStart, unreplicatedEnd, true /* pointKeys */, false, /* rangeKeys */ + defer logStoreSSTWriter.Close() + + // Create SST to clear the RangeID-local unreplicated span. Will + // be combined with the SST that contains the replicated data, and + // in sum that is the entire Replica state we're going to write. + if err := kvstorage.PrepareLogStoreSnapshotSST(ctx, + storage.FullReplicaID{RangeID: r.RangeID, ReplicaID: r.replicaID}, + hs, + nonemptySnap.Metadata, + &logStoreSSTWriter, ); err != nil { - return errors.Wrapf(err, "error clearing range of unreplicated SST writer") - } - - // Update HardState. - if err := r.raftMu.stateLoader.SetHardState(ctx, &unreplicatedSST, hs); err != nil { - return errors.Wrapf(err, "unable to write HardState to unreplicated SST writer") - } - // We've cleared all the raft state above, so we are forced to write the - // RaftReplicaID again here. - if err := r.raftMu.stateLoader.SetRaftReplicaID( - ctx, &unreplicatedSST, r.replicaID); err != nil { - return errors.Wrapf(err, "unable to write RaftReplicaID to unreplicated SST writer") + return err } // Update Raft entries. 
r.store.raftEntryCache.Drop(r.RangeID) - if err := r.raftMu.stateLoader.SetRaftTruncatedState( - ctx, &unreplicatedSST, - &roachpb.RaftTruncatedState{ - Index: nonemptySnap.Metadata.Index, - Term: nonemptySnap.Metadata.Term, - }, - ); err != nil { - return errors.Wrapf(err, "unable to write TruncatedState to unreplicated SST writer") - } - - if err := unreplicatedSST.Finish(); err != nil { + if err := logStoreSSTWriter.Finish(); err != nil { return err } - if unreplicatedSST.DataSize > 0 { - // TODO(itsbilal): Write to SST directly in unreplicatedSST rather than + if logStoreSSTWriter.DataSize > 0 { + // TODO(itsbilal): Write to SST directly in logStoreSSTWriter rather than // buffering in a MemFile first. - if err := inSnap.SSTStorageScratch.WriteSST(ctx, unreplicatedSSTFile.Data()); err != nil { + if err := inSnap.SSTStorageScratch.WriteSST(ctx, logstoreSSTMemFile.Data()); err != nil { return err } } + // Next, we might be subsuming replicas. This occurs if the snapshot reflects + // at least one merge that isn't reflected in the pre-snapshot state. For each + // such replica, we'll + // If we're subsuming a replica below, we don't have its last NextReplicaID, // nor can we obtain it. That's OK: we can just be conservative and use the // maximum possible replica ID. preDestroyRaftMuLocked will write a replica @@ -790,11 +766,22 @@ func (r *Replica) clearSubsumedReplicaDiskData( subsumedRepls []*Replica, subsumedNextReplicaID roachpb.ReplicaID, ) error { + + // TODO(tbg): I think this method can be simplified quite a bit. First, + // better not to interleave snap creation with in-mem destruction, i.e. + // make preDestroyRaftMuLocked have separate parts for staging the deletion + // and marking a *Replica as being destroyed. + // Second, the "subsumed range extends past snapshot" part can be simplified. + // We can use a SpanGroup - init it with the snapshot desc RSpan, then subtract out all + // of the subsumed desc RSpans.
For slices that remain, we need to clear the replicated + // key spans. + // NB: we don't clear RangeID local key spans here. That happens // via the call to preDestroyRaftMuLocked. getKeySpans := rditer.MakeReplicatedKeySpansExceptRangeID - keySpans := getKeySpans(desc) - totalKeySpans := append([]roachpb.Span(nil), keySpans...) + + keySpans := getKeySpans(desc) // key-based replicated spans for snap + totalKeySpans := append([]roachpb.Span(nil), keySpans...) // copy initially but will be adjusted for _, sr := range subsumedRepls { // We mark the replica as destroyed so that new commands are not // accepted. This destroy status will be detected after the batch diff --git a/pkg/storage/replicas_storage.go b/pkg/storage/replicas_storage.go index 17953e84cafc..12f767515b9e 100644 --- a/pkg/storage/replicas_storage.go +++ b/pkg/storage/replicas_storage.go @@ -533,6 +533,8 @@ const ( ) // FullReplicaID is a fully-qualified replica ID. +// +// TODO(sep-raft-log): move this to kvstorage. type FullReplicaID struct { // RangeID is the id of the range. RangeID roachpb.RangeID