From adfa97f100ab02fec4a9211bd3a9acdd65755f56 Mon Sep 17 00:00:00 2001 From: David Taylor Date: Tue, 27 Jun 2023 14:25:25 +0000 Subject: [PATCH] kvserver: stub RemoteFile support to AddSSTable Release note: none. Epic: none. --- pkg/kv/batch.go | 2 + pkg/kv/db.go | 26 ++++++++++++- pkg/kv/kvpb/api.proto | 39 ++++++++++++++++++++ pkg/kv/kvserver/BUILD.bazel | 2 + pkg/kv/kvserver/app_batch.go | 2 + pkg/kv/kvserver/batcheval/cmd_add_sstable.go | 24 ++++++++++++ pkg/kv/kvserver/kvserverpb/proposer_kv.proto | 3 ++ pkg/kv/kvserver/raftlog/payload.go | 4 -- pkg/kv/kvserver/replica_app_batch.go | 1 + pkg/kv/kvserver/replica_proposal.go | 26 +++++++++++++ 10 files changed, 123 insertions(+), 6 deletions(-) diff --git a/pkg/kv/batch.go b/pkg/kv/batch.go index 183bc969a411..cc4171d03029 100644 --- a/pkg/kv/batch.go +++ b/pkg/kv/batch.go @@ -973,6 +973,7 @@ func (b *Batch) adminRelocateRange( func (b *Batch) addSSTable( s, e interface{}, data []byte, + remoteFile kvpb.AddSSTableRequest_RemoteFile, disallowConflicts bool, disallowShadowing bool, disallowShadowingBelow hlc.Timestamp, @@ -996,6 +997,7 @@ func (b *Batch) addSSTable( EndKey: end, }, Data: data, + RemoteFile: remoteFile, DisallowConflicts: disallowConflicts, DisallowShadowing: disallowShadowing, DisallowShadowingBelow: disallowShadowingBelow, diff --git a/pkg/kv/db.go b/pkg/kv/db.go index 320a38f17b54..31543e46826f 100644 --- a/pkg/kv/db.go +++ b/pkg/kv/db.go @@ -712,6 +712,8 @@ func (db *DB) AdminRelocateRange( return getOneErr(db.Run(ctx, b), b) } +var noRemoteFile kvpb.AddSSTableRequest_RemoteFile + // AddSSTable links a file into the Pebble log-structured merge-tree. // // The disallowConflicts, disallowShadowingBelow parameters @@ -728,7 +730,7 @@ func (db *DB) AddSSTable( batchTs hlc.Timestamp, ) (roachpb.Span, int64, error) { b := &Batch{Header: kvpb.Header{Timestamp: batchTs}} - b.addSSTable(begin, end, data, disallowConflicts, disallowShadowing, disallowShadowingBelow, + b.addSSTable(begin, end, data, noRemoteFile, disallowConflicts, disallowShadowing, disallowShadowingBelow, stats, ingestAsWrites, hlc.Timestamp{} /* sstTimestampToRequestTimestamp */) err := getOneErr(db.Run(ctx, b), b) if err != nil { @@ -741,6 +743,25 @@ func (db *DB) AddSSTable( return resp.RangeSpan, resp.AvailableBytes, nil } +func (db *DB) AddRemoteSSTable( + ctx context.Context, + span roachpb.Span, + file kvpb.AddSSTableRequest_RemoteFile, + stats *enginepb.MVCCStats, +) (roachpb.Span, int64, error) { + b := &Batch{} + b.addSSTable(span.Key, span.EndKey, nil, file, false, false, hlc.Timestamp{}, stats, false, hlc.Timestamp{}) + err := getOneErr(db.Run(ctx, b), b) + if err != nil { + return roachpb.Span{}, 0, err + } + if l := len(b.response.Responses); l != 1 { + return roachpb.Span{}, 0, errors.AssertionFailedf("expected single response, got %d", l) + } + resp := b.response.Responses[0].GetAddSstable() + return resp.RangeSpan, resp.AvailableBytes, nil +} + // AddSSTableAtBatchTimestamp links a file into the Pebble log-structured // merge-tree. All keys in the SST must have batchTs as their timestamp, but the // batch timestamp at which the sst is actually ingested -- and that those keys @@ -759,7 +780,8 @@ func (db *DB) AddSSTableAtBatchTimestamp( batchTs hlc.Timestamp, ) (hlc.Timestamp, roachpb.Span, int64, error) { b := &Batch{Header: kvpb.Header{Timestamp: batchTs}} - b.addSSTable(begin, end, data, disallowConflicts, disallowShadowing, disallowShadowingBelow, + b.addSSTable(begin, end, data, noRemoteFile, + disallowConflicts, disallowShadowing, disallowShadowingBelow, stats, ingestAsWrites, batchTs) err := getOneErr(db.Run(ctx, b), b) if err != nil { diff --git a/pkg/kv/kvpb/api.proto b/pkg/kv/kvpb/api.proto index 43ff73f6164c..debfb8d5a0b3 100644 --- a/pkg/kv/kvpb/api.proto +++ b/pkg/kv/kvpb/api.proto @@ -1996,6 +1996,45 @@ message AddSSTableRequest { // also find and return the key at which the span after the added file span // is likely non-empty. See AddSSTableResponse.FollowingLikelyNonEmptySpanStart. bool return_following_likely_non_empty_span_start = 9; + + // RemoteFile indicates that the span indicated by the request start and end + // key of the file specified should be added. Data cannot be set if RemoteFile + // is non-empty, and many other request parameters such as collision/shadow + // checking, write-at-request-timestamp, or ingest-as-writes are unsupported. + // + // TODO(dt, msbutler, bilal): This is unsupported. + // TOOD(dt, msbutler, bilal): support sst_timestamp_to_request_timestamp. + message RemoteFile { + string location = 1; + string path = 2; + } + RemoteFile remote_file = 10 [(gogoproto.nullable) = false]; + + // AddSSTableRequest_PrefixReplacement is used to represent a prefix to be + // replaced and the prefix with which to replace it. + message PrefixReplacement { + bytes from = 1; + bytes to = 2; + } + + // PrefixReplaecment is used to that keys in the sst with the prefix specified + // in "from" should instead appear to have that prefix replaced by "to", when + // during iteration, seeking, or other reads. For example, an SST containing + // keys a1, a2, and a3 when added with PrefixReplaecment={From: a, To:x} would + // produce the same results for subsequent operations as adding an SST with + // the keys x1, x2, x3 instead. The implementaiton may however elect to defer + // these replacements until the file is read. + // + // TODO(dt,msbutler,bilal): This is unsupported. + PrefixReplacement prefix_replacement = 11 [(gogoproto.nullable) = false]; + + // IgnoreKeysAboveTimestamp is used when ingesting an SSTable that contains + // keys, including mvcc revisions of keys, captured over a time range, to + // indicate that readers of that sstable should read it "as of" a the fixed + // time, ignoring any keys with later timestamps. + // + // TODO(dt,msbutler,bilal): This is unsupported. + util.hlc.Timestamp ignore_keys_above_timestamp = 12 [(gogoproto.nullable) = false]; } // AddSSTableResponse is the response to a AddSSTable() operation. diff --git a/pkg/kv/kvserver/BUILD.bazel b/pkg/kv/kvserver/BUILD.bazel index 4bf60a9bfc14..4f2f2ec0c460 100644 --- a/pkg/kv/kvserver/BUILD.bazel +++ b/pkg/kv/kvserver/BUILD.bazel @@ -176,6 +176,7 @@ go_library( "//pkg/roachpb", "//pkg/rpc", "//pkg/rpc/nodedialer", + "//pkg/security/username", "//pkg/server/telemetry", "//pkg/settings", "//pkg/settings/cluster", @@ -199,6 +200,7 @@ go_library( "//pkg/util/grunning", "//pkg/util/hlc", "//pkg/util/humanizeutil", + "//pkg/util/ioctx", "//pkg/util/iterutil", "//pkg/util/limit", "//pkg/util/log", diff --git a/pkg/kv/kvserver/app_batch.go b/pkg/kv/kvserver/app_batch.go index 55e4b7865260..67474521f403 100644 --- a/pkg/kv/kvserver/app_batch.go +++ b/pkg/kv/kvserver/app_batch.go @@ -13,6 +13,7 @@ package kvserver import ( "context" + "github.com/cockroachdb/cockroach/pkg/cloud" "github.com/cockroachdb/cockroach/pkg/kv/kvpb" "github.com/cockroachdb/cockroach/pkg/kv/kvserver/kvserverbase" "github.com/cockroachdb/cockroach/pkg/kv/kvserver/kvserverpb" @@ -154,6 +155,7 @@ type postAddEnv struct { eng storage.Engine sideloaded logstore.SideloadStorage bulkLimiter *rate.Limiter + external *cloud.ExternalStorageAccessor } func (b *appBatch) runPostAddTriggers( diff --git a/pkg/kv/kvserver/batcheval/cmd_add_sstable.go b/pkg/kv/kvserver/batcheval/cmd_add_sstable.go index 0da4f97b5faf..dee967022f7c 100644 --- a/pkg/kv/kvserver/batcheval/cmd_add_sstable.go +++ b/pkg/kv/kvserver/batcheval/cmd_add_sstable.go @@ -157,6 +157,30 @@ func EvalAddSSTable( } } + if args.RemoteFile.Path != "" { + log.Infof(ctx, "AddSSTable of remote file: %s in %s", args.RemoteFile.Path, args.RemoteFile.Location) + stats := *args.MVCCStats + stats.ContainsEstimates++ + + ms.Add(stats) + + mvccHistoryMutation := &kvserverpb.ReplicatedEvalResult_MVCCHistoryMutation{ + Spans: []roachpb.Span{{Key: start.Key, EndKey: end.Key}}, + } + return result.Result{ + Replicated: kvserverpb.ReplicatedEvalResult{ + AddSSTable: &kvserverpb.ReplicatedEvalResult_AddSSTable{ + RemoteFileLoc: args.RemoteFile.Location, + RemoteFilePath: args.RemoteFile.Path, + Span: roachpb.Span{Key: start.Key, EndKey: end.Key}, + }, + MVCCHistoryMutation: mvccHistoryMutation, + }, + }, nil + } + + log.Infof(ctx, "non-remote AddSSTable") + // Reject AddSSTable requests not writing at the request timestamp if requested. if AddSSTableRequireAtRequestTimestamp.Get(&cArgs.EvalCtx.ClusterSettings().SV) && sstToReqTS.IsEmpty() { diff --git a/pkg/kv/kvserver/kvserverpb/proposer_kv.proto b/pkg/kv/kvserver/kvserverpb/proposer_kv.proto index 81ca6658cf98..e4b3f1beac95 100644 --- a/pkg/kv/kvserver/kvserverpb/proposer_kv.proto +++ b/pkg/kv/kvserver/kvserverpb/proposer_kv.proto @@ -193,6 +193,9 @@ message ReplicatedEvalResult { // taken place since 22.1, to make sure all Raft log entries from 21.2 or // older have been applied on all replicas. bool at_write_timestamp = 4; + + string remote_file_loc = 5; + string remote_file_path = 6; } AddSSTable add_sstable = 17 [(gogoproto.customname) = "AddSSTable"]; diff --git a/pkg/kv/kvserver/raftlog/payload.go b/pkg/kv/kvserver/raftlog/payload.go index 781282b2386f..2635e0b679d9 100644 --- a/pkg/kv/kvserver/raftlog/payload.go +++ b/pkg/kv/kvserver/raftlog/payload.go @@ -50,10 +50,6 @@ func EncodeCommand( if raftAdmissionMeta != nil { entryEncoding = EntryEncodingSideloadedWithAC } - - if command.ReplicatedEvalResult.AddSSTable.Data == nil { - return nil, errors.Errorf("cannot sideload empty SSTable") - } } // NB: If (significantly) re-working how raft commands are encoded, make the diff --git a/pkg/kv/kvserver/replica_app_batch.go b/pkg/kv/kvserver/replica_app_batch.go index 791783c16ae3..e2cda9a8a335 100644 --- a/pkg/kv/kvserver/replica_app_batch.go +++ b/pkg/kv/kvserver/replica_app_batch.go @@ -149,6 +149,7 @@ func (b *replicaAppBatch) Stage( eng: b.r.store.TODOEngine(), sideloaded: b.r.raftMu.sideloaded, bulkLimiter: b.r.store.limiters.BulkIOWriteRate, + external: b.r.store.cfg.ExternalStorage, }); err != nil { return nil, err } diff --git a/pkg/kv/kvserver/replica_proposal.go b/pkg/kv/kvserver/replica_proposal.go index 8d1a389124a8..e6c5cab4af60 100644 --- a/pkg/kv/kvserver/replica_proposal.go +++ b/pkg/kv/kvserver/replica_proposal.go @@ -16,6 +16,7 @@ import ( "path/filepath" "time" + "github.com/cockroachdb/cockroach/pkg/cloud" "github.com/cockroachdb/cockroach/pkg/keys" "github.com/cockroachdb/cockroach/pkg/kv/kvpb" "github.com/cockroachdb/cockroach/pkg/kv/kvserver/batcheval/result" @@ -26,12 +27,14 @@ import ( "github.com/cockroachdb/cockroach/pkg/kv/kvserver/readsummary/rspb" "github.com/cockroachdb/cockroach/pkg/kv/kvserver/uncertainty" "github.com/cockroachdb/cockroach/pkg/roachpb" + "github.com/cockroachdb/cockroach/pkg/security/username" "github.com/cockroachdb/cockroach/pkg/settings/cluster" "github.com/cockroachdb/cockroach/pkg/storage" "github.com/cockroachdb/cockroach/pkg/storage/enginepb" "github.com/cockroachdb/cockroach/pkg/util" "github.com/cockroachdb/cockroach/pkg/util/hlc" "github.com/cockroachdb/cockroach/pkg/util/humanizeutil" + "github.com/cockroachdb/cockroach/pkg/util/ioctx" "github.com/cockroachdb/cockroach/pkg/util/log" "github.com/cockroachdb/cockroach/pkg/util/quotapool" "github.com/cockroachdb/cockroach/pkg/util/timeutil" @@ -634,6 +637,29 @@ func addSSTablePreApply( index kvpb.RaftIndex, sst kvserverpb.ReplicatedEvalResult_AddSSTable, ) bool { + if sst.RemoteFilePath != "" { + // TODO(dt, bilal, msbutler): Replace this with eng.IngestRemoteFile() + log.Warningf(ctx, "EXPERIMENTAL AddSSTABLE REMOTE FILE UNSUPPORTED; downloading %s from %s and adding it whole, ignoring span %s", sst.RemoteFilePath, sst.RemoteFileLoc, sst.Span) + s, err := env.external.OpenURL(ctx, sst.RemoteFileLoc, username.SQLUsername{}) + if err != nil { + log.Fatalf(ctx, "failed to open remote location %q below raft: %v", sst.RemoteFileLoc, err) + } + r, _, err := s.ReadFile(ctx, sst.RemoteFilePath, cloud.ReadOptions{}) + if err != nil { + log.Fatalf(ctx, "failed to open remote file path %q in %q below raft: %v", sst.RemoteFilePath, sst.RemoteFileLoc, err) + } + content, err := ioctx.ReadAll(ctx, r) + r.Close(ctx) + if err != nil { + log.Fatalf(ctx, "failed to read remote file %q in %q below raft: %v", sst.RemoteFilePath, sst.RemoteFileLoc, err) + } + sst.Data = content + sst.CRC32 = util.CRC32(content) + log.Infof(ctx, "Unsupported RemoteFile AddSSTABLE downloaded %q, read %d bytes", sst.RemoteFileLoc, len(content)) + if err := env.sideloaded.Put(ctx, index, term, content); err != nil { + log.Fatalf(ctx, "failed to write downloaded remote file %q in %q below raft: %v", sst.RemoteFilePath, sst.RemoteFileLoc, err) + } + } checksum := util.CRC32(sst.Data) if checksum != sst.CRC32 {