115375: changefeedccl: reduce rebalancing memory usage from O(ranges) to O(spans) r=jayshrivastava a=jayshrivastava

### sql: count ranges per partition in PartitionSpans

This change updates span partitioning to count ranges as partitions are
created. This allows callers to rebalance partitions based on range
counts without having to iterate over the spans to count ranges
afterwards.
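
A minimal sketch of how a caller might use the recorded counts (the `NumRanges` accessor and `sql.SpanPartition` come from this change; the helper itself is illustrative only, not part of the diff):

```go
package changefeedccl

import (
	"math"

	"github.com/cockroachdb/cockroach/pkg/sql"
)

// targetRangesPerPartition derives a per-partition range budget from the
// counts recorded by PartitionSpans. If any partition is missing range info,
// it reports ok=false so callers can skip rebalancing.
func targetRangesPerPartition(partitions []sql.SpanPartition, sensitivity float64) (int, bool) {
	total := 0
	for _, p := range partitions {
		n, ok := p.NumRanges()
		if !ok {
			return 0, false
		}
		total += n
	}
	return int(math.Ceil((1 + sensitivity) * float64(total) / float64(len(partitions)))), true
}
```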

Release note: None
Epic: None

### changefeedccl: reduce rebalancing memory usage from O(ranges) to O(spans) #115375

Previously, `rebalanceSpanPartitions` used O(ranges) memory. This change
rewrites it to use range iterators, reducing memory usage to O(spans).
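
A minimal sketch of the iterator-based pattern (the `rangeIterator` interface is the one introduced in this diff; the helper below is illustrative, not part of the change):

```go
package changefeedccl

import (
	"context"

	"github.com/cockroachdb/cockroach/pkg/keys"
	"github.com/cockroachdb/cockroach/pkg/kv/kvclient/kvcoord"
	"github.com/cockroachdb/cockroach/pkg/roachpb"
)

// countRangesInSpan walks a span with a range iterator, visiting one range
// descriptor at a time instead of materializing a []roachpb.Span per range.
func countRangesInSpan(ctx context.Context, ri rangeIterator, sp roachpb.Span) (int, error) {
	rSpan, err := keys.SpanAddr(sp)
	if err != nil {
		return 0, err
	}
	n := 0
	for ri.Seek(ctx, rSpan.Key, kvcoord.Ascending); ; ri.Next(ctx) {
		if !ri.Valid() {
			return 0, ri.Error()
		}
		n++
		// Stop once the current range reaches the end of the span.
		if !ri.NeedAnother(rSpan) {
			break
		}
	}
	return n, nil
}
```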

This change also adds a randomized test to assert that all spans are accounted for after
rebalancing. It also adds one more unit test.

Informs: #113898
Epic: None

### changefeedccl: add rebalancing checks

This change adds extra test coverage for partition rebalancing in
changefeeds. It adds checks, performed after rebalancing, which assert
that the output list of spans covers exactly the same keys as the input
list of spans. These checks are expensive, so they run only when the
environment variable `COCKROACH_CHANGEFEED_TESTING_REBALANCING_CHECKS`
is true. This variable is set to true in cdc roachtests and unit tests.
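
The core of the coverage check can be sketched as follows (it uses `roachpb.SpanGroup` as the diff does; the helper name is illustrative): two span lists cover exactly the same keys iff each encloses the other.

```go
package changefeedccl

import "github.com/cockroachdb/cockroach/pkg/roachpb"

// coversSameKeys reports whether two span lists cover exactly the same keys:
// the input spans must enclose the output spans and vice versa.
func coversSameKeys(before, after []roachpb.Span) bool {
	var beforeG, afterG roachpb.SpanGroup
	beforeG.Add(before...)
	afterG.Add(after...)
	return beforeG.Encloses(after...) && afterG.Encloses(before...)
}
```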

Release note: None
Epic: None

119885: storage: support per-store IO metrics with fine granularity r=jbowens,abarganier a=CheranMahalingam

Currently, timeseries metrics are collected at a 10s interval, which hides momentary spikes in IO. This commit introduces a central disk monitoring system that polls for disk stats at a 100ms interval. Additionally, the current system accumulates disk metrics across all block devices, which includes noise from unrelated processes. This commit also adds support for exporting per-store IO metrics (i.e. IO stats for the block devices that map to stores used by CockroachDB).
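
A rough sketch of the polling idea only; all names below are hypothetical and this is not the implementation in this PR:

```go
package disk

import (
	"context"
	"time"
)

// DeviceStats is a hypothetical snapshot of cumulative IO counters for one
// block device, as reported by the OS (e.g. /proc/diskstats on Linux).
type DeviceStats struct {
	ReadBytes, WriteBytes uint64
	ReadCount, WriteCount uint64
}

// pollStats samples per-device counters every 100ms and publishes the latest
// snapshot so each store can derive metrics for its own block device.
func pollStats(ctx context.Context, read func() (map[string]DeviceStats, error), out chan<- map[string]DeviceStats) {
	ticker := time.NewTicker(100 * time.Millisecond)
	defer ticker.Stop()
	for {
		select {
		case <-ctx.Done():
			return
		case <-ticker.C:
			stats, err := read()
			if err != nil {
				continue // skip transient read failures
			}
			select {
			case out <- stats:
			default: // drop the sample if the consumer is behind
			}
		}
	}
}
```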

These changes will be followed up by a PR to remove the need for customers to specify disk names when setting the provisioned bandwidth for each store as described in #109350.

Fixes: #104114, #112898.
Informs: #89786.

Epic: None.
Release note: None.

120649: changefeedccl: avoid undefined behavior in distribution test r=wenyihu6 a=jayshrivastava

The `rangeDistributionTester` would sometimes calculate log(0) when determining the node to move a range to. Most of the time, this produced a garbage value which was ignored. Sometimes, however, it returned a valid node id, causing the range distribution to be wrong and the test to fail. This change updates the tester to handle this edge case.
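
A hedged sketch of the guard; the formula and names here are hypothetical, not the tester's actual code:

```go
package changefeedccl

import "math"

// targetNodeForRange illustrates the edge case: math.Log(0) is -Inf, and
// converting -Inf to an int yields an unspecified value that can alias a
// valid node id. Guarding the zero case keeps the distribution well defined.
func targetNodeForRange(rangeIdx, numNodes int) int {
	if rangeIdx == 0 {
		return 0 // avoid math.Log(0) = -Inf
	}
	node := int(math.Log(float64(rangeIdx)))
	if node < 0 || node >= numNodes {
		return 0
	}
	return node
}
```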

Closes: #120470
Release note: None

Co-authored-by: Jayant Shrivastava <jayants@cockroachlabs.com>
Co-authored-by: Cheran Mahalingam <cheran.mahalingam@cockroachlabs.com>
3 people committed Mar 18, 2024
4 parents 91c1d5c + 8fee491 + f33445b + fc4a2c2 commit 4deb9e3
Showing 33 changed files with 1,751 additions and 140 deletions.
11 changes: 10 additions & 1 deletion docs/generated/metrics/metrics.html
@@ -630,6 +630,15 @@
<tr><td>STORAGE</td><td>storage.compactions.keys.pinned.count</td><td>Cumulative count of storage engine KVs written to sstables during flushes and compactions due to open LSM snapshots.<br/><br/>Various subsystems of CockroachDB take LSM snapshots to maintain a consistent view<br/>of the database over an extended duration. In order to maintain the consistent view,<br/>flushes and compactions within the storage engine must preserve keys that otherwise<br/>would have been dropped. This increases write amplification, and introduces keys<br/>that must be skipped during iteration. This metric records the cumulative count of<br/>KVs preserved during flushes and compactions over the lifetime of the process.<br/></td><td>Keys</td><td>GAUGE</td><td>COUNT</td><td>AVG</td><td>NONE</td></tr>
<tr><td>STORAGE</td><td>storage.disk-slow</td><td>Number of instances of disk operations taking longer than 10s</td><td>Events</td><td>GAUGE</td><td>COUNT</td><td>AVG</td><td>NONE</td></tr>
<tr><td>STORAGE</td><td>storage.disk-stalled</td><td>Number of instances of disk operations taking longer than 20s</td><td>Events</td><td>GAUGE</td><td>COUNT</td><td>AVG</td><td>NONE</td></tr>
<tr><td>STORAGE</td><td>storage.disk.io.time</td><td>Time spent reading from or writing to the store&#39;s disk since this process started (as reported by the OS)</td><td>Time</td><td>GAUGE</td><td>NANOSECONDS</td><td>AVG</td><td>NONE</td></tr>
<tr><td>STORAGE</td><td>storage.disk.iopsinprogress</td><td>IO operations currently in progress on the store&#39;s disk (as reported by the OS)</td><td>Operations</td><td>GAUGE</td><td>COUNT</td><td>AVG</td><td>NONE</td></tr>
<tr><td>STORAGE</td><td>storage.disk.read.bytes</td><td>Bytes read from the store&#39;s disk since this process started (as reported by the OS)</td><td>Bytes</td><td>GAUGE</td><td>BYTES</td><td>AVG</td><td>NONE</td></tr>
<tr><td>STORAGE</td><td>storage.disk.read.count</td><td>Disk read operations on the store&#39;s disk since this process started (as reported by the OS)</td><td>Operations</td><td>GAUGE</td><td>COUNT</td><td>AVG</td><td>NONE</td></tr>
<tr><td>STORAGE</td><td>storage.disk.read.time</td><td>Time spent reading from the store&#39;s disk since this process started (as reported by the OS)</td><td>Time</td><td>GAUGE</td><td>NANOSECONDS</td><td>AVG</td><td>NONE</td></tr>
<tr><td>STORAGE</td><td>storage.disk.weightedio.time</td><td>Weighted time spent reading from or writing to the store&#39;s disk since this process started (as reported by the OS)</td><td>Time</td><td>GAUGE</td><td>NANOSECONDS</td><td>AVG</td><td>NONE</td></tr>
<tr><td>STORAGE</td><td>storage.disk.write.bytes</td><td>Bytes written to the store&#39;s disk since this process started (as reported by the OS)</td><td>Bytes</td><td>GAUGE</td><td>BYTES</td><td>AVG</td><td>NONE</td></tr>
<tr><td>STORAGE</td><td>storage.disk.write.count</td><td>Disk write operations on the store&#39;s disk since this process started (as reported by the OS)</td><td>Operations</td><td>GAUGE</td><td>COUNT</td><td>AVG</td><td>NONE</td></tr>
<tr><td>STORAGE</td><td>storage.disk.write.time</td><td>Time spent writing to the store&#39;s disks since this process started (as reported by the OS)</td><td>Time</td><td>GAUGE</td><td>NANOSECONDS</td><td>AVG</td><td>NONE</td></tr>
<tr><td>STORAGE</td><td>storage.flush.ingest.count</td><td>Flushes performing an ingest (flushable ingestions)</td><td>Flushes</td><td>GAUGE</td><td>COUNT</td><td>AVG</td><td>NONE</td></tr>
<tr><td>STORAGE</td><td>storage.flush.ingest.table.bytes</td><td>Bytes ingested via flushes (flushable ingestions)</td><td>Bytes</td><td>GAUGE</td><td>BYTES</td><td>AVG</td><td>NONE</td></tr>
<tr><td>STORAGE</td><td>storage.flush.ingest.table.count</td><td>Tables ingested via flushes (flushable ingestions)</td><td>Tables</td><td>GAUGE</td><td>COUNT</td><td>AVG</td><td>NONE</td></tr>
@@ -1595,7 +1604,7 @@
<tr><td>SERVER</td><td>sys.host.disk.read.bytes</td><td>Bytes read from all disks since this process started (as reported by the OS)</td><td>Bytes</td><td>GAUGE</td><td>BYTES</td><td>AVG</td><td>NONE</td></tr>
<tr><td>SERVER</td><td>sys.host.disk.read.count</td><td>Disk read operations across all disks since this process started (as reported by the OS)</td><td>Operations</td><td>GAUGE</td><td>COUNT</td><td>AVG</td><td>NONE</td></tr>
<tr><td>SERVER</td><td>sys.host.disk.read.time</td><td>Time spent reading from all disks since this process started (as reported by the OS)</td><td>Time</td><td>GAUGE</td><td>NANOSECONDS</td><td>AVG</td><td>NONE</td></tr>
<tr><td>SERVER</td><td>sys.host.disk.weightedio.time</td><td>Weighted time spent reading from or writing to to all disks since this process started (as reported by the OS)</td><td>Time</td><td>GAUGE</td><td>NANOSECONDS</td><td>AVG</td><td>NONE</td></tr>
<tr><td>SERVER</td><td>sys.host.disk.weightedio.time</td><td>Weighted time spent reading from or writing to all disks since this process started (as reported by the OS)</td><td>Time</td><td>GAUGE</td><td>NANOSECONDS</td><td>AVG</td><td>NONE</td></tr>
<tr><td>SERVER</td><td>sys.host.disk.write.bytes</td><td>Bytes written to all disks since this process started (as reported by the OS)</td><td>Bytes</td><td>GAUGE</td><td>BYTES</td><td>AVG</td><td>NONE</td></tr>
<tr><td>SERVER</td><td>sys.host.disk.write.count</td><td>Disk write operations across all disks since this process started (as reported by the OS)</td><td>Operations</td><td>GAUGE</td><td>COUNT</td><td>AVG</td><td>NONE</td></tr>
<tr><td>SERVER</td><td>sys.host.disk.write.time</td><td>Time spent writing to all disks since this process started (as reported by the OS)</td><td>Time</td><td>GAUGE</td><td>NANOSECONDS</td><td>AVG</td><td>NONE</td></tr>
3 changes: 3 additions & 0 deletions pkg/BUILD.bazel
@@ -613,6 +613,7 @@ ALL_TESTS = [
"//pkg/sql/types:types_test",
"//pkg/sql:sql_disallowed_imports_test",
"//pkg/sql:sql_test",
"//pkg/storage/disk:disk_test",
"//pkg/storage/enginepb:enginepb_test",
"//pkg/storage/fs:fs_test",
"//pkg/storage/metamorphic:metamorphic_test",
@@ -2225,6 +2226,8 @@ GO_TARGETS = [
"//pkg/sql/vtable:vtable",
"//pkg/sql:sql",
"//pkg/sql:sql_test",
"//pkg/storage/disk:disk",
"//pkg/storage/disk:disk_test",
"//pkg/storage/enginepb:enginepb",
"//pkg/storage/enginepb:enginepb_test",
"//pkg/storage/fs:fs",
1 change: 1 addition & 0 deletions pkg/ccl/changefeedccl/BUILD.bazel
@@ -299,6 +299,7 @@ go_test(
"//pkg/util/ctxgroup",
"//pkg/util/encoding",
"//pkg/util/hlc",
"//pkg/util/intsets",
"//pkg/util/json",
"//pkg/util/leaktest",
"//pkg/util/log",
221 changes: 162 additions & 59 deletions pkg/ccl/changefeedccl/changefeed_dist.go
@@ -10,12 +10,14 @@ package changefeedccl

import (
"context"
"math"
"sort"

"github.com/cockroachdb/cockroach/pkg/ccl/changefeedccl/cdceval"
"github.com/cockroachdb/cockroach/pkg/ccl/changefeedccl/changefeedbase"
"github.com/cockroachdb/cockroach/pkg/jobs/jobspb"
"github.com/cockroachdb/cockroach/pkg/jobs/jobsprofiler"
"github.com/cockroachdb/cockroach/pkg/keys"
"github.com/cockroachdb/cockroach/pkg/kv"
"github.com/cockroachdb/cockroach/pkg/kv/kvclient/kvcoord"
"github.com/cockroachdb/cockroach/pkg/roachpb"
@@ -35,7 +37,9 @@ import (
"github.com/cockroachdb/cockroach/pkg/sql/sem/tree"
"github.com/cockroachdb/cockroach/pkg/sql/types"
"github.com/cockroachdb/cockroach/pkg/util"
"github.com/cockroachdb/cockroach/pkg/util/buildutil"
"github.com/cockroachdb/cockroach/pkg/util/ctxgroup"
"github.com/cockroachdb/cockroach/pkg/util/envutil"
"github.com/cockroachdb/cockroach/pkg/util/hlc"
"github.com/cockroachdb/cockroach/pkg/util/log"
"github.com/cockroachdb/errors"
@@ -390,7 +394,7 @@ func makePlan(
return nil, nil, err
}
if log.ExpensiveLogEnabled(ctx, 2) {
log.Infof(ctx, "spans returned by DistSQL: %s", spanPartitions)
log.Infof(ctx, "spans returned by DistSQL: %v", spanPartitions)
}
switch {
case distMode == sql.LocalDistribution || rangeDistribution == int64(defaultDistribution):
@@ -400,14 +404,14 @@
}
sender := execCtx.ExecCfg().DB.NonTransactionalSender()
distSender := sender.(*kv.CrossRangeTxnWrapperSender).Wrapped().(*kvcoord.DistSender)

ri := kvcoord.MakeRangeIterator(distSender)
spanPartitions, err = rebalanceSpanPartitions(
ctx, &distResolver{distSender}, rebalanceThreshold.Get(sv), spanPartitions)
ctx, &ri, rebalanceThreshold.Get(sv), spanPartitions)
if err != nil {
return nil, nil, err
}
if log.ExpensiveLogEnabled(ctx, 2) {
log.Infof(ctx, "spans after balanced simple distribution rebalancing: %s", spanPartitions)
log.Infof(ctx, "spans after balanced simple distribution rebalancing: %v", spanPartitions)
}
default:
return nil, nil, errors.AssertionFailedf("unsupported dist strategy %d and dist mode %d",
@@ -440,7 +444,7 @@ func makePlan(
aggregatorSpecs := make([]*execinfrapb.ChangeAggregatorSpec, len(spanPartitions))
for i, sp := range spanPartitions {
if log.ExpensiveLogEnabled(ctx, 2) {
log.Infof(ctx, "watched spans for node %d: %s", sp.SQLInstanceID, sp)
log.Infof(ctx, "watched spans for node %d: %v", sp.SQLInstanceID, sp)
}
watches := make([]execinfrapb.ChangeAggregatorSpec_Watch, len(sp.Spans))
for watchIdx, nodeSpan := range sp.Spans {
@@ -549,6 +553,7 @@ func (w *changefeedResultWriter) Err() error {
return w.err
}

// TODO(#120427): improve this to be more useful.
var rebalanceThreshold = settings.RegisterFloatSetting(
settings.ApplicationLevel,
"changefeed.balance_range_distribution.sensitivity",
@@ -557,80 +562,178 @@ var rebalanceThreshold = settings.RegisterFloatSetting(
settings.PositiveFloat,
)

type rangeResolver interface {
getRangesForSpans(ctx context.Context, spans []roachpb.Span) ([]roachpb.Span, error)
type rangeIterator interface {
Desc() *roachpb.RangeDescriptor
NeedAnother(rs roachpb.RSpan) bool
Valid() bool
Error() error
Next(ctx context.Context)
Seek(ctx context.Context, key roachpb.RKey, scanDir kvcoord.ScanDirection)
}

type distResolver struct {
*kvcoord.DistSender
// rebalancingPartition is a container used to store a partition undergoing
// rebalancing.
type rebalancingPartition struct {
// These fields store the current number of ranges and spans in this partition.
// They are initialized corresponding to the sql.SpanPartition partition below
// and mutated during rebalancing.
numRanges int
group roachpb.SpanGroup

// The original span partition corresponding to this bucket and its
// index in the original []sql.SpanPartition.
part sql.SpanPartition
pIdx int
}

func (r *distResolver) getRangesForSpans(
ctx context.Context, spans []roachpb.Span,
) ([]roachpb.Span, error) {
spans, _, err := r.DistSender.AllRangeSpans(ctx, spans)
return spans, err
}
// Setting expensiveReblanceChecksEnabled = true will cause rebalancing to
// return an error if the output list of partitions does not cover the same
// keys as the input list of partitions.
var expensiveReblanceChecksEnabled = buildutil.CrdbTestBuild || envutil.EnvOrDefaultBool(
"COCKROACH_CHANGEFEED_TESTING_REBALANCING_CHECKS", false)

func rebalanceSpanPartitions(
ctx context.Context, r rangeResolver, sensitivity float64, p []sql.SpanPartition,
ctx context.Context, ri rangeIterator, sensitivity float64, partitions []sql.SpanPartition,
) ([]sql.SpanPartition, error) {
if len(p) <= 1 {
return p, nil
if len(partitions) <= 1 {
return partitions, nil
}

// Explode set of spans into set of ranges.
// TODO(yevgeniy): This might not be great if the tables are huge.
numRanges := 0
for i := range p {
spans, err := r.getRangesForSpans(ctx, p[i].Spans)
if err != nil {
return nil, err
// Create partition builder structs for the partitions array above.
var builders = make([]rebalancingPartition, len(partitions))
var totalRanges int
for i, p := range partitions {
builders[i].part = p
builders[i].pIdx = i
nRanges, ok := p.NumRanges()
// We cannot rebalance if we're missing range information.
if !ok {
log.Warning(ctx, "skipping rebalance due to missing range info")
return partitions, nil
}
p[i].Spans = spans
numRanges += len(spans)
builders[i].numRanges = nRanges
totalRanges += nRanges
builders[i].group.Add(p.Spans...)
}

// Sort descending based on the number of ranges.
sort.Slice(p, func(i, j int) bool {
return len(p[i].Spans) > len(p[j].Spans)
sort.Slice(builders, func(i, j int) bool {
return builders[i].numRanges > builders[j].numRanges
})

targetRanges := int((1 + sensitivity) * float64(numRanges) / float64(len(p)))

for i, j := 0, len(p)-1; i < j && len(p[i].Spans) > targetRanges && len(p[j].Spans) < targetRanges; {
from, to := i, j

// Figure out how many ranges we can move.
numToMove := len(p[from].Spans) - targetRanges
canMove := targetRanges - len(p[to].Spans)
if numToMove <= canMove {
i++
}
if canMove <= numToMove {
numToMove = canMove
j--
targetRanges := int(math.Ceil((1 + sensitivity) * float64(totalRanges) / float64(len(partitions))))
to := len(builders) - 1
from := 0

// In each iteration of the outer loop, check if `from` has too many ranges.
// If so, move them to other partitions which need more ranges
// starting from `to` and moving down. Otherwise, increment `from` and check
// again.
for ; from < to && builders[from].numRanges > targetRanges; from++ {
// numToMove is the number of ranges which need to be moved out of `from`
// to other partitions.
numToMove := builders[from].numRanges - targetRanges
count := 0
needMore := func() bool {
return count < numToMove
}
if numToMove == 0 {
break
// Iterate over all the spans in `from`.
for spanIdx := 0; from < to && needMore() && spanIdx < len(builders[from].part.Spans); spanIdx++ {
sp := builders[from].part.Spans[spanIdx]
rSpan, err := keys.SpanAddr(sp)
if err != nil {
return nil, err
}
// Iterate over the ranges in the current span.
for ri.Seek(ctx, rSpan.Key, kvcoord.Ascending); from < to && needMore(); ri.Next(ctx) {
// Error check.
if !ri.Valid() {
return nil, ri.Error()
}

// Move one range from `from` to `to`.
count += 1
builders[from].numRanges -= 1
builders[to].numRanges += 1
// If the range boundaries are outside the original span, trim
// the range.
startKey := ri.Desc().StartKey
if startKey.Compare(rSpan.Key) == -1 {
startKey = rSpan.Key
}
endKey := ri.Desc().EndKey
if endKey.Compare(rSpan.EndKey) == 1 {
endKey = rSpan.EndKey
}
diff := roachpb.Span{
Key: startKey.AsRawKey(), EndKey: endKey.AsRawKey(),
}
builders[from].group.Sub(diff)
builders[to].group.Add(diff)

// Since we moved a range, `to` may have enough ranges.
// Decrement `to` until we find a new partition which needs more
// ranges.
for from < to && builders[to].numRanges >= targetRanges {
to--
}
// No more ranges in this span.
if !ri.NeedAnother(rSpan) {
break
}
}
}
}

// Move numToMove spans from 'from' to 'to'.
idx := len(p[from].Spans) - numToMove
p[to].Spans = append(p[to].Spans, p[from].Spans[idx:]...)
p[from].Spans = p[from].Spans[:idx]
// Overwrite the original partitions slice with the balanced partitions.
for _, b := range builders {
partitions[b.pIdx] = sql.MakeSpanPartitionWithRangeCount(
b.part.SQLInstanceID, b.group.Slice(), b.numRanges)
}

// Collapse ranges into nice set of contiguous spans.
for i := range p {
var g roachpb.SpanGroup
g.Add(p[i].Spans...)
p[i].Spans = g.Slice()
if err := verifyPartitionsIfExpensiveChecksEnabled(builders, partitions, targetRanges); err != nil {
return nil, err
}

// Finally, re-sort based on the node id.
sort.Slice(p, func(i, j int) bool {
return p[i].SQLInstanceID < p[j].SQLInstanceID
})
return p, nil
return partitions, nil
}

// verifyPartitionsIfExpensiveChecksEnabled panics if the output partitions
// cover a different set of keys than the input partitions.
func verifyPartitionsIfExpensiveChecksEnabled(
builderWithInputSpans []rebalancingPartition,
outputPartitions []sql.SpanPartition,
targetRanges int,
) error {
if !expensiveReblanceChecksEnabled {
return nil
}
var originalSpansG roachpb.SpanGroup
var originalSpansArr []roachpb.Span
var newSpansG roachpb.SpanGroup
var newSpansArr []roachpb.Span
for _, b := range builderWithInputSpans {
originalSpansG.Add(b.part.Spans...)
originalSpansArr = append(originalSpansArr, b.part.Spans...)
}
for _, p := range outputPartitions {
if numRanges, ok := p.NumRanges(); !ok {
return changefeedbase.WithTerminalError(
errors.Newf("partition missing number of ranges info, partition: %v, partitions: %v", p, outputPartitions))
} else if numRanges > targetRanges {
return changefeedbase.WithTerminalError(
errors.Newf("found partition with too many ranges, target: %d, partition: %v, partitions: %v",
targetRanges, p, outputPartitions))
}

newSpansG.Add(p.Spans...)
newSpansArr = append(newSpansArr, p.Spans...)
}
// If the original spans enclose the new spans and the new spans enclose the original spans,
// then the two groups must cover exactly the same keys.
if !originalSpansG.Encloses(newSpansArr...) || !newSpansG.Encloses(originalSpansArr...) {
return changefeedbase.WithTerminalError(errors.Newf("incorrect rebalance. input spans: %v, output spans: %v",
originalSpansArr, newSpansArr))
}
return nil
}