Skip to content

Commit

Permalink
changefeedccl: add rebalancing checks
Browse files Browse the repository at this point in the history
This change adds extra test coverage for partition rebalancing in
changefeeds. It adds checks which are performed after rebalancing
to assert that the output list of spans covers exactly the same keys
as the input list of spans. These checks are expensive so they only
run if the environment variable `COCKROACH_CHANGEFEED_TESTING_REBALANCING_CHECKS`
is true. This variable is true in cdc roachtests and unit tests.

Release note: None
Epic: None
  • Loading branch information
jayshrivastava committed Mar 18, 2024
1 parent ab3298b commit 8fee491
Show file tree
Hide file tree
Showing 2 changed files with 59 additions and 3 deletions.
59 changes: 56 additions & 3 deletions pkg/ccl/changefeedccl/changefeed_dist.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,9 @@ import (
"github.com/cockroachdb/cockroach/pkg/sql/sem/tree"
"github.com/cockroachdb/cockroach/pkg/sql/types"
"github.com/cockroachdb/cockroach/pkg/util"
"github.com/cockroachdb/cockroach/pkg/util/buildutil"
"github.com/cockroachdb/cockroach/pkg/util/ctxgroup"
"github.com/cockroachdb/cockroach/pkg/util/envutil"
"github.com/cockroachdb/cockroach/pkg/util/hlc"
"github.com/cockroachdb/cockroach/pkg/util/log"
"github.com/cockroachdb/errors"
Expand Down Expand Up @@ -392,7 +394,7 @@ func makePlan(
return nil, nil, err
}
if log.ExpensiveLogEnabled(ctx, 2) {
log.Infof(ctx, "spans returned by DistSQL: %s", spanPartitions)
log.Infof(ctx, "spans returned by DistSQL: %v", spanPartitions)
}
switch {
case distMode == sql.LocalDistribution || rangeDistribution == int64(defaultDistribution):
Expand All @@ -409,7 +411,7 @@ func makePlan(
return nil, nil, err
}
if log.ExpensiveLogEnabled(ctx, 2) {
log.Infof(ctx, "spans after balanced simple distribution rebalancing: %s", spanPartitions)
log.Infof(ctx, "spans after balanced simple distribution rebalancing: %v", spanPartitions)
}
default:
return nil, nil, errors.AssertionFailedf("unsupported dist strategy %d and dist mode %d",
Expand Down Expand Up @@ -442,7 +444,7 @@ func makePlan(
aggregatorSpecs := make([]*execinfrapb.ChangeAggregatorSpec, len(spanPartitions))
for i, sp := range spanPartitions {
if log.ExpensiveLogEnabled(ctx, 2) {
log.Infof(ctx, "watched spans for node %d: %s", sp.SQLInstanceID, sp)
log.Infof(ctx, "watched spans for node %d: %v", sp.SQLInstanceID, sp)
}
watches := make([]execinfrapb.ChangeAggregatorSpec_Watch, len(sp.Spans))
for watchIdx, nodeSpan := range sp.Spans {
Expand Down Expand Up @@ -584,6 +586,12 @@ type rebalancingPartition struct {
pIdx int
}

// Setting expensiveReblanceChecksEnabled = true will cause re-balancing to
// panic if the output list of partitions does not cover the same keys as the
// input list of partitions.
var expensiveReblanceChecksEnabled = buildutil.CrdbTestBuild || envutil.EnvOrDefaultBool(
"COCKROACH_CHANGEFEED_TESTING_REBALANCING_CHECKS", false)

func rebalanceSpanPartitions(
ctx context.Context, ri rangeIterator, sensitivity float64, partitions []sql.SpanPartition,
) ([]sql.SpanPartition, error) {
Expand Down Expand Up @@ -682,5 +690,50 @@ func rebalanceSpanPartitions(
partitions[b.pIdx] = sql.MakeSpanPartitionWithRangeCount(
b.part.SQLInstanceID, b.group.Slice(), b.numRanges)
}

if err := verifyPartitionsIfExpensiveChecksEnabled(builders, partitions, targetRanges); err != nil {
return nil, err
}

return partitions, nil
}

// verifyPartitionsIfExpensiveChecksEnabled panics if the output partitions
// cover a different set of keys than the input partitions.
func verifyPartitionsIfExpensiveChecksEnabled(
builderWithInputSpans []rebalancingPartition,
outputPartitions []sql.SpanPartition,
targetRanges int,
) error {
if !expensiveReblanceChecksEnabled {
return nil
}
var originalSpansG roachpb.SpanGroup
var originalSpansArr []roachpb.Span
var newSpansG roachpb.SpanGroup
var newSpansArr []roachpb.Span
for _, b := range builderWithInputSpans {
originalSpansG.Add(b.part.Spans...)
originalSpansArr = append(originalSpansArr, b.part.Spans...)
}
for _, p := range outputPartitions {
if numRanges, ok := p.NumRanges(); !ok {
return changefeedbase.WithTerminalError(
errors.Newf("partition missing number of ranges info, partition: %v, partitions: %v", p, outputPartitions))
} else if numRanges > targetRanges {
return changefeedbase.WithTerminalError(
errors.Newf("found partition with too many ranges, target: %d, partition: %v, partitions: %v",
targetRanges, p, outputPartitions))
}

newSpansG.Add(p.Spans...)
newSpansArr = append(newSpansArr, p.Spans...)
}
// If the original spans enclose the new spans and the new spans enclose the original spans,
// then the two groups must cover exactly the same keys.
if !originalSpansG.Encloses(newSpansArr...) || !newSpansG.Encloses(originalSpansArr...) {
return changefeedbase.WithTerminalError(errors.Newf("incorrect rebalance. input spans: %v, output spans: %v",
originalSpansArr, newSpansArr))
}
return nil
}
3 changes: 3 additions & 0 deletions pkg/cmd/roachtest/tests/cdc.go
Original file line number Diff line number Diff line change
Expand Up @@ -87,6 +87,9 @@ var envVars = []string{
"COCKROACH_CHANGEFEED_TESTING_FAST_RETRY=true",
"COCKROACH_CHANGEFEED_TESTING_INCLUDE_PARQUET_TEST_METADATA=true",
"COCKROACH_CHANGEFEED_TESTING_INCLUDE_PARQUET_READER_METADATA=true",
// Enable strict re-balancing checks to ensure that rebalancing doesn't create an
// incorrect set of spans for the changefeed.
"COCKROACH_CHANGEFEED_TESTING_REBALANCING_CHECKS=true",
}

type cdcTester struct {
Expand Down

0 comments on commit 8fee491

Please sign in to comment.