roachtest: snapshot ingest roachtest improvements
This patch contains some small improvements to better test the bandwidth
subtest of the snapshot ingest roachtest.

Informs cockroachdb#86857

Release note: None
aadityasondhi committed Nov 13, 2024
1 parent 42d0aa6 commit f99f117
Showing 1 changed file with 44 additions and 44 deletions.
88 changes: 44 additions & 44 deletions pkg/cmd/roachtest/tests/admission_control_snapshot_overload_io.go
@@ -69,6 +69,7 @@ func registerSnapshotOverloadIO(r registry.Registry) {
 		limitDiskBandwidth: false,
 		readPercent: 75,
 		workloadBlockBytes: 12288,
+		rebalanceRate: "256MiB",
 	}))

// This tests the behaviour of snapshot ingestion in bandwidth constrained
@@ -80,6 +81,7 @@ func registerSnapshotOverloadIO(r registry.Registry) {
 		limitDiskBandwidth: true,
 		readPercent: 20,
 		workloadBlockBytes: 1024,
+		rebalanceRate: "1GiB",
 	}))
 
 }
@@ -90,6 +92,7 @@ type admissionControlSnapshotOverloadIOOpts struct {
 	limitDiskBandwidth bool
 	readPercent int
 	workloadBlockBytes int
+	rebalanceRate string
 }
 
 func runAdmissionControlSnapshotOverloadIO(
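
For readability, the three hunks above can be read together as the options struct and the two subtest configurations it now carries. The sketch below is a consolidation only: the variable names are illustrative, and any fields hidden behind the collapsed diff context are omitted.

```go
// Consolidated view of the options after this commit (sketch; hidden fields omitted).
type admissionControlSnapshotOverloadIOOpts struct {
	limitDiskBandwidth bool
	readPercent        int
	workloadBlockBytes int
	rebalanceRate      string // fed into kv.snapshot_rebalance.max_rate below
}

// Illustrative names for the two registered configurations.
var (
	regularOpts = admissionControlSnapshotOverloadIOOpts{
		limitDiskBandwidth: false,
		readPercent:        75,
		workloadBlockBytes: 12288,
		rebalanceRate:      "256MiB",
	}
	bandwidthLimitedOpts = admissionControlSnapshotOverloadIOOpts{
		limitDiskBandwidth: true,
		readPercent:        20,
		workloadBlockBytes: 1024,
		rebalanceRate:      "1GiB",
	}
)
```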
@@ -137,9 +140,9 @@ func runAdmissionControlSnapshotOverloadIO(
 			t.Fatalf("failed to set storage.ingest_split.enabled: %v", err)
 		}
 
-		// Set a high rebalance rate.
+		// Set rebalance rate.
 		if _, err := db.ExecContext(
-			ctx, "SET CLUSTER SETTING kv.snapshot_rebalance.max_rate = '256MiB'"); err != nil {
+			ctx, fmt.Sprintf("SET CLUSTER SETTING kv.snapshot_rebalance.max_rate = '%s'", cfg.rebalanceRate)); err != nil {
 			t.Fatalf("failed to set kv.snapshot_rebalance.max_rate: %v", err)
 		}
 	}
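
The hunk above is where the hardcoded 256MiB becomes the per-subtest `cfg.rebalanceRate`. If a test wanted to read the applied value back (not something this patch does), a minimal sketch using `database/sql` could look like the following; the package and function names are hypothetical.

```go
package snapcheck

import (
	"context"
	"database/sql"
)

// currentRebalanceRate reads back kv.snapshot_rebalance.max_rate so a caller
// can log or assert on it. Note the setting is returned in the server's
// display format for byte sizes, so it may not match the exact string that
// was passed to SET CLUSTER SETTING.
func currentRebalanceRate(ctx context.Context, db *sql.DB) (string, error) {
	var rate string
	err := db.QueryRowContext(
		ctx, "SHOW CLUSTER SETTING kv.snapshot_rebalance.max_rate").Scan(&rate)
	return rate, err
}
```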
@@ -254,50 +257,47 @@ func runAdmissionControlSnapshotOverloadIO(
 			return float64(fromVec[0].Value), nil
 		}
 
-		// TODO(aaditya): assert on disk bandwidth subtest once integrated.
-		if !cfg.limitDiskBandwidth {
-			// Assert on l0 sublevel count and p99 latencies.
-			latencyMetric := divQuery("histogram_quantile(0.99, sum by(le) (rate(sql_service_latency_bucket[2m])))", 1<<20 /* 1ms */)
-			const latencyThreshold = 100 // 100ms since the metric is scaled to 1ms above.
-			const sublevelMetric = "storage_l0_sublevels"
-			const sublevelThreshold = 20
-			var l0SublevelCount []float64
-			const sampleCountForL0Sublevel = 12
-			const collectionIntervalSeconds = 10.0
-			// Loop for ~120 minutes.
-			const numIterations = int(120 / (collectionIntervalSeconds / 60))
-			numErrors := 0
-			numSuccesses := 0
-			for i := 0; i < numIterations; i++ {
-				time.Sleep(collectionIntervalSeconds * time.Second)
-				val, err := getHistMetricVal(latencyMetric)
-				if err != nil {
-					numErrors++
-					continue
-				}
-				if val > latencyThreshold {
-					t.Fatalf("sql p99 latency %f exceeded threshold", val)
-				}
-				val, err = getMetricVal(sublevelMetric, "store")
-				if err != nil {
-					numErrors++
-					continue
-				}
-				l0SublevelCount = append(l0SublevelCount, val)
-				// We want to use the mean of the last 2m of data to avoid short-lived
-				// spikes causing failures.
-				if len(l0SublevelCount) >= sampleCountForL0Sublevel {
-					latestSampleMeanL0Sublevels := roachtestutil.GetMeanOverLastN(sampleCountForL0Sublevel, l0SublevelCount)
-					if latestSampleMeanL0Sublevels > sublevelThreshold {
-						t.Fatalf("sub-level mean %f over last %d iterations exceeded threshold", latestSampleMeanL0Sublevels, sampleCountForL0Sublevel)
-					}
-				}
-				numSuccesses++
-			}
-			t.Status(fmt.Sprintf("done monitoring, errors: %d successes: %d", numErrors, numSuccesses))
-			if numErrors > numSuccesses {
-				t.Fatalf("too many errors retrieving metrics")
-			}
-		}
+		// Assert on l0 sublevel count and p99 latencies.
+		latencyMetric := divQuery("histogram_quantile(0.99, sum by(le) (rate(sql_service_latency_bucket[2m])))", 1<<20 /* 1ms */)
+		const latencyThreshold = 100 // 100ms since the metric is scaled to 1ms above.
+		const sublevelMetric = "storage_l0_sublevels"
+		const sublevelThreshold = 20
+		var l0SublevelCount []float64
+		const sampleCountForL0Sublevel = 12
+		const collectionIntervalSeconds = 10.0
+		// Loop for ~120 minutes.
+		const numIterations = int(120 / (collectionIntervalSeconds / 60))
+		numErrors := 0
+		numSuccesses := 0
+		for i := 0; i < numIterations; i++ {
+			time.Sleep(collectionIntervalSeconds * time.Second)
+			val, err := getHistMetricVal(latencyMetric)
+			if err != nil {
+				numErrors++
+				continue
+			}
+			if val > latencyThreshold {
+				t.Fatalf("sql p99 latency %f exceeded threshold", val)
+			}
+			val, err = getMetricVal(sublevelMetric, "store")
+			if err != nil {
+				numErrors++
+				continue
+			}
+			l0SublevelCount = append(l0SublevelCount, val)
+			// We want to use the mean of the last 2m of data to avoid short-lived
+			// spikes causing failures.
+			if len(l0SublevelCount) >= sampleCountForL0Sublevel {
+				latestSampleMeanL0Sublevels := roachtestutil.GetMeanOverLastN(sampleCountForL0Sublevel, l0SublevelCount)
+				if latestSampleMeanL0Sublevels > sublevelThreshold {
+					t.Fatalf("sub-level mean %f over last %d iterations exceeded threshold", latestSampleMeanL0Sublevels, sampleCountForL0Sublevel)
+				}
+			}
+			numSuccesses++
+		}
+		t.Status(fmt.Sprintf("done monitoring, errors: %d successes: %d", numErrors, numSuccesses))
+		if numErrors > numSuccesses {
+			t.Fatalf("too many errors retrieving metrics")
+		}
 		return nil
 	})
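
The assertions above now run for both subtests rather than only when disk bandwidth is unlimited. The p99 latency query is divided by 1<<20, so its threshold of 100 corresponds to roughly 100ms, and the L0 sub-level check fails only on the mean of the last 12 samples (~2 minutes at the 10s collection interval), not on a single spike. A minimal sketch of what a helper like `roachtestutil.GetMeanOverLastN` presumably computes, under that assumption:

```go
package main

import "fmt"

// meanOverLastN returns the mean of the last n samples (or of all samples if
// fewer than n exist). Sketch of the assumed behavior of
// roachtestutil.GetMeanOverLastN; the real helper may differ.
func meanOverLastN(n int, samples []float64) float64 {
	if len(samples) == 0 {
		return 0
	}
	if n > len(samples) {
		n = len(samples)
	}
	var sum float64
	for _, v := range samples[len(samples)-n:] {
		sum += v
	}
	return sum / float64(n)
}

func main() {
	// 12 samples at a 10s interval cover a ~2m window. A single spike above
	// the sub-level threshold of 20 does not fail the run as long as the
	// windowed mean stays below it.
	samples := []float64{4, 5, 6, 30, 5, 4, 6, 5, 4, 5, 6, 5}
	fmt.Printf("mean over last 12: %.2f\n", meanOverLastN(12, samples))
	// Prints: mean over last 12: 7.08
}
```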