Skip to content

Commit

Permalink
Browse files Browse the repository at this point in the history
115785: changefeedccl: add parallel io metrics r=jayshrivastava a=jayshrivastava

This change adds metrics to improve the observability of contention in
the batching webhook and pubsub sinks.

Previously, there were no metrics to count the number of pending and in-flight keys,
so this change adds `changefeed.parallel_io_pending_keys` and
`changefeed.parallel_io_result_queue_nanos` to measure them respectively.

This change also adds `changefeed.parallel_io_result_queue_nanos` to measure
the time which results from the sink must wait in a queue before the changefeed
acks them.

Release note: None


116286: kv,admission: add missing admission headers r=sumeerbhola a=aadityasondhi

This patch adds missing admission headers in a couple of places: `txn_interceptor_heartbeater` and `replica_range_lease`. Batch requests coming from these code paths are expected to bypass AC. Empty admission headers were accomplishing the same goal but this makes it more clear at what the intention is.

Informs #112680

Release note: None

Co-authored-by: Jayant Shrivastava <jayants@cockroachlabs.com>
Co-authored-by: Aaditya Sondhi <20070511+aadityasondhi@users.noreply.github.com>
  • Loading branch information
3 people committed Dec 13, 2023
3 parents 366b81d + b90e6fe + dbe5954 commit 52329bc
Show file tree
Hide file tree
Showing 9 changed files with 325 additions and 100 deletions.
5 changes: 4 additions & 1 deletion docs/generated/metrics/metrics.html
Original file line number Diff line number Diff line change
Expand Up @@ -767,7 +767,10 @@
<tr><td>APPLICATION</td><td>changefeed.nprocs_consume_event_nanos</td><td>Total time spent waiting to add an event to the parallel consumer</td><td>Nanoseconds</td><td>HISTOGRAM</td><td>NANOSECONDS</td><td>AVG</td><td>NONE</td></tr>
<tr><td>APPLICATION</td><td>changefeed.nprocs_flush_nanos</td><td>Total time spent idle waiting for the parallel consumer to flush</td><td>Nanoseconds</td><td>HISTOGRAM</td><td>NANOSECONDS</td><td>AVG</td><td>NONE</td></tr>
<tr><td>APPLICATION</td><td>changefeed.nprocs_in_flight_count</td><td>Number of buffered events in the parallel consumer</td><td>Count of Events</td><td>GAUGE</td><td>COUNT</td><td>AVG</td><td>NONE</td></tr>
<tr><td>APPLICATION</td><td>changefeed.parallel_io_queue_nanos</td><td>Time spent with outgoing requests to the sink waiting in queue due to inflight requests with conflicting keys</td><td>Changefeeds</td><td>HISTOGRAM</td><td>NANOSECONDS</td><td>AVG</td><td>NONE</td></tr>
<tr><td>APPLICATION</td><td>changefeed.parallel_io_in_flight_keys</td><td>The number of keys currently in-flight which may contend with batches pending to be emitted</td><td>Keys</td><td>GAUGE</td><td>COUNT</td><td>AVG</td><td>NONE</td></tr>
<tr><td>APPLICATION</td><td>changefeed.parallel_io_pending_rows</td><td>Number of rows which are blocked from being sent due to conflicting in-flight keys</td><td>Keys</td><td>GAUGE</td><td>COUNT</td><td>AVG</td><td>NONE</td></tr>
<tr><td>APPLICATION</td><td>changefeed.parallel_io_queue_nanos</td><td>Time that outgoing requests to the sink spend waiting in a queue due to in-flight requests with conflicting keys</td><td>Nanoseconds</td><td>HISTOGRAM</td><td>NANOSECONDS</td><td>AVG</td><td>NONE</td></tr>
<tr><td>APPLICATION</td><td>changefeed.parallel_io_result_queue_nanos</td><td>Time that incoming results from the sink spend waiting in parallel io emitter before they are acknowledged by the changefeed</td><td>Nanoseconds</td><td>HISTOGRAM</td><td>NANOSECONDS</td><td>AVG</td><td>NONE</td></tr>
<tr><td>APPLICATION</td><td>changefeed.queue_time_nanos</td><td>Time KV event spent waiting to be processed</td><td>Nanoseconds</td><td>COUNTER</td><td>NANOSECONDS</td><td>AVG</td><td>NON_NEGATIVE_DERIVATIVE</td></tr>
<tr><td>APPLICATION</td><td>changefeed.running</td><td>Number of currently running changefeeds, including sinkless</td><td>Changefeeds</td><td>GAUGE</td><td>COUNT</td><td>AVG</td><td>NONE</td></tr>
<tr><td>APPLICATION</td><td>changefeed.schema_registry.registrations</td><td>Number of registration attempts with the schema registry</td><td>Registrations</td><td>COUNTER</td><td>COUNT</td><td>AVG</td><td>NON_NEGATIVE_DERIVATIVE</td></tr>
Expand Down
5 changes: 5 additions & 0 deletions pkg/ccl/changefeedccl/batching_sink.go
Original file line number Diff line number Diff line change
Expand Up @@ -258,6 +258,11 @@ func (sb *sinkBatch) Keys() intsets.Fast {
return sb.keys
}

// NumMessages implements the IORequest interface.
func (sb *sinkBatch) NumMessages() int {
return sb.numMessages
}

func (sb *sinkBatch) isEmpty() bool {
return sb.numMessages == 0
}
Expand Down
83 changes: 83 additions & 0 deletions pkg/ccl/changefeedccl/changefeed_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -9123,3 +9123,86 @@ func TestBatchSizeMetric(t *testing.T) {
}
cdcTest(t, testFn)
}

// TestParallelIOMetrics tests parallel io metrics.
func TestParallelIOMetrics(t *testing.T) {
defer leaktest.AfterTest(t)()
defer log.Scope(t).Close(t)

testFn := func(t *testing.T, s TestServer, f cdctest.TestFeedFactory) {
registry := s.Server.JobRegistry().(*jobs.Registry)
metrics := registry.MetricsStruct().Changefeed.(*Metrics).AggMetrics

// Add delay so queuing occurs, which results in the below metrics being
// nonzero.
defer testingEnableQueuingDelay()()

db := sqlutils.MakeSQLRunner(s.DB)
db.Exec(t, `SET CLUSTER SETTING changefeed.new_pubsub_sink_enabled = true`)
db.Exec(t, `SET CLUSTER SETTING changefeed.sink_io_workers = 1`)
db.Exec(t, `
CREATE TABLE foo (a INT PRIMARY KEY);
`)

// Keep writing data to the same key to ensure contention.
ctx, cancel := context.WithTimeout(context.Background(), 2*time.Minute)
defer cancel()

g := ctxgroup.WithContext(ctx)
done := make(chan struct{})
g.Go(func() error {
for {
select {
case <-ctx.Done():
return nil
case <-done:
return nil
default:
_, err := s.DB.Exec(`UPSERT INTO foo (a) SELECT * FROM generate_series(1, 10)`)
if err != nil {
return err
}
}
}
})
// Set the frequency to 1s. The default frequency at the time of writing is
foo, err := f.Feed("CREATE CHANGEFEED FOR TABLE foo WITH pubsub_sink_config=" +
"'{\"Flush\": {\"Frequency\": \"100ms\"}}'")
require.NoError(t, err)

testutils.SucceedsSoon(t, func() error {
numSamples, sum := metrics.ParallelIOPendingQueueNanos.TotalWindowed()
if numSamples <= 0 && sum <= 0.0 {
return errors.Newf("waiting for queue nanos: %d %f", numSamples, sum)
}
return nil
})
testutils.SucceedsSoon(t, func() error {
pendingKeys := metrics.ParallelIOPendingRows.Value()
if pendingKeys <= 0 {
return errors.Newf("waiting for pending keys: %d", pendingKeys)
}
return nil
})
testutils.SucceedsSoon(t, func() error {
for i := 0; i < 50; i++ {
inFlightKeys := metrics.ParallelIOInFlightKeys.Value()
if inFlightKeys > 0 {
return nil
}
}
return errors.New("waiting for in-flight keys")
})
testutils.SucceedsSoon(t, func() error {
numSamples, sum := metrics.ParallelIOResultQueueNanos.TotalWindowed()
if numSamples <= 0 && sum <= 0.0 {
return errors.Newf("waiting for result queue nanos: %d %f", numSamples, sum)
}
return nil
})
close(done)
require.NoError(t, g.Wait())
require.NoError(t, foo.Close())
}
cdcTest(t, testFn, feedTestForceSink("pubsub"))
}
Loading

0 comments on commit 52329bc

Please sign in to comment.