From 51feb0fa2b366a158c21eb6745af7319c71d48bc Mon Sep 17 00:00:00 2001 From: David Taylor Date: Sat, 27 Apr 2024 14:37:11 +0000 Subject: [PATCH] streamingccl/streamingest: consume all kvs in event This switches from passing single KV events from the subscription to the ingest processor to passing batches of KVs. This avoids needing to go back to the channel and select for each KV; instead that is done once for each batch of KVs. Release note: none. Epic: none. --- pkg/ccl/cmdccl/clusterrepl/main.go | 6 ++- pkg/ccl/streamingccl/event.go | 32 +++++------ .../replication_helpers.go | 14 +++-- pkg/ccl/streamingccl/streamclient/BUILD.bazel | 1 + .../streamclient/client_helpers.go | 10 ++-- .../streamingccl/streamclient/client_test.go | 8 +-- .../streamclient/random_stream_client.go | 23 ++++---- .../streamingest/merged_subscription_test.go | 10 ++-- .../stream_ingestion_processor.go | 53 ++++++++++--------- .../stream_ingestion_processor_test.go | 34 ++++++------ .../streamproducer/replication_stream_test.go | 2 +- 11 files changed, 103 insertions(+), 90 deletions(-) diff --git a/pkg/ccl/cmdccl/clusterrepl/main.go b/pkg/ccl/cmdccl/clusterrepl/main.go index 95fb15e06da6..287b658764ae 100644 --- a/pkg/ccl/cmdccl/clusterrepl/main.go +++ b/pkg/ccl/cmdccl/clusterrepl/main.go @@ -246,8 +246,10 @@ func subscriptionConsumer( } switch event.Type() { case streamingccl.KVEvent: - kv := event.GetKV() - sz = kv.Size() + sz = 0 + for _, kv := range event.GetKVs() { + sz += kv.Size() + } case streamingccl.SSTableEvent: ssTab := event.GetSSTable() sz = ssTab.Size() diff --git a/pkg/ccl/streamingccl/event.go b/pkg/ccl/streamingccl/event.go index 909862233cdf..c931709e755f 100644 --- a/pkg/ccl/streamingccl/event.go +++ b/pkg/ccl/streamingccl/event.go @@ -46,8 +46,8 @@ type Event interface { // Type specifies which accessor will be meaningful. Type() EventType - // GetKV returns a KV event if the EventType is KVEvent. 
- GetKV() *roachpb.KeyValue + // GetKVs returns the batch of KVs if the EventType is KVEvent. + GetKVs() []roachpb.KeyValue // GetSSTable returns a AddSSTable event if the EventType is SSTableEvent. GetSSTable() *kvpb.RangeFeedSSTable @@ -68,7 +68,7 @@ type Event interface { // kvEvent is a key value pair that needs to be ingested. type kvEvent struct { - kv roachpb.KeyValue + kv []roachpb.KeyValue } var _ Event = kvEvent{} @@ -78,9 +78,9 @@ func (kve kvEvent) Type() EventType { return KVEvent } -// GetKV implements the Event interface. -func (kve kvEvent) GetKV() *roachpb.KeyValue { - return &kve.kv +// GetKVs implements the Event interface. +func (kve kvEvent) GetKVs() []roachpb.KeyValue { + return kve.kv } // GetSSTable implements the Event interface. @@ -118,8 +118,8 @@ func (sste sstableEvent) Type() EventType { return SSTableEvent } -// GetKV implements the Event interface. -func (sste sstableEvent) GetKV() *roachpb.KeyValue { +// GetKVs implements the Event interface. +func (sste sstableEvent) GetKVs() []roachpb.KeyValue { return nil } @@ -160,8 +160,8 @@ func (dre delRangeEvent) Type() EventType { return DeleteRangeEvent } -// GetKV implements the Event interface. -func (dre delRangeEvent) GetKV() *roachpb.KeyValue { +// GetKVs implements the Event interface. +func (dre delRangeEvent) GetKVs() []roachpb.KeyValue { return nil } @@ -205,8 +205,8 @@ func (ce checkpointEvent) Type() EventType { return CheckpointEvent } -// GetKV implements the Event interface. -func (ce checkpointEvent) GetKV() *roachpb.KeyValue { +// GetKVs implements the Event interface. +func (ce checkpointEvent) GetKVs() []roachpb.KeyValue { return nil } @@ -246,8 +246,8 @@ func (spe spanConfigEvent) Type() EventType { return SpanConfigEvent } -// GetKV implements the Event interface. -func (spe spanConfigEvent) GetKV() *roachpb.KeyValue { +// GetKVs implements the Event interface. 
+func (spe spanConfigEvent) GetKVs() []roachpb.KeyValue { return nil } @@ -288,7 +288,7 @@ func (se splitEvent) Type() EventType { } // GetKV implements the Event interface. -func (se splitEvent) GetKV() *roachpb.KeyValue { +func (se splitEvent) GetKVs() []roachpb.KeyValue { return nil } @@ -318,7 +318,7 @@ func (se splitEvent) GetSplitEvent() *roachpb.Key { } // MakeKVEvent creates an Event from a KV. -func MakeKVEvent(kv roachpb.KeyValue) Event { +func MakeKVEvent(kv []roachpb.KeyValue) Event { return kvEvent{kv: kv} } diff --git a/pkg/ccl/streamingccl/replicationtestutils/replication_helpers.go b/pkg/ccl/streamingccl/replicationtestutils/replication_helpers.go index c7a445c4b009..d23d3ee0d1d6 100644 --- a/pkg/ccl/streamingccl/replicationtestutils/replication_helpers.go +++ b/pkg/ccl/streamingccl/replicationtestutils/replication_helpers.go @@ -41,13 +41,18 @@ type FeedEventPredicate func(message streamingccl.Event) bool // FeedErrorPredicate allows tests to match an error from ReplicationFeed. type FeedErrorPredicate func(err error) bool -// KeyMatches makes a FeedEventPredicate that matches a given key. +// KeyMatches makes a FeedEventPredicate that matches a given key in a kv batch. func KeyMatches(key roachpb.Key) FeedEventPredicate { return func(msg streamingccl.Event) bool { if msg.Type() != streamingccl.KVEvent { return false } - return bytes.Equal(key, msg.GetKV().Key) + for _, kv := range msg.GetKVs() { + if bytes.Equal(key, kv.Key) { + return true + } + } + return false } } @@ -107,14 +112,15 @@ func MakeReplicationFeed(t *testing.T, f FeedSource) *ReplicationFeed { } } -// ObserveKey consumes the feed until requested key has been seen (or deadline expired). +// ObserveKey consumes the feed until requested key has been seen (or deadline +// expired) in a batch (all of which, including subsequent keys, is consumed). // Note: we don't do any buffering here. 
Therefore, it is required that the key // we want to observe will arrive at some point in the future. func (rf *ReplicationFeed) ObserveKey(ctx context.Context, key roachpb.Key) roachpb.KeyValue { rf.consumeUntil(ctx, KeyMatches(key), func(err error) bool { return false }) - return *rf.msg.GetKV() + return rf.msg.GetKVs()[0] } // ObserveAnySpanConfigRecord consumes the feed until any span config record is observed. diff --git a/pkg/ccl/streamingccl/streamclient/BUILD.bazel b/pkg/ccl/streamingccl/streamclient/BUILD.bazel index 430936593a59..2d7ae0938e80 100644 --- a/pkg/ccl/streamingccl/streamclient/BUILD.bazel +++ b/pkg/ccl/streamingccl/streamclient/BUILD.bazel @@ -32,6 +32,7 @@ go_library( "//pkg/sql/rowenc", "//pkg/sql/rowenc/valueside", "//pkg/sql/sem/tree", + "//pkg/util/bufalloc", "//pkg/util/hlc", "//pkg/util/log", "//pkg/util/protoutil", diff --git a/pkg/ccl/streamingccl/streamclient/client_helpers.go b/pkg/ccl/streamingccl/streamclient/client_helpers.go index 4f5fb194f02e..e2665000f054 100644 --- a/pkg/ccl/streamingccl/streamclient/client_helpers.go +++ b/pkg/ccl/streamingccl/streamclient/client_helpers.go @@ -18,7 +18,7 @@ import ( ) func subscribeInternal( - ctx context.Context, feed pgx.Rows, eventsChan chan streamingccl.Event, closeChan chan struct{}, + ctx context.Context, feed pgx.Rows, eventCh chan streamingccl.Event, closeCh chan struct{}, ) error { // Get the next event from the cursor. var bufferedEvent *streampb.StreamEvent @@ -51,8 +51,8 @@ func subscribeInternal( return err } select { - case eventsChan <- event: - case <-closeChan: + case eventCh <- event: + case <-closeCh: // Exit quietly to not cause other subscriptions in the same // ctxgroup.Group to exit. 
return nil @@ -81,8 +81,8 @@ func parseEvent(streamEvent *streampb.StreamEvent) streamingccl.Event { event = streamingccl.MakeSSTableEvent(streamEvent.Batch.Ssts[0]) streamEvent.Batch.Ssts = streamEvent.Batch.Ssts[1:] case len(streamEvent.Batch.KeyValues) > 0: - event = streamingccl.MakeKVEvent(streamEvent.Batch.KeyValues[0]) - streamEvent.Batch.KeyValues = streamEvent.Batch.KeyValues[1:] + event = streamingccl.MakeKVEvent(streamEvent.Batch.KeyValues) + streamEvent.Batch.KeyValues = nil case len(streamEvent.Batch.DelRanges) > 0: event = streamingccl.MakeDeleteRangeEvent(streamEvent.Batch.DelRanges[0]) streamEvent.Batch.DelRanges = streamEvent.Batch.DelRanges[1:] diff --git a/pkg/ccl/streamingccl/streamclient/client_test.go b/pkg/ccl/streamingccl/streamclient/client_test.go index 56ea3ac39e41..f2d30168a14f 100644 --- a/pkg/ccl/streamingccl/streamclient/client_test.go +++ b/pkg/ccl/streamingccl/streamclient/client_test.go @@ -102,7 +102,7 @@ func (sc testStreamClient) Subscribe( } events := make(chan streamingccl.Event, 2) - events <- streamingccl.MakeKVEvent(sampleKV) + events <- streamingccl.MakeKVEvent([]roachpb.KeyValue{sampleKV}) events <- streamingccl.MakeCheckpointEvent([]jobspb.ResolvedSpan{sampleResolvedSpan}) close(events) @@ -303,8 +303,10 @@ func ExampleClient() { for event := range sub.Events() { switch event.Type() { case streamingccl.KVEvent: - kv := event.GetKV() - fmt.Printf("kv: %s->%s@%d\n", kv.Key.String(), string(kv.Value.RawBytes), kv.Value.Timestamp.WallTime) + kvs := event.GetKVs() + for _, kv := range kvs { + fmt.Printf("kv: %s->%s@%d\n", kv.Key.String(), string(kv.Value.RawBytes), kv.Value.Timestamp.WallTime) + } case streamingccl.SSTableEvent: sst := event.GetSSTable() fmt.Printf("sst: %s->%s@%d\n", sst.Span.String(), string(sst.Data), sst.WriteTS.WallTime) diff --git a/pkg/ccl/streamingccl/streamclient/random_stream_client.go b/pkg/ccl/streamingccl/streamclient/random_stream_client.go index 0d2be650d0ae..e4970ba2edac 100644 --- 
a/pkg/ccl/streamingccl/streamclient/random_stream_client.go +++ b/pkg/ccl/streamingccl/streamclient/random_stream_client.go @@ -32,6 +32,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/sql/rowenc" "github.com/cockroachdb/cockroach/pkg/sql/rowenc/valueside" "github.com/cockroachdb/cockroach/pkg/sql/sem/tree" + "github.com/cockroachdb/cockroach/pkg/util/bufalloc" "github.com/cockroachdb/cockroach/pkg/util/hlc" "github.com/cockroachdb/cockroach/pkg/util/log" "github.com/cockroachdb/cockroach/pkg/util/randutil" @@ -280,7 +281,7 @@ func (r *randomEventGenerator) generateNewEvent() streamingccl.Event { if len(r.systemKVs) > 0 { systemKV := r.systemKVs[0] systemKV.Value.Timestamp = hlc.Timestamp{WallTime: timeutil.Now().UnixNano()} - event = streamingccl.MakeKVEvent(systemKV) + event = streamingccl.MakeKVEvent([]roachpb.KeyValue{systemKV}) r.systemKVs = r.systemKVs[1:] return event } @@ -295,7 +296,7 @@ func (r *randomEventGenerator) generateNewEvent() streamingccl.Event { } event = streamingccl.MakeSSTableEvent(r.sstMaker(keyVals)) } else { - event = streamingccl.MakeKVEvent(makeRandomKey(r.rng, r.config, r.codec, r.tableDesc)) + event = streamingccl.MakeKVEvent([]roachpb.KeyValue{makeRandomKey(r.rng, r.config, r.codec, r.tableDesc)}) } r.numEventsSinceLastResolved++ } @@ -674,17 +675,15 @@ func duplicateEvent(event streamingccl.Event) streamingccl.Event { copy(resolvedSpans, event.GetResolvedSpans()) dup = streamingccl.MakeCheckpointEvent(resolvedSpans) case streamingccl.KVEvent: - eventKV := event.GetKV() - rawBytes := make([]byte, len(eventKV.Value.RawBytes)) - copy(rawBytes, eventKV.Value.RawBytes) - keyVal := roachpb.KeyValue{ - Key: event.GetKV().Key.Clone(), - Value: roachpb.Value{ - RawBytes: rawBytes, - Timestamp: eventKV.Value.Timestamp, - }, + kvs := event.GetKVs() + res := make([]roachpb.KeyValue, len(kvs)) + var a bufalloc.ByteAllocator + for i := range kvs { + res[i].Key = kvs[i].Key.Clone() + res[i].Value.Timestamp = kvs[i].Value.Timestamp + a, 
res[i].Value.RawBytes = a.Copy(kvs[i].Value.RawBytes, 0) } - dup = streamingccl.MakeKVEvent(keyVal) + dup = streamingccl.MakeKVEvent(res) case streamingccl.SSTableEvent: sst := event.GetSSTable() dataCopy := make([]byte, len(sst.Data)) diff --git a/pkg/ccl/streamingccl/streamingest/merged_subscription_test.go b/pkg/ccl/streamingccl/streamingest/merged_subscription_test.go index 4f5d3b265361..fb0a7fd726be 100644 --- a/pkg/ccl/streamingccl/streamingest/merged_subscription_test.go +++ b/pkg/ccl/streamingccl/streamingest/merged_subscription_test.go @@ -30,12 +30,12 @@ func TestMergeSubscriptionsRun(t *testing.T) { ctx := context.Background() events := func(partition string) []streamingccl.Event { return []streamingccl.Event{ - streamingccl.MakeKVEvent(roachpb.KeyValue{ + streamingccl.MakeKVEvent([]roachpb.KeyValue{{ Key: []byte(partition + "_key1"), - }), - streamingccl.MakeKVEvent(roachpb.KeyValue{ + }}), + streamingccl.MakeKVEvent([]roachpb.KeyValue{{ Key: []byte(partition + "_key2"), - }), + }}), } } mockClient := &mockStreamClient{ @@ -67,7 +67,7 @@ func TestMergeSubscriptionsRun(t *testing.T) { events := []string{} g.Go(func() error { for ev := range merged.Events() { - events = append(events, string(ev.GetKV().Key)) + events = append(events, string(ev.GetKVs()[0].Key)) } return nil }) diff --git a/pkg/ccl/streamingccl/streamingest/stream_ingestion_processor.go b/pkg/ccl/streamingccl/streamingest/stream_ingestion_processor.go index ef278496c3d7..6bf0ab15e3a0 100644 --- a/pkg/ccl/streamingccl/streamingest/stream_ingestion_processor.go +++ b/pkg/ccl/streamingccl/streamingest/stream_ingestion_processor.go @@ -707,7 +707,7 @@ func (sip *streamIngestionProcessor) handleEvent(event partitionEvent) error { if event.Type() == streamingccl.KVEvent { sip.metrics.AdmitLatency.RecordValue( - timeutil.Since(event.GetKV().Value.Timestamp.GoTime()).Nanoseconds()) + timeutil.Since(event.GetKVs()[0].Value.Timestamp.GoTime()).Nanoseconds()) } if streamingKnobs, ok := 
sip.FlowCtx.TestingKnobs().StreamingTestingKnobs.(*sql.StreamingTestingKnobs); ok { @@ -720,7 +720,7 @@ func (sip *streamIngestionProcessor) handleEvent(event partitionEvent) error { switch event.Type() { case streamingccl.KVEvent: - if err := sip.bufferKV(event.GetKV()); err != nil { + if err := sip.bufferKVs(event.GetKVs()); err != nil { return err } case streamingccl.SSTableEvent: @@ -792,13 +792,13 @@ func (sip *streamIngestionProcessor) bufferSST(sst *kvpb.RangeFeedSSTable) error return err } - return sip.bufferKV(&roachpb.KeyValue{ + return sip.bufferKVs([]roachpb.KeyValue{{ Key: keyVal.Key.Key, Value: roachpb.Value{ RawBytes: mvccValue.RawBytes, Timestamp: keyVal.Key.Timestamp, }, - }) + }}) }, func(rangeKeyVal storage.MVCCRangeKeyValue) error { return sip.bufferRangeKeyVal(rangeKeyVal) }) @@ -870,36 +870,37 @@ func (sip *streamIngestionProcessor) handleSplitEvent(key *roachpb.Key) error { return kvDB.AdminSplit(ctx, rekey, expiration) } -func (sip *streamIngestionProcessor) bufferKV(kv *roachpb.KeyValue) error { +func (sip *streamIngestionProcessor) bufferKVs(kvs []roachpb.KeyValue) error { // TODO: In addition to flushing when receiving a checkpoint event, we // should also flush when we've buffered sufficient KVs. A buffering adder // would save us here. 
- if kv == nil { + if kvs == nil { return errors.New("kv event expected to have kv") } + for _, kv := range kvs { + var err error + var ok bool + kv.Key, ok, err = sip.rekey(kv.Key) + if err != nil { + return err + } + if !ok { + continue + } - var err error - var ok bool - kv.Key, ok, err = sip.rekey(kv.Key) - if err != nil { - return err - } - if !ok { - return nil - } + if sip.rewriteToDiffKey { + kv.Value.ClearChecksum() + kv.Value.InitChecksum(kv.Key) + } - if sip.rewriteToDiffKey { - kv.Value.ClearChecksum() - kv.Value.InitChecksum(kv.Key) + sip.buffer.addKV(storage.MVCCKeyValue{ + Key: storage.MVCCKey{ + Key: kv.Key, + Timestamp: kv.Value.Timestamp, + }, + Value: kv.Value.RawBytes, + }) } - - sip.buffer.addKV(storage.MVCCKeyValue{ - Key: storage.MVCCKey{ - Key: kv.Key, - Timestamp: kv.Value.Timestamp, - }, - Value: kv.Value.RawBytes, - }) return nil } diff --git a/pkg/ccl/streamingccl/streamingest/stream_ingestion_processor_test.go b/pkg/ccl/streamingccl/streamingest/stream_ingestion_processor_test.go index 602870553f27..b78230dfd119 100644 --- a/pkg/ccl/streamingccl/streamingest/stream_ingestion_processor_test.go +++ b/pkg/ccl/streamingccl/streamingest/stream_ingestion_processor_test.go @@ -224,13 +224,13 @@ func TestStreamIngestionProcessor(t *testing.T) { p2Key := roachpb.Key("key_2") p2Span := roachpb.Span{Key: p2Key, EndKey: p2Key.Next()} - sampleKV := func() roachpb.KeyValue { + sampleKV := func() []roachpb.KeyValue { key, err := keys.RewriteKeyToTenantPrefix(p1Key, keys.MakeTenantPrefix(roachpb.MustMakeTenantID(tenantID))) require.NoError(t, err) v := roachpb.MakeValueFromString("value_1") v.Timestamp = hlc.Timestamp{WallTime: 1} - return roachpb.KeyValue{Key: key, Value: v} + return []roachpb.KeyValue{{Key: key, Value: v}} } sampleCheckpoint := func(span roachpb.Span, ts int64) []jobspb.ResolvedSpan { return []jobspb.ResolvedSpan{{Span: span, Timestamp: hlc.Timestamp{WallTime: ts}}} @@ -928,21 +928,23 @@ func resolvedSpansMinTS(resolvedSpans 
[]jobspb.ResolvedSpan) hlc.Timestamp { } func noteKeyVal( - validator *streamClientValidator, keyVal roachpb.KeyValue, spec streamclient.SubscriptionToken, + validator *streamClientValidator, keyVals []roachpb.KeyValue, spec streamclient.SubscriptionToken, ) { - if validator.rekeyer != nil { - rekey, _, err := validator.rekeyer.RewriteTenant(keyVal.Key) + for _, keyVal := range keyVals { + if validator.rekeyer != nil { + rekey, _, err := validator.rekeyer.RewriteTenant(keyVal.Key) + if err != nil { + panic(err.Error()) + } + keyVal.Key = rekey + keyVal.Value.ClearChecksum() + keyVal.Value.InitChecksum(keyVal.Key) + } + err := validator.noteRow(string(spec), string(keyVal.Key), string(keyVal.Value.RawBytes), + keyVal.Value.Timestamp) if err != nil { panic(err.Error()) } - keyVal.Key = rekey - keyVal.Value.ClearChecksum() - keyVal.Value.InitChecksum(keyVal.Key) - } - err := validator.noteRow(string(spec), string(keyVal.Key), string(keyVal.Value.RawBytes), - keyVal.Value.Timestamp) - if err != nil { - panic(err.Error()) } } @@ -960,16 +962,16 @@ func validateFnWithValidator( case streamingccl.SSTableEvent: kvs := storageutils.ScanSST(t, event.GetSSTable().Data) for _, keyVal := range kvs.MVCCKeyValues() { - noteKeyVal(validator, roachpb.KeyValue{ + noteKeyVal(validator, []roachpb.KeyValue{{ Key: keyVal.Key.Key, Value: roachpb.Value{ RawBytes: keyVal.Value, Timestamp: keyVal.Key.Timestamp, }, - }, spec) + }}, spec) } case streamingccl.KVEvent: - noteKeyVal(validator, *event.GetKV(), spec) + noteKeyVal(validator, event.GetKVs(), spec) case streamingccl.DeleteRangeEvent: panic(errors.New("unsupported event type")) } diff --git a/pkg/ccl/streamingccl/streamproducer/replication_stream_test.go b/pkg/ccl/streamingccl/streamproducer/replication_stream_test.go index 5484e21dbba6..63e08f65e69a 100644 --- a/pkg/ccl/streamingccl/streamproducer/replication_stream_test.go +++ b/pkg/ccl/streamingccl/streamproducer/replication_stream_test.go @@ -115,7 +115,7 @@ func (d 
*partitionStreamDecoder) pop() streamingccl.Event { } if d.e.Batch != nil { - event := streamingccl.MakeKVEvent(d.e.Batch.KeyValues[0]) + event := streamingccl.MakeKVEvent([]roachpb.KeyValue{d.e.Batch.KeyValues[0]}) d.e.Batch.KeyValues = d.e.Batch.KeyValues[1:] if len(d.e.Batch.KeyValues) == 0 { d.e.Batch = nil