Skip to content

Commit

Permalink
kvserver: stub RemoteFile support to AddSSTable
Browse files Browse the repository at this point in the history
Release note: none.
Epic: none.
  • Loading branch information
dt committed Jul 17, 2023
1 parent 763210e commit adfa97f
Show file tree
Hide file tree
Showing 10 changed files with 123 additions and 6 deletions.
2 changes: 2 additions & 0 deletions pkg/kv/batch.go
Original file line number Diff line number Diff line change
Expand Up @@ -973,6 +973,7 @@ func (b *Batch) adminRelocateRange(
func (b *Batch) addSSTable(
s, e interface{},
data []byte,
remoteFile kvpb.AddSSTableRequest_RemoteFile,
disallowConflicts bool,
disallowShadowing bool,
disallowShadowingBelow hlc.Timestamp,
Expand All @@ -996,6 +997,7 @@ func (b *Batch) addSSTable(
EndKey: end,
},
Data: data,
RemoteFile: remoteFile,
DisallowConflicts: disallowConflicts,
DisallowShadowing: disallowShadowing,
DisallowShadowingBelow: disallowShadowingBelow,
Expand Down
26 changes: 24 additions & 2 deletions pkg/kv/db.go
Original file line number Diff line number Diff line change
Expand Up @@ -712,6 +712,8 @@ func (db *DB) AdminRelocateRange(
return getOneErr(db.Run(ctx, b), b)
}

var noRemoteFile kvpb.AddSSTableRequest_RemoteFile

// AddSSTable links a file into the Pebble log-structured merge-tree.
//
// The disallowConflicts, disallowShadowingBelow parameters
Expand All @@ -728,7 +730,7 @@ func (db *DB) AddSSTable(
batchTs hlc.Timestamp,
) (roachpb.Span, int64, error) {
b := &Batch{Header: kvpb.Header{Timestamp: batchTs}}
b.addSSTable(begin, end, data, disallowConflicts, disallowShadowing, disallowShadowingBelow,
b.addSSTable(begin, end, data, noRemoteFile, disallowConflicts, disallowShadowing, disallowShadowingBelow,
stats, ingestAsWrites, hlc.Timestamp{} /* sstTimestampToRequestTimestamp */)
err := getOneErr(db.Run(ctx, b), b)
if err != nil {
Expand All @@ -741,6 +743,25 @@ func (db *DB) AddSSTable(
return resp.RangeSpan, resp.AvailableBytes, nil
}

func (db *DB) AddRemoteSSTable(
ctx context.Context,
span roachpb.Span,
file kvpb.AddSSTableRequest_RemoteFile,
stats *enginepb.MVCCStats,
) (roachpb.Span, int64, error) {
b := &Batch{}
b.addSSTable(span.Key, span.EndKey, nil, file, false, false, hlc.Timestamp{}, stats, false, hlc.Timestamp{})
err := getOneErr(db.Run(ctx, b), b)
if err != nil {
return roachpb.Span{}, 0, err
}
if l := len(b.response.Responses); l != 1 {
return roachpb.Span{}, 0, errors.AssertionFailedf("expected single response, got %d", l)
}
resp := b.response.Responses[0].GetAddSstable()
return resp.RangeSpan, resp.AvailableBytes, nil
}

// AddSSTableAtBatchTimestamp links a file into the Pebble log-structured
// merge-tree. All keys in the SST must have batchTs as their timestamp, but the
// batch timestamp at which the sst is actually ingested -- and that those keys
Expand All @@ -759,7 +780,8 @@ func (db *DB) AddSSTableAtBatchTimestamp(
batchTs hlc.Timestamp,
) (hlc.Timestamp, roachpb.Span, int64, error) {
b := &Batch{Header: kvpb.Header{Timestamp: batchTs}}
b.addSSTable(begin, end, data, disallowConflicts, disallowShadowing, disallowShadowingBelow,
b.addSSTable(begin, end, data, noRemoteFile,
disallowConflicts, disallowShadowing, disallowShadowingBelow,
stats, ingestAsWrites, batchTs)
err := getOneErr(db.Run(ctx, b), b)
if err != nil {
Expand Down
39 changes: 39 additions & 0 deletions pkg/kv/kvpb/api.proto
Original file line number Diff line number Diff line change
Expand Up @@ -1996,6 +1996,45 @@ message AddSSTableRequest {
// also find and return the key at which the span after the added file span
// is likely non-empty. See AddSSTableResponse.FollowingLikelyNonEmptySpanStart.
bool return_following_likely_non_empty_span_start = 9;

// RemoteFile indicates that the span indicated by the request start and end
// key of the file specified should be added. Data cannot be set if RemoteFile
// is non-empty, and many other request parameters such as collision/shadow
// checking, write-at-request-timestamp, or ingest-as-writes are unsupported.
//
// TODO(dt, msbutler, bilal): This is unsupported.
// TOOD(dt, msbutler, bilal): support sst_timestamp_to_request_timestamp.
message RemoteFile {
string location = 1;
string path = 2;
}
RemoteFile remote_file = 10 [(gogoproto.nullable) = false];

// AddSSTableRequest_PrefixReplacement is used to represent a prefix to be
// replaced and the prefix with which to replace it.
message PrefixReplacement {
bytes from = 1;
bytes to = 2;
}

// PrefixReplaecment is used to that keys in the sst with the prefix specified
// in "from" should instead appear to have that prefix replaced by "to", when
// during iteration, seeking, or other reads. For example, an SST containing
// keys a1, a2, and a3 when added with PrefixReplaecment={From: a, To:x} would
// produce the same results for subsequent operations as adding an SST with
// the keys x1, x2, x3 instead. The implementaiton may however elect to defer
// these replacements until the file is read.
//
// TODO(dt,msbutler,bilal): This is unsupported.
PrefixReplacement prefix_replacement = 11 [(gogoproto.nullable) = false];

// IgnoreKeysAboveTimestamp is used when ingesting an SSTable that contains
// keys, including mvcc revisions of keys, captured over a time range, to
// indicate that readers of that sstable should read it "as of" a the fixed
// time, ignoring any keys with later timestamps.
//
// TODO(dt,msbutler,bilal): This is unsupported.
util.hlc.Timestamp ignore_keys_above_timestamp = 12 [(gogoproto.nullable) = false];
}

// AddSSTableResponse is the response to a AddSSTable() operation.
Expand Down
2 changes: 2 additions & 0 deletions pkg/kv/kvserver/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -176,6 +176,7 @@ go_library(
"//pkg/roachpb",
"//pkg/rpc",
"//pkg/rpc/nodedialer",
"//pkg/security/username",
"//pkg/server/telemetry",
"//pkg/settings",
"//pkg/settings/cluster",
Expand All @@ -199,6 +200,7 @@ go_library(
"//pkg/util/grunning",
"//pkg/util/hlc",
"//pkg/util/humanizeutil",
"//pkg/util/ioctx",
"//pkg/util/iterutil",
"//pkg/util/limit",
"//pkg/util/log",
Expand Down
2 changes: 2 additions & 0 deletions pkg/kv/kvserver/app_batch.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ package kvserver
import (
"context"

"github.com/cockroachdb/cockroach/pkg/cloud"
"github.com/cockroachdb/cockroach/pkg/kv/kvpb"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/kvserverbase"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/kvserverpb"
Expand Down Expand Up @@ -154,6 +155,7 @@ type postAddEnv struct {
eng storage.Engine
sideloaded logstore.SideloadStorage
bulkLimiter *rate.Limiter
external *cloud.ExternalStorageAccessor
}

func (b *appBatch) runPostAddTriggers(
Expand Down
24 changes: 24 additions & 0 deletions pkg/kv/kvserver/batcheval/cmd_add_sstable.go
Original file line number Diff line number Diff line change
Expand Up @@ -157,6 +157,30 @@ func EvalAddSSTable(
}
}

if args.RemoteFile.Path != "" {
log.Infof(ctx, "AddSSTable of remote file: %s in %s", args.RemoteFile.Path, args.RemoteFile.Location)
stats := *args.MVCCStats
stats.ContainsEstimates++

ms.Add(stats)

mvccHistoryMutation := &kvserverpb.ReplicatedEvalResult_MVCCHistoryMutation{
Spans: []roachpb.Span{{Key: start.Key, EndKey: end.Key}},
}
return result.Result{
Replicated: kvserverpb.ReplicatedEvalResult{
AddSSTable: &kvserverpb.ReplicatedEvalResult_AddSSTable{
RemoteFileLoc: args.RemoteFile.Location,
RemoteFilePath: args.RemoteFile.Path,
Span: roachpb.Span{Key: start.Key, EndKey: end.Key},
},
MVCCHistoryMutation: mvccHistoryMutation,
},
}, nil
}

log.Infof(ctx, "non-remote AddSSTable")

// Reject AddSSTable requests not writing at the request timestamp if requested.
if AddSSTableRequireAtRequestTimestamp.Get(&cArgs.EvalCtx.ClusterSettings().SV) &&
sstToReqTS.IsEmpty() {
Expand Down
3 changes: 3 additions & 0 deletions pkg/kv/kvserver/kvserverpb/proposer_kv.proto
Original file line number Diff line number Diff line change
Expand Up @@ -193,6 +193,9 @@ message ReplicatedEvalResult {
// taken place since 22.1, to make sure all Raft log entries from 21.2 or
// older have been applied on all replicas.
bool at_write_timestamp = 4;

string remote_file_loc = 5;
string remote_file_path = 6;
}
AddSSTable add_sstable = 17 [(gogoproto.customname) = "AddSSTable"];

Expand Down
4 changes: 0 additions & 4 deletions pkg/kv/kvserver/raftlog/payload.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,10 +50,6 @@ func EncodeCommand(
if raftAdmissionMeta != nil {
entryEncoding = EntryEncodingSideloadedWithAC
}

if command.ReplicatedEvalResult.AddSSTable.Data == nil {
return nil, errors.Errorf("cannot sideload empty SSTable")
}
}

// NB: If (significantly) re-working how raft commands are encoded, make the
Expand Down
1 change: 1 addition & 0 deletions pkg/kv/kvserver/replica_app_batch.go
Original file line number Diff line number Diff line change
Expand Up @@ -149,6 +149,7 @@ func (b *replicaAppBatch) Stage(
eng: b.r.store.TODOEngine(),
sideloaded: b.r.raftMu.sideloaded,
bulkLimiter: b.r.store.limiters.BulkIOWriteRate,
external: b.r.store.cfg.ExternalStorage,
}); err != nil {
return nil, err
}
Expand Down
26 changes: 26 additions & 0 deletions pkg/kv/kvserver/replica_proposal.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ import (
"path/filepath"
"time"

"github.com/cockroachdb/cockroach/pkg/cloud"
"github.com/cockroachdb/cockroach/pkg/keys"
"github.com/cockroachdb/cockroach/pkg/kv/kvpb"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/batcheval/result"
Expand All @@ -26,12 +27,14 @@ import (
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/readsummary/rspb"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/uncertainty"
"github.com/cockroachdb/cockroach/pkg/roachpb"
"github.com/cockroachdb/cockroach/pkg/security/username"
"github.com/cockroachdb/cockroach/pkg/settings/cluster"
"github.com/cockroachdb/cockroach/pkg/storage"
"github.com/cockroachdb/cockroach/pkg/storage/enginepb"
"github.com/cockroachdb/cockroach/pkg/util"
"github.com/cockroachdb/cockroach/pkg/util/hlc"
"github.com/cockroachdb/cockroach/pkg/util/humanizeutil"
"github.com/cockroachdb/cockroach/pkg/util/ioctx"
"github.com/cockroachdb/cockroach/pkg/util/log"
"github.com/cockroachdb/cockroach/pkg/util/quotapool"
"github.com/cockroachdb/cockroach/pkg/util/timeutil"
Expand Down Expand Up @@ -634,6 +637,29 @@ func addSSTablePreApply(
index kvpb.RaftIndex,
sst kvserverpb.ReplicatedEvalResult_AddSSTable,
) bool {
if sst.RemoteFilePath != "" {
// TODO(dt, bilal, msbutler): Replace this with eng.IngestRemoteFile()
log.Warningf(ctx, "EXPERIMENTAL AddSSTABLE REMOTE FILE UNSUPPORTED; downloading %s from %s and adding it whole, ignoring span %s", sst.RemoteFilePath, sst.RemoteFileLoc, sst.Span)
s, err := env.external.OpenURL(ctx, sst.RemoteFileLoc, username.SQLUsername{})
if err != nil {
log.Fatalf(ctx, "failed to open remote location %q below raft: %v", sst.RemoteFileLoc, err)
}
r, _, err := s.ReadFile(ctx, sst.RemoteFilePath, cloud.ReadOptions{})
if err != nil {
log.Fatalf(ctx, "failed to open remote file path %q in %q below raft: %v", sst.RemoteFilePath, sst.RemoteFileLoc, err)
}
content, err := ioctx.ReadAll(ctx, r)
r.Close(ctx)
if err != nil {
log.Fatalf(ctx, "failed to read remote file %q in %q below raft: %v", sst.RemoteFilePath, sst.RemoteFileLoc, err)
}
sst.Data = content
sst.CRC32 = util.CRC32(content)
log.Infof(ctx, "Unsupported RemoteFile AddSSTABLE downloaded %q, read %d bytes", sst.RemoteFileLoc, len(content))
if err := env.sideloaded.Put(ctx, index, term, content); err != nil {
log.Fatalf(ctx, "failed to write downloaded remote file %q in %q below raft: %v", sst.RemoteFilePath, sst.RemoteFileLoc, err)
}
}
checksum := util.CRC32(sst.Data)

if checksum != sst.CRC32 {
Expand Down

0 comments on commit adfa97f

Please sign in to comment.