Skip to content

Commit

Permalink
kvserver: partial implementation of ReplicasStorage
Browse files Browse the repository at this point in the history
The significant missing piece is part of the Init
implementation to handle a RecoveryInconsistentReplica that
requires applying committed raft log entries to the state
machine. This missing piece will need to wait until cockroachdb#75729
is fixed.

There are multiple TODOs, including related to concurrency,
but the implementation is complete enough for the datadriven
test to exercise many state transitions. Additionally, the
test exercises loss of unsynced state, and fixup of that
state in ReplicasStorage.Init, by using vfs.NewStrictMem.

Informs cockroachdb#16624

Release note: None
  • Loading branch information
sumeerbhola authored and tbg committed Nov 4, 2022
1 parent 01cfd27 commit e6223a2
Show file tree
Hide file tree
Showing 15 changed files with 3,896 additions and 869 deletions.
4 changes: 2 additions & 2 deletions pkg/keys/constants.go
Original file line number Diff line number Diff line change
Expand Up @@ -108,12 +108,12 @@ var (

// 2. Unreplicated Range-ID keys
//
// localRangeIDUnreplicatedInfix is the post-Range ID specifier for all
// LocalRangeIDUnreplicatedInfix is the post-Range ID specifier for all
// per-range data that is not fully Raft replicated. By appending this
// after the Range ID, these keys will be sorted directly after the local
// replicated keys for the same Range ID, so they can be manipulated either
// together or individually in a single scan.
localRangeIDUnreplicatedInfix = []byte("u")
LocalRangeIDUnreplicatedInfix = []byte("u")
// LocalRangeTombstoneSuffix is the suffix for the range tombstone.
//
// NB: This suffix was originally named LocalRaftTombstoneSuffix, which is
Expand Down
4 changes: 2 additions & 2 deletions pkg/keys/keys.go
Original file line number Diff line number Diff line change
Expand Up @@ -299,7 +299,7 @@ func RangeVersionKey(rangeID roachpb.RangeID) roachpb.Key {
// MakeRangeIDUnreplicatedPrefix creates a range-local key prefix from
// rangeID for all unreplicated data.
func MakeRangeIDUnreplicatedPrefix(rangeID roachpb.RangeID) roachpb.Key {
return makePrefixWithRangeID(LocalRangeIDPrefix, rangeID, localRangeIDUnreplicatedInfix)
return makePrefixWithRangeID(LocalRangeIDPrefix, rangeID, LocalRangeIDUnreplicatedInfix)
}

// makeRangeIDUnreplicatedKey creates a range-local unreplicated key based
Expand Down Expand Up @@ -994,7 +994,7 @@ func (b RangeIDPrefixBuf) replicatedPrefix() roachpb.Key {
}

func (b RangeIDPrefixBuf) unreplicatedPrefix() roachpb.Key {
return append(roachpb.Key(b), localRangeIDUnreplicatedInfix...)
return append(roachpb.Key(b), LocalRangeIDUnreplicatedInfix...)
}

// AbortSpanKey returns a range-local key by Range ID for an AbortSpan
Expand Down
2 changes: 1 addition & 1 deletion pkg/keys/printer.go
Original file line number Diff line number Diff line change
Expand Up @@ -351,7 +351,7 @@ func localRangeIDKeyParse(input string) (remainder string, key roachpb.Key) {
infix, input = mustShift(input)
var replicated bool
switch {
case bytes.Equal(localRangeIDUnreplicatedInfix, []byte(infix)):
case bytes.Equal(LocalRangeIDUnreplicatedInfix, []byte(infix)):
case bytes.Equal(LocalRangeIDReplicatedInfix, []byte(infix)):
replicated = true
default:
Expand Down
3 changes: 3 additions & 0 deletions pkg/kv/kvserver/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,7 @@ go_library(
"replica_sst_snapshot_storage.go",
"replica_tscache.go",
"replica_write.go",
"replicas_storage.go",
"replicate_queue.go",
"scanner.go",
"scheduler.go",
Expand Down Expand Up @@ -294,6 +295,7 @@ go_test(
"replica_sst_snapshot_storage_test.go",
"replica_test.go",
"replica_tscache_test.go",
"replicas_storage_test.go",
"replicate_queue_test.go",
"replicate_test.go",
"reset_quorum_test.go",
Expand Down Expand Up @@ -436,6 +438,7 @@ go_test(
"@com_github_cockroachdb_errors//oserror",
"@com_github_cockroachdb_logtags//:logtags",
"@com_github_cockroachdb_pebble//:pebble",
"@com_github_cockroachdb_pebble//vfs",
"@com_github_cockroachdb_redact//:redact",
"@com_github_gogo_protobuf//proto",
"@com_github_google_btree//:btree",
Expand Down
24 changes: 18 additions & 6 deletions pkg/kv/kvserver/replica_sst_snapshot_storage.go
Original file line number Diff line number Diff line change
Expand Up @@ -143,28 +143,40 @@ func (s *SSTSnapshotStorageScratch) NewFile(
// the provided SST when it is finished using it. If the provided SST is empty,
// then no file will be created and nothing will be written.
func (s *SSTSnapshotStorageScratch) WriteSST(ctx context.Context, data []byte) error {
_, err := s.WriteSSTAndReturnPath(ctx, data)
return err
}

// WriteSSTAndReturnPath is similar to WriteSST, but returns the path of the
// written file.
func (s *SSTSnapshotStorageScratch) WriteSSTAndReturnPath(
ctx context.Context, data []byte,
) (path string, err error) {
if s.closed {
return errors.AssertionFailedf("SSTSnapshotStorageScratch closed")
return "", errors.AssertionFailedf("SSTSnapshotStorageScratch closed")
}
if len(data) == 0 {
return nil
return "", nil
}
f, err := s.NewFile(ctx, 512<<10 /* 512 KB */)
if err != nil {
return err
return "", err
}
defer func() {
// Closing an SSTSnapshotStorageFile multiple times is idempotent. Nothing
// actionable if closing fails.
_ = f.Close()
}()
if _, err := f.Write(data); err != nil {
return err
return "", err
}
if err := f.Sync(); err != nil {
return err
return "", err
}
if err := f.Close(); err != nil {
return "", err
}
return f.Close()
return s.ssts[len(s.ssts)-1], nil
}

// SSTs returns the names of the files created.
Expand Down
2,540 changes: 2,540 additions & 0 deletions pkg/kv/kvserver/replicas_storage.go

Large diffs are not rendered by default.

Loading

0 comments on commit e6223a2

Please sign in to comment.