diff --git a/cdc/scheduler/internal/v3/agent/table.go b/cdc/scheduler/internal/v3/agent/table.go
index 967de1812e8..de684a8661d 100644
--- a/cdc/scheduler/internal/v3/agent/table.go
+++ b/cdc/scheduler/internal/v3/agent/table.go
@@ -61,7 +61,7 @@ func (t *tableSpan) getAndUpdateTableSpanState() (tablepb.TableState, bool) {
         log.Debug("schedulerv3: table state changed",
             zap.String("namespace", t.changefeedID.Namespace),
             zap.String("changefeed", t.changefeedID.ID),
-            zap.Int64("tableID", t.span.TableID),
+            zap.Any("tableSpan", t.span),
             zap.Stringer("oldState", oldState),
             zap.Stringer("state", t.state))
         return t.state, true
@@ -112,7 +112,7 @@ func (t *tableSpan) handleRemoveTableTask() *schedulepb.Message {
         log.Warn("schedulerv3: remove table, but table is absent",
             zap.String("namespace", t.changefeedID.Namespace),
             zap.String("changefeed", t.changefeedID.ID),
-            zap.Int64("tableID", t.span.TableID))
+            zap.Any("tableSpan", t.span))
         t.task = nil
         return newRemoveTableResponseMessage(t.getTableSpanStatus(false))
     case tablepb.TableStateStopping, // stopping now is useless
@@ -145,7 +145,7 @@ func (t *tableSpan) handleRemoveTableTask() *schedulepb.Message {
         log.Panic("schedulerv3: unknown table state",
             zap.String("namespace", t.changefeedID.Namespace),
             zap.String("changefeed", t.changefeedID.ID),
-            zap.Int64("tableID", t.span.TableID), zap.Stringer("state", state))
+            zap.Any("tableSpan", t.span), zap.Stringer("state", state))
         }
     }
     return nil
@@ -164,7 +164,7 @@ func (t *tableSpan) handleAddTableTask(
         log.Warn("schedulerv3: agent add table failed",
             zap.String("namespace", t.changefeedID.Namespace),
             zap.String("changefeed", t.changefeedID.ID),
-            zap.Int64("tableID", t.span.TableID), zap.Any("task", t.task),
+            zap.Any("tableSpan", t.span), zap.Any("task", t.task),
             zap.Error(err))
         status := t.getTableSpanStatus(false)
         return newAddTableResponseMessage(status), errors.Trace(err)
@@ -174,7 +174,7 @@ func (t *tableSpan) handleAddTableTask(
         log.Info("schedulerv3: table is replicating",
             zap.String("namespace", t.changefeedID.Namespace),
             zap.String("changefeed", t.changefeedID.ID),
-            zap.Int64("tableID", t.span.TableID), zap.Stringer("state", state))
+            zap.Any("tableSpan", t.span), zap.Stringer("state", state))
         t.task = nil
         status := t.getTableSpanStatus(false)
         return newAddTableResponseMessage(status), nil
@@ -184,7 +184,7 @@ func (t *tableSpan) handleAddTableTask(
         log.Info("schedulerv3: table is prepared",
             zap.String("namespace", t.changefeedID.Namespace),
             zap.String("changefeed", t.changefeedID.ID),
-            zap.Int64("tableID", t.span.TableID), zap.Stringer("state", state))
+            zap.Any("tableSpan", t.span), zap.Stringer("state", state))
         t.task = nil
         return newAddTableResponseMessage(t.getTableSpanStatus(false)), nil
     }
@@ -195,7 +195,7 @@ func (t *tableSpan) handleAddTableTask(
         log.Warn("schedulerv3: agent add table failed",
             zap.String("namespace", t.changefeedID.Namespace),
             zap.String("changefeed", t.changefeedID.ID),
-            zap.Int64("tableID", t.span.TableID), zap.Stringer("state", state),
+            zap.Any("tableSpan", t.span), zap.Stringer("state", state),
             zap.Error(err))
         status := t.getTableSpanStatus(false)
         return newAddTableResponseMessage(status), errors.Trace(err)
@@ -219,20 +219,20 @@ func (t *tableSpan) handleAddTableTask(
         log.Info("schedulerv3: add table finished",
             zap.String("namespace", t.changefeedID.Namespace),
             zap.String("changefeed", t.changefeedID.ID),
-            zap.Int64("tableID", t.span.TableID), zap.Stringer("state", state))
+            zap.Any("tableSpan", t.span), zap.Stringer("state", state))
     case tablepb.TableStateStopping, tablepb.TableStateStopped:
         log.Warn("schedulerv3: ignore add table",
             zap.String("namespace", t.changefeedID.Namespace),
             zap.String("changefeed", t.changefeedID.ID),
-            zap.Int64("tableID", t.span.TableID))
+            zap.Any("tableSpan", t.span))
         t.task = nil
         return newAddTableResponseMessage(t.getTableSpanStatus(false)), nil
     default:
         log.Panic("schedulerv3: unknown table state",
             zap.String("namespace", t.changefeedID.Namespace),
             zap.String("changefeed", t.changefeedID.ID),
-            zap.Int64("tableID", t.span.TableID))
+            zap.Any("tableSpan", t.span))
     }
 }
@@ -244,14 +244,14 @@ func (t *tableSpan) injectDispatchTableTask(task *dispatchTableTask) {
         log.Panic("schedulerv3: tableID not match",
             zap.String("namespace", t.changefeedID.Namespace),
             zap.String("changefeed", t.changefeedID.ID),
-            zap.Int64("tableID", t.span.TableID),
+            zap.Any("tableSpan", t.span),
             zap.Stringer("task.TableID", &task.Span))
     }
     if t.task == nil {
         log.Info("schedulerv3: table found new task",
             zap.String("namespace", t.changefeedID.Namespace),
             zap.String("changefeed", t.changefeedID.ID),
-            zap.Int64("tableID", t.span.TableID),
+            zap.Any("tableSpan", t.span),
             zap.Any("task", task))
         t.task = task
         return
@@ -260,7 +260,7 @@ func (t *tableSpan) injectDispatchTableTask(task *dispatchTableTask) {
         "since there is one not finished yet",
         zap.String("namespace", t.changefeedID.Namespace),
         zap.String("changefeed", t.changefeedID.ID),
-        zap.Int64("tableID", t.span.TableID),
+        zap.Any("tableSpan", t.span),
         zap.Any("nowTask", t.task),
         zap.Any("ignoredTask", task))
 }
diff --git a/cdc/scheduler/internal/v3/keyspan/reconciler.go b/cdc/scheduler/internal/v3/keyspan/reconciler.go
index 88187debd71..3dc115bef23 100644
--- a/cdc/scheduler/internal/v3/keyspan/reconciler.go
+++ b/cdc/scheduler/internal/v3/keyspan/reconciler.go
@@ -30,6 +30,8 @@ import (
     "go.uber.org/zap"
 )

+const spanRegionLimit = 50000
+
 type splitter interface {
     split(
         ctx context.Context, span tablepb.Span, totalCaptures int,
@@ -69,6 +71,7 @@ func NewReconciler(
         changefeedID: changefeedID,
         config:       config,
         splitter: []splitter{
+            // write splitter has the highest priority.
             newWriteSplitter(changefeedID, pdapi),
             newRegionCountSplitter(changefeedID, up.RegionCache),
         },
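// ----------------------------------------------------------------------------
// Editor's note -- illustrative sketch, not part of the diff above.
// reconciler.go now registers the write-based splitter ahead of the
// region-count splitter and introduces spanRegionLimit (50000), a sizing
// target for how many regions a single span should cover. The snippet below
// shows one plausible way such a prioritized splitter chain can be consumed;
// the exampleSplitter interface and the fallback rule are assumptions for
// illustration only, since the reconciler's own loop is not shown in this hunk.
package keyspanexample

import (
    "context"

    "github.com/pingcap/tiflow/cdc/processor/tablepb"
)

type exampleSplitter interface {
    split(ctx context.Context, span tablepb.Span, totalCaptures int) []tablepb.Span
}

// splitWithFallback tries each splitter in priority order and keeps the first
// result that actually splits the span; otherwise the span is left whole.
func splitWithFallback(
    ctx context.Context, splitters []exampleSplitter, span tablepb.Span, totalCaptures int,
) []tablepb.Span {
    for _, s := range splitters {
        if spans := s.split(ctx, span, totalCaptures); len(spans) > 1 {
            return spans
        }
    }
    return []tablepb.Span{span}
}
// ----------------------------------------------------------------------------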
diff --git a/cdc/scheduler/internal/v3/keyspan/splitter_region_count.go b/cdc/scheduler/internal/v3/keyspan/splitter_region_count.go
index 86e4d0a3b50..74cd2ab760b 100644
--- a/cdc/scheduler/internal/v3/keyspan/splitter_region_count.go
+++ b/cdc/scheduler/internal/v3/keyspan/splitter_region_count.go
@@ -65,11 +65,18 @@ func (m *regionCountSplitter) split(
         return []tablepb.Span{span}
     }

+    pages := totalCaptures
+
     totalRegions := len(regions)
     if totalRegions == 0 {
-        totalCaptures = 1
+        pages = 1
+    }
+
+    if totalRegions/spanRegionLimit > pages {
+        pages = totalRegions / spanRegionLimit
     }
-    stepper := newEvenlySplitStepper(totalCaptures, totalRegions)
+
+    stepper := newEvenlySplitStepper(pages, totalRegions)
     spans := make([]tablepb.Span, 0, stepper.SpanCount())
     start, end := 0, stepper.Step()
     for {
@@ -128,7 +135,8 @@ func (m *regionCountSplitter) split(
         zap.Int("spans", len(spans)),
         zap.Int("totalCaptures", totalCaptures),
         zap.Int("regionCount", len(regions)),
-        zap.Int("regionThreshold", config.RegionThreshold))
+        zap.Int("regionThreshold", config.RegionThreshold),
+        zap.Int("spanRegionLimit", spanRegionLimit))
     return spans
 }

@@ -139,23 +147,25 @@ type evenlySplitStepper struct {
     remain             int
 }

-func newEvenlySplitStepper(totalCaptures int, totalRegion int) evenlySplitStepper {
+func newEvenlySplitStepper(pages int, totalRegion int) evenlySplitStepper {
     extraRegionPerSpan := 0
-    regionPerSpan, remain := totalRegion/totalCaptures, totalRegion%totalCaptures
+    regionPerSpan, remain := totalRegion/pages, totalRegion%pages
     if regionPerSpan == 0 {
         regionPerSpan = 1
         extraRegionPerSpan = 0
-        totalCaptures = totalRegion
+        pages = totalRegion
     } else if remain != 0 {
         // Evenly distributes the remaining regions.
-        extraRegionPerSpan = int(math.Ceil(float64(remain) / float64(totalCaptures)))
+        extraRegionPerSpan = int(math.Ceil(float64(remain) / float64(pages)))
     }
-    return evenlySplitStepper{
+    res := evenlySplitStepper{
         regionPerSpan:      regionPerSpan,
-        spanCount:          totalCaptures,
+        spanCount:          pages,
         extraRegionPerSpan: extraRegionPerSpan,
         remain:             remain,
     }
+    log.Info("schedulerv3: evenly split stepper", zap.Any("evenlySplitStepper", res))
+    return res
 }

 func (e *evenlySplitStepper) SpanCount() int {
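// ----------------------------------------------------------------------------
// Editor's note -- illustrative sketch, not part of the diff above. The two
// helpers restate the arithmetic that regionCountSplitter.split and
// newEvenlySplitStepper now perform, so the effect of spanRegionLimit can be
// checked by hand. Helper names and the worked numbers are the editor's own.
package keyspanexample

import "math"

const spanRegionLimit = 50000

// pagesForTable mirrors the new "pages" computation: start with one span per
// capture, and raise the span count when the table is so large that each of
// those spans would cover far more than spanRegionLimit regions.
func pagesForTable(totalCaptures, totalRegions int) int {
    pages := totalCaptures
    if totalRegions == 0 {
        pages = 1
    }
    if totalRegions/spanRegionLimit > pages {
        pages = totalRegions / spanRegionLimit
    }
    return pages
}

// stepSizes mirrors newEvenlySplitStepper: regionPerSpan regions per span,
// with the remainder spread out extraRegionPerSpan regions at a time.
func stepSizes(pages, totalRegion int) (regionPerSpan, extraRegionPerSpan, spanCount int) {
    regionPerSpan, remain := totalRegion/pages, totalRegion%pages
    if regionPerSpan == 0 {
        return 1, 0, totalRegion
    }
    if remain != 0 {
        extraRegionPerSpan = int(math.Ceil(float64(remain) / float64(pages)))
    }
    return regionPerSpan, extraRegionPerSpan, pages
}

// Worked example: 2 captures and 300000 regions give pagesForTable(2, 300000)
// == 6 (300000/50000 > 2), i.e. six spans of roughly 50000 regions each. With
// 120000 regions the integer division yields 2, which is not greater than 2,
// so pages stays at totalCaptures and each span covers about 60000 regions --
// spanRegionLimit is a sizing target here, not a hard cap.
// ----------------------------------------------------------------------------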
diff --git a/cdc/scheduler/internal/v3/keyspan/splitter_write.go b/cdc/scheduler/internal/v3/keyspan/splitter_write.go
index 4ef4d130040..fc458ef788c 100644
--- a/cdc/scheduler/internal/v3/keyspan/splitter_write.go
+++ b/cdc/scheduler/internal/v3/keyspan/splitter_write.go
@@ -25,6 +25,8 @@ import (
     "go.uber.org/zap"
 )

+const regionWrittenKeyBase = 1
+
 type writeSplitter struct {
     changefeedID model.ChangeFeedID
     pdAPIClient  pdutil.PDAPIClient
@@ -56,15 +58,28 @@ func (m *writeSplitter) split(
             zap.Error(err))
         return nil
     }
-    if totalCaptures <= 1 {
-        log.Warn("schedulerv3: only one capture, skip split span",
+
+    pages := totalCaptures
+    if len(regions)/spanRegionLimit > pages {
+        pages = len(regions) / spanRegionLimit
+    }
+
+    if pages <= 1 {
+        log.Warn("schedulerv3: only one capture and the regions number less than"+
+            " the maxSpanRegionLimit, skip split span",
             zap.String("namespace", m.changefeedID.Namespace),
             zap.String("changefeed", m.changefeedID.ID),
             zap.String("span", span.String()),
             zap.Error(err))
         return []tablepb.Span{span}
     }
-    info := splitRegionsByWrittenKeys(span.TableID, regions, config.WriteKeyThreshold, totalCaptures)
+
+    info := splitRegionsByWrittenKeys(span.TableID,
+        regions,
+        config.WriteKeyThreshold,
+        pages,
+        spanRegionLimit)
+
     log.Info("schedulerv3: split span by written keys",
         zap.String("namespace", m.changefeedID.Namespace),
         zap.String("changefeed", m.changefeedID.ID),
@@ -73,7 +88,8 @@ func (m *writeSplitter) split(
         zap.Ints("weights", info.Weights),
         zap.Int("spans", len(info.Spans)),
         zap.Int("totalCaptures", totalCaptures),
-        zap.Int("writeKeyThreshold", config.WriteKeyThreshold))
+        zap.Int("writeKeyThreshold", config.WriteKeyThreshold),
+        zap.Int("spanRegionLimit", spanRegionLimit))
     return info.Spans
 }

@@ -84,8 +100,10 @@ type splitRegionsInfo struct {
 }

 // splitRegionsByWrittenKeys returns a slice of regions that evenly split the range by write keys.
+// pages is the number of splits to make, actually it is the number of captures.
 func splitRegionsByWrittenKeys(
-    tableID model.TableID, regions []pdutil.RegionInfo, writeKeyThreshold int, pages int,
+    tableID model.TableID, regions []pdutil.RegionInfo,
+    writeKeyThreshold int, pages int, spanRegionLimit int,
 ) *splitRegionsInfo {
     decodeKey := func(hexkey string) []byte {
         key, _ := hex.DecodeString(hexkey)
@@ -96,8 +114,8 @@ func splitRegionsByWrittenKeys(
     for i := range regions {
         totalWrite += regions[i].WrittenKeys
         // Override 0 to 1 to reflect the baseline cost of a region.
-        // Also it makes split evenly when there is no write.
-        regions[i].WrittenKeys++
+        // Also, it makes split evenly when there is no write.
+        regions[i].WrittenKeys += regionWrittenKeyBase
         totalWriteNormalized += regions[i].WrittenKeys
     }
     if totalWrite < uint64(writeKeyThreshold) {
@@ -118,15 +136,21 @@ func splitRegionsByWrittenKeys(
     spans := make([]tablepb.Span, 0, pages)
     accWrittenKeys, pageWrittenKeys := uint64(0), uint64(0)
     pageStartIdx, pageLastIdx := 0, 0

+    pageRegionsCount := 0
+    // split the table into pages-1 spans, each span has writtenKeysPerPage written keys.
     for i := 1; i < pages; i++ {
         for idx := pageStartIdx; idx < len(regions); idx++ {
             restPages := pages - i
             restRegions := len(regions) - idx
             pageLastIdx = idx
             currentWrittenKeys := regions[idx].WrittenKeys
+            // If there is at least one region, and the rest regions can't fill the rest pages or
+            // the accWrittenKeys plus currentWrittenKeys is larger than writtenKeysPerPage,
+            // then use the region from pageStartIdx to idx-1 to as a span and start a new page.
             if (idx > pageStartIdx) && ((restPages >= restRegions) ||
-                (accWrittenKeys+currentWrittenKeys > writtenKeysPerPage)) {
+                (accWrittenKeys+currentWrittenKeys > writtenKeysPerPage) ||
+                pageRegionsCount >= spanRegionLimit) {
                 spans = append(spans, tablepb.Span{
                     TableID:  tableID,
                     StartKey: tablepb.Key(decodeKey(regions[pageStartIdx].StartKey)),
@@ -136,15 +160,20 @@ func splitRegionsByWrittenKeys(
                 weights = append(weights, int(pageWrittenKeys))
                 pageWrittenKeys = 0
                 pageStartIdx = idx
+                // update writtenKeysPerPage to make the rest regions evenly split
+                // to the rest pages.
                 writtenKeysPerPage = (totalWriteNormalized - accWrittenKeys) / uint64(restPages)
                 accWrittenKeys = 0
+                pageRegionsCount = 0
                 break
             }
             pageWrittenKeys += currentWrittenKeys
             accWrittenKeys += currentWrittenKeys
+            pageRegionsCount++
         }
     }
-    // Always end with the last region.
+
+    // The last span contains the rest regions.
     spans = append(spans, tablepb.Span{
         TableID:  tableID,
         StartKey: tablepb.Key(decodeKey(regions[pageLastIdx].StartKey)),
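// ----------------------------------------------------------------------------
// Editor's note -- illustrative sketch, not part of the diff above. The paging
// rule inside splitRegionsByWrittenKeys, restated on a plain slice of
// per-region write weights so it can be read and unit-tested in isolation.
// Names are the editor's own; the real function above additionally normalizes
// every weight by regionWrittenKeyBase (so tables with no writes still split
// evenly), records keys and weights per span, and rebalances its per-page
// budget with slightly different bookkeeping.
package keyspanexample

// greedyPages returns how many consecutive regions each page receives. A page
// is closed once it holds at least one region and either (a) the remaining
// regions are needed one-per-page for the remaining pages, (b) adding the next
// region would exceed the per-page write budget, or (c) the page already holds
// spanRegionLimit regions -- the new condition introduced by this diff.
func greedyPages(writtenKeys []uint64, pages, spanRegionLimit int) []int {
    var total uint64
    for _, w := range writtenKeys {
        total += w
    }
    budget := total / uint64(pages)
    counts := make([]int, 0, pages)
    var acc uint64
    pageStart := 0
    for i := 1; i < pages; i++ {
        for idx := pageStart; idx < len(writtenKeys); idx++ {
            restPages := pages - i
            restRegions := len(writtenKeys) - idx
            if idx > pageStart && (restPages >= restRegions ||
                acc+writtenKeys[idx] > budget ||
                idx-pageStart >= spanRegionLimit) {
                counts = append(counts, idx-pageStart)
                pageStart = idx
                acc = 0
                break
            }
            acc += writtenKeys[idx]
        }
    }
    // The final page takes every region that is left.
    counts = append(counts, len(writtenKeys)-pageStart)
    return counts
}
// ----------------------------------------------------------------------------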
diff --git a/cdc/scheduler/internal/v3/keyspan/splitter_write_test.go b/cdc/scheduler/internal/v3/keyspan/splitter_write_test.go
index 791207f3c55..56590644435 100644
--- a/cdc/scheduler/internal/v3/keyspan/splitter_write_test.go
+++ b/cdc/scheduler/internal/v3/keyspan/splitter_write_test.go
@@ -59,14 +59,14 @@ func TestSplitRegionsByWrittenKeysUniform(t *testing.T) {
     regions, startKeys, endKeys := prepareRegionsInfo(
         [7]int{100, 100, 100, 100, 100, 100, 100})

-    info := splitRegionsByWrittenKeys(0, cloneRegions(regions), 0, 1)
+    info := splitRegionsByWrittenKeys(0, cloneRegions(regions), 0, 1, spanRegionLimit)
     re.Len(info.Counts, 1)
     re.EqualValues(7, info.Counts[0])
     re.Len(info.Spans, 1)
     re.EqualValues(startKeys[2], info.Spans[0].StartKey)
     re.EqualValues(endKeys[8], info.Spans[0].EndKey)

-    info = splitRegionsByWrittenKeys(0, cloneRegions(regions), 0, 2) // [2,3,4], [5,6,7,8]
+    info = splitRegionsByWrittenKeys(0, cloneRegions(regions), 0, 2, spanRegionLimit) // [2,3,4], [5,6,7,8]
     re.Len(info.Counts, 2)
     re.EqualValues(3, info.Counts[0])
     re.EqualValues(4, info.Counts[1])
@@ -79,7 +79,7 @@ func TestSplitRegionsByWrittenKeysUniform(t *testing.T) {
     re.EqualValues(startKeys[5], info.Spans[1].StartKey)
     re.EqualValues(endKeys[8], info.Spans[1].EndKey)

-    info = splitRegionsByWrittenKeys(0, cloneRegions(regions), 0, 3) // [2,3], [4,5,6], [7,8]
+    info = splitRegionsByWrittenKeys(0, cloneRegions(regions), 0, 3, spanRegionLimit) // [2,3], [4,5,6], [7,8]
     re.Len(info.Counts, 3)
     re.EqualValues(2, info.Counts[0])
     re.EqualValues(2, info.Counts[1])
@@ -98,7 +98,7 @@ func TestSplitRegionsByWrittenKeysUniform(t *testing.T) {

     // Pages > regons
     for p := 7; p <= 10; p++ {
-        info = splitRegionsByWrittenKeys(0, cloneRegions(regions), 0, p)
+        info = splitRegionsByWrittenKeys(0, cloneRegions(regions), 0, p, spanRegionLimit)
         re.Len(info.Counts, 7)
         for _, c := range info.Counts {
             re.EqualValues(1, c)
@@ -113,6 +113,11 @@ func TestSplitRegionsByWrittenKeysUniform(t *testing.T) {
             re.EqualValues(endKeys[2+i], r.EndKey)
         }
     }
+
+    // test spanRegionLimit works
+    info = splitRegionsByWrittenKeys(0, cloneRegions(regions), 0, 2, 3)
+    re.Len(info.Counts, 2)
+    re.EqualValues(3, info.Counts[0])
 }

 func TestSplitRegionsByWrittenKeysHotspot1(t *testing.T) {
@@ -123,7 +128,7 @@ func TestSplitRegionsByWrittenKeysHotspot1(t *testing.T) {
     re := require.New(t)
     regions, startKeys, endKeys := prepareRegionsInfo(
         [7]int{100, 1, 100, 1, 1, 1, 100})
-    info := splitRegionsByWrittenKeys(0, regions, 0, 4) // [2], [3,4], [5,6,7], [8]
+    info := splitRegionsByWrittenKeys(0, regions, 0, 4, spanRegionLimit) // [2], [3,4], [5,6,7], [8]
     re.Len(info.Counts, 4)
     re.EqualValues(1, info.Counts[0])
     re.EqualValues(1, info.Counts[1])
@@ -153,7 +158,7 @@ func TestSplitRegionsByWrittenKeysHotspot2(t *testing.T) {
     re := require.New(t)
     regions, startKeys, endKeys := prepareRegionsInfo(
         [7]int{1000, 1, 1, 1, 100, 1, 99})
-    info := splitRegionsByWrittenKeys(0, regions, 0, 4) // [2], [3,4,5], [6,7], [8]
+    info := splitRegionsByWrittenKeys(0, regions, 0, 4, spanRegionLimit) // [2], [3,4,5], [6,7], [8]
     re.Len(info.Spans, 4)
     re.EqualValues(startKeys[2], info.Spans[0].StartKey)
     re.EqualValues(endKeys[2], info.Spans[0].EndKey)
@@ -170,7 +175,7 @@ func TestSplitRegionsByWrittenKeysCold(t *testing.T) {
     re := require.New(t)
     regions, startKeys, endKeys := prepareRegionsInfo([7]int{})

-    info := splitRegionsByWrittenKeys(0, regions, 0, 3) // [2,3], [4,5], [6,7,8]
+    info := splitRegionsByWrittenKeys(0, regions, 0, 3, spanRegionLimit) // [2,3], [4,5], [6,7,8]
     re.Len(info.Counts, 3)
     re.EqualValues(2, info.Counts[0], info)
     re.EqualValues(2, info.Counts[1])
@@ -193,7 +198,7 @@ func TestSplitRegionsByWrittenKeysConfig(t *testing.T) {
     re := require.New(t)
     regions, startKeys, endKeys := prepareRegionsInfo([7]int{1, 1, 1, 1, 1, 1, 1})

-    info := splitRegionsByWrittenKeys(1, regions, math.MaxInt, 3) // [2,3,4,5,6,7,8]
+    info := splitRegionsByWrittenKeys(1, regions, math.MaxInt, 3, spanRegionLimit) // [2,3,4,5,6,7,8]
     re.Len(info.Counts, 1)
     re.EqualValues(7, info.Counts[0], info)
     re.Len(info.Weights, 1)
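// ----------------------------------------------------------------------------
// Editor's note -- illustrative, not part of the diff above. A quick check of
// the new spanRegionLimit assertion in TestSplitRegionsByWrittenKeysUniform:
// with 7 equal-weight regions, pages = 2 and a per-span cap of 3 regions, the
// first page can hold at most 3 regions, so the remaining 4 all land in the
// final page and info.Counts comes out as [3, 4] -- which is exactly what
// re.Len(info.Counts, 2) and re.EqualValues(3, info.Counts[0]) assert.
// Reusing the greedyPages sketch from the earlier editor's note:
//
//    counts := greedyPages([]uint64{100, 100, 100, 100, 100, 100, 100}, 2, 3)
//    // counts == []int{3, 4}
// ----------------------------------------------------------------------------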