Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

kvserver: move a logstorage portion of applySnapshot to kvstorage #93904

Closed
wants to merge 4 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions pkg/kv/kvserver/kvstorage/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ go_library(
srcs = [
"doc.go",
"init.go",
"snapshot.go",
],
importpath = "github.com/cockroachdb/cockroach/pkg/kv/kvserver/kvstorage",
visibility = ["//visibility:public"],
Expand All @@ -20,6 +21,8 @@ go_library(
"//pkg/util/log",
"//pkg/util/protoutil",
"@com_github_cockroachdb_errors//:errors",
"@io_etcd_go_raft_v3//:raft",
"@io_etcd_go_raft_v3//raftpb",
],
)

Expand Down
82 changes: 82 additions & 0 deletions pkg/kv/kvserver/kvstorage/snapshot.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,82 @@
// Copyright 2022 The Cockroach Authors.
//
// Use of this software is governed by the Business Source License
// included in the file licenses/BSL.txt.
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0, included in the file
// licenses/APL.txt.

package kvstorage

import (
"context"

"github.com/cockroachdb/cockroach/pkg/keys"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/logstore"
"github.com/cockroachdb/cockroach/pkg/roachpb"
"github.com/cockroachdb/cockroach/pkg/storage"
"github.com/cockroachdb/errors"
"go.etcd.io/raft/v3"
"go.etcd.io/raft/v3/raftpb"
)

// PrepareLogStoreSnapshotSST prepares the SST that should be ingested to apply
// the snapshot with the given metadata.
//
// The full snapshot contains additional SSTs reflecting the state machine which
// is not reflected here. It must be ingested first.
//
// If a crash occurs between ingesting the statemachine SST(s) and the log store SSTs,
// start-up reconciliation will re-ingest.
func PrepareLogStoreSnapshotSST(
ctx context.Context,
id storage.FullReplicaID,
hs raftpb.HardState,
snap raftpb.SnapshotMetadata,
logStoreSSTWriter *storage.SSTWriter,
) error {
if raft.IsEmptyHardState(hs) {
// Raft will never provide an empty HardState if it is providing a
// nonempty snapshot because we discard snapshots that do not increase
// the commit index.
return errors.AssertionFailedf("found empty HardState for non-empty Snapshot %+v", snap)
}

// Clearing the unreplicated state.
//
// NB: We do not expect to see range keys in the unreplicated state, so
// we don't drop a range tombstone across the range key space.
unreplicatedPrefixKey := keys.MakeRangeIDUnreplicatedPrefix(id.RangeID)
unreplicatedStart := unreplicatedPrefixKey
unreplicatedEnd := unreplicatedPrefixKey.PrefixEnd()
if err := logStoreSSTWriter.ClearRawRange(
unreplicatedStart, unreplicatedEnd, true /* pointKeys */, false, /* rangeKeys */
); err != nil {
return errors.Wrapf(err, "error clearing range of unreplicated SST writer")
}

sl := logstore.NewStateLoader(id.RangeID)
// Update HardState.
if err := sl.SetHardState(ctx, logStoreSSTWriter, hs); err != nil {
return errors.Wrapf(err, "unable to write HardState to unreplicated SST writer")
}
// We've cleared all the raft state above, so we are forced to write the
// RaftReplicaID again here.
if err := sl.SetRaftReplicaID(
ctx, logStoreSSTWriter, id.ReplicaID); err != nil {
return errors.Wrapf(err, "unable to write RaftReplicaID to unreplicated SST writer")
}
if err := sl.SetRaftTruncatedState(
ctx, logStoreSSTWriter,
&roachpb.RaftTruncatedState{
Index: snap.Index,
Term: snap.Term,
},
); err != nil {
return errors.Wrapf(err, "unable to write TruncatedState to unreplicated SST writer")
}

return nil
}
2 changes: 2 additions & 0 deletions pkg/kv/kvserver/logstore/stateloader.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,8 @@ import (
// TODO(pavelkalinnikov): understand the split between logstore and raftlog
// packages, reshuffle or merge them, including this StateLoader.
type StateLoader struct {
// TODO(tbg): this shouldn't be embedded here, because now we have all of the
// StateMachine keygen methods exported on StateLoader.
keys.RangeIDPrefixBuf
}

Expand Down
2 changes: 2 additions & 0 deletions pkg/kv/kvserver/replica_raft_quiesce.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ package kvserver

import (
"context"
"math/big"
"sort"

"github.com/cockroachdb/cockroach/pkg/kv/kvserver/liveness/livenesspb"
Expand Down Expand Up @@ -93,6 +94,7 @@ func (r *Replica) maybeUnquiesceAndWakeLeaderLocked() bool {
}

func (r *Replica) canUnquiesceRLocked() bool {
big.Int{}
return r.mu.quiescent &&
// If the replica is uninitialized (i.e. it contains no replicated state),
// it is not allowed to unquiesce and begin Tick()'ing itself.
Expand Down
85 changes: 36 additions & 49 deletions pkg/kv/kvserver/replica_raftstorage.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ import (

"github.com/cockroachdb/cockroach/pkg/keys"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/kvserverpb"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/kvstorage"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/logstore"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/rditer"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/readsummary"
Expand Down Expand Up @@ -537,13 +538,6 @@ func (r *Replica) applySnapshot(
}
}()

if raft.IsEmptyHardState(hs) {
// Raft will never provide an empty HardState if it is providing a
// nonempty snapshot because we discard snapshots that do not increase
// the commit index.
log.Fatalf(ctx, "found empty HardState for non-empty Snapshot %+v", nonemptySnap)
}

var stats struct {
// Time to process subsumed replicas.
subsumedReplicas time.Time
Expand All @@ -564,60 +558,42 @@ func (r *Replica) applySnapshot(
log.Infof(ctx, "applied %s (%s)", inSnap, logDetails)
}(timeutil.Now())

unreplicatedSSTFile := &storage.MemFile{}
unreplicatedSST := storage.MakeIngestionSSTWriter(
ctx, r.ClusterSettings(), unreplicatedSSTFile,
logstoreSSTMemFile := &storage.MemFile{}
logStoreSSTWriter := storage.MakeIngestionSSTWriter(
ctx, r.ClusterSettings(), logstoreSSTMemFile,
)
defer unreplicatedSST.Close()

// Clearing the unreplicated state.
//
// NB: We do not expect to see range keys in the unreplicated state, so
// we don't drop a range tombstone across the range key space.
unreplicatedPrefixKey := keys.MakeRangeIDUnreplicatedPrefix(r.RangeID)
unreplicatedStart := unreplicatedPrefixKey
unreplicatedEnd := unreplicatedPrefixKey.PrefixEnd()
if err = unreplicatedSST.ClearRawRange(
unreplicatedStart, unreplicatedEnd, true /* pointKeys */, false, /* rangeKeys */
defer logStoreSSTWriter.Close()

// Create SST to clear the RangeID-local unreplicated span. Will
// be combined with the SST that contains the replicated data, and
// in sum that is the entire Replica state we're going to write.
if err := kvstorage.PrepareLogStoreSnapshotSST(ctx,
storage.FullReplicaID{RangeID: r.RangeID, ReplicaID: r.replicaID},
hs,
nonemptySnap.Metadata,
&logStoreSSTWriter,
); err != nil {
return errors.Wrapf(err, "error clearing range of unreplicated SST writer")
}

// Update HardState.
if err := r.raftMu.stateLoader.SetHardState(ctx, &unreplicatedSST, hs); err != nil {
return errors.Wrapf(err, "unable to write HardState to unreplicated SST writer")
}
// We've cleared all the raft state above, so we are forced to write the
// RaftReplicaID again here.
if err := r.raftMu.stateLoader.SetRaftReplicaID(
ctx, &unreplicatedSST, r.replicaID); err != nil {
return errors.Wrapf(err, "unable to write RaftReplicaID to unreplicated SST writer")
return err
}

// Update Raft entries.
r.store.raftEntryCache.Drop(r.RangeID)

if err := r.raftMu.stateLoader.SetRaftTruncatedState(
ctx, &unreplicatedSST,
&roachpb.RaftTruncatedState{
Index: nonemptySnap.Metadata.Index,
Term: nonemptySnap.Metadata.Term,
},
); err != nil {
return errors.Wrapf(err, "unable to write TruncatedState to unreplicated SST writer")
}

if err := unreplicatedSST.Finish(); err != nil {
if err := logStoreSSTWriter.Finish(); err != nil {
return err
}
if unreplicatedSST.DataSize > 0 {
// TODO(itsbilal): Write to SST directly in unreplicatedSST rather than
if logStoreSSTWriter.DataSize > 0 {
// TODO(itsbilal): Write to SST directly in logStoreSSTWriter rather than
// buffering in a MemFile first.
if err := inSnap.SSTStorageScratch.WriteSST(ctx, unreplicatedSSTFile.Data()); err != nil {
if err := inSnap.SSTStorageScratch.WriteSST(ctx, logstoreSSTMemFile.Data()); err != nil {
return err
}
}

// Next, we might be subsuming replicas. This occurs if the snapshot reflects
// at least one merge that isn't reflected in the pre-snapshot state. For each
// such replica, we'll

// If we're subsuming a replica below, we don't have its last NextReplicaID,
// nor can we obtain it. That's OK: we can just be conservative and use the
// maximum possible replica ID. preDestroyRaftMuLocked will write a replica
Expand Down Expand Up @@ -790,11 +766,22 @@ func (r *Replica) clearSubsumedReplicaDiskData(
subsumedRepls []*Replica,
subsumedNextReplicaID roachpb.ReplicaID,
) error {

// TODO(tbg): I think this method can be simplfied quite a bit. First,
// better not to interleave snap creation with in-mem destruction, i.e.
// make preDestroyRaftMuLocked have separate parts for staging the deletion
// and marking a *Replica is being destroyed.
// Second, the "subsumed range extends past snapshot" part can be simplified.
// We can use a SpanGroup - init it with the snapshot desc RSpan, then subtract out all
// of the subsumed desc RSpans. For slices that remain, we need to clear the replicated
// key spans.

// NB: we don't clear RangeID local key spans here. That happens
// via the call to preDestroyRaftMuLocked.
getKeySpans := rditer.MakeReplicatedKeySpansExceptRangeID
keySpans := getKeySpans(desc)
totalKeySpans := append([]roachpb.Span(nil), keySpans...)

keySpans := getKeySpans(desc) // key-based replicated spans for snap
totalKeySpans := append([]roachpb.Span(nil), keySpans...) // copy initially but will be adjusted
for _, sr := range subsumedRepls {
// We mark the replica as destroyed so that new commands are not
// accepted. This destroy status will be detected after the batch
Expand Down
2 changes: 2 additions & 0 deletions pkg/storage/replicas_storage.go
Original file line number Diff line number Diff line change
Expand Up @@ -533,6 +533,8 @@ const (
)

// FullReplicaID is a fully-qualified replica ID.
//
// TODO(sep-raft-log): move this to kvstorage.
type FullReplicaID struct {
// RangeID is the id of the range.
RangeID roachpb.RangeID
Expand Down