Skip to content

Commit

Permalink
scheduler (ticdc): add region number limit for table span (#8930)
Browse files Browse the repository at this point in the history
close #8980
  • Loading branch information
asddongmen authored May 23, 2023
1 parent 899a96a commit 9cde63c
Show file tree
Hide file tree
Showing 5 changed files with 86 additions and 39 deletions.
26 changes: 13 additions & 13 deletions cdc/scheduler/internal/v3/agent/table.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@ func (t *tableSpan) getAndUpdateTableSpanState() (tablepb.TableState, bool) {
log.Debug("schedulerv3: table state changed",
zap.String("namespace", t.changefeedID.Namespace),
zap.String("changefeed", t.changefeedID.ID),
zap.Int64("tableID", t.span.TableID),
zap.Any("tableSpan", t.span),
zap.Stringer("oldState", oldState),
zap.Stringer("state", t.state))
return t.state, true
Expand Down Expand Up @@ -112,7 +112,7 @@ func (t *tableSpan) handleRemoveTableTask() *schedulepb.Message {
log.Warn("schedulerv3: remove table, but table is absent",
zap.String("namespace", t.changefeedID.Namespace),
zap.String("changefeed", t.changefeedID.ID),
zap.Int64("tableID", t.span.TableID))
zap.Any("tableSpan", t.span))
t.task = nil
return newRemoveTableResponseMessage(t.getTableSpanStatus(false))
case tablepb.TableStateStopping, // stopping now is useless
Expand Down Expand Up @@ -145,7 +145,7 @@ func (t *tableSpan) handleRemoveTableTask() *schedulepb.Message {
log.Panic("schedulerv3: unknown table state",
zap.String("namespace", t.changefeedID.Namespace),
zap.String("changefeed", t.changefeedID.ID),
zap.Int64("tableID", t.span.TableID), zap.Stringer("state", state))
zap.Any("tableSpan", t.span), zap.Stringer("state", state))
}
}
return nil
Expand All @@ -164,7 +164,7 @@ func (t *tableSpan) handleAddTableTask(
log.Warn("schedulerv3: agent add table failed",
zap.String("namespace", t.changefeedID.Namespace),
zap.String("changefeed", t.changefeedID.ID),
zap.Int64("tableID", t.span.TableID), zap.Any("task", t.task),
zap.Any("tableSpan", t.span), zap.Any("task", t.task),
zap.Error(err))
status := t.getTableSpanStatus(false)
return newAddTableResponseMessage(status), errors.Trace(err)
Expand All @@ -174,7 +174,7 @@ func (t *tableSpan) handleAddTableTask(
log.Info("schedulerv3: table is replicating",
zap.String("namespace", t.changefeedID.Namespace),
zap.String("changefeed", t.changefeedID.ID),
zap.Int64("tableID", t.span.TableID), zap.Stringer("state", state))
zap.Any("tableSpan", t.span), zap.Stringer("state", state))
t.task = nil
status := t.getTableSpanStatus(false)
return newAddTableResponseMessage(status), nil
Expand All @@ -184,7 +184,7 @@ func (t *tableSpan) handleAddTableTask(
log.Info("schedulerv3: table is prepared",
zap.String("namespace", t.changefeedID.Namespace),
zap.String("changefeed", t.changefeedID.ID),
zap.Int64("tableID", t.span.TableID), zap.Stringer("state", state))
zap.Any("tableSpan", t.span), zap.Stringer("state", state))
t.task = nil
return newAddTableResponseMessage(t.getTableSpanStatus(false)), nil
}
Expand All @@ -195,7 +195,7 @@ func (t *tableSpan) handleAddTableTask(
log.Warn("schedulerv3: agent add table failed",
zap.String("namespace", t.changefeedID.Namespace),
zap.String("changefeed", t.changefeedID.ID),
zap.Int64("tableID", t.span.TableID), zap.Stringer("state", state),
zap.Any("tableSpan", t.span), zap.Stringer("state", state),
zap.Error(err))
status := t.getTableSpanStatus(false)
return newAddTableResponseMessage(status), errors.Trace(err)
Expand All @@ -219,20 +219,20 @@ func (t *tableSpan) handleAddTableTask(
log.Info("schedulerv3: add table finished",
zap.String("namespace", t.changefeedID.Namespace),
zap.String("changefeed", t.changefeedID.ID),
zap.Int64("tableID", t.span.TableID), zap.Stringer("state", state))
zap.Any("tableSpan", t.span), zap.Stringer("state", state))
case tablepb.TableStateStopping,
tablepb.TableStateStopped:
log.Warn("schedulerv3: ignore add table",
zap.String("namespace", t.changefeedID.Namespace),
zap.String("changefeed", t.changefeedID.ID),
zap.Int64("tableID", t.span.TableID))
zap.Any("tableSpan", t.span))
t.task = nil
return newAddTableResponseMessage(t.getTableSpanStatus(false)), nil
default:
log.Panic("schedulerv3: unknown table state",
zap.String("namespace", t.changefeedID.Namespace),
zap.String("changefeed", t.changefeedID.ID),
zap.Int64("tableID", t.span.TableID))
zap.Any("tableSpan", t.span))
}
}

Expand All @@ -244,14 +244,14 @@ func (t *tableSpan) injectDispatchTableTask(task *dispatchTableTask) {
log.Panic("schedulerv3: tableID not match",
zap.String("namespace", t.changefeedID.Namespace),
zap.String("changefeed", t.changefeedID.ID),
zap.Int64("tableID", t.span.TableID),
zap.Any("tableSpan", t.span),
zap.Stringer("task.TableID", &task.Span))
}
if t.task == nil {
log.Info("schedulerv3: table found new task",
zap.String("namespace", t.changefeedID.Namespace),
zap.String("changefeed", t.changefeedID.ID),
zap.Int64("tableID", t.span.TableID),
zap.Any("tableSpan", t.span),
zap.Any("task", task))
t.task = task
return
Expand All @@ -260,7 +260,7 @@ func (t *tableSpan) injectDispatchTableTask(task *dispatchTableTask) {
"since there is one not finished yet",
zap.String("namespace", t.changefeedID.Namespace),
zap.String("changefeed", t.changefeedID.ID),
zap.Int64("tableID", t.span.TableID),
zap.Any("tableSpan", t.span),
zap.Any("nowTask", t.task),
zap.Any("ignoredTask", task))
}
Expand Down
3 changes: 3 additions & 0 deletions cdc/scheduler/internal/v3/keyspan/reconciler.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,8 @@ import (
"go.uber.org/zap"
)

const spanRegionLimit = 50000

type splitter interface {
split(
ctx context.Context, span tablepb.Span, totalCaptures int,
Expand Down Expand Up @@ -69,6 +71,7 @@ func NewReconciler(
changefeedID: changefeedID,
config: config,
splitter: []splitter{
// write splitter has the highest priority.
newWriteSplitter(changefeedID, pdapi),
newRegionCountSplitter(changefeedID, up.RegionCache),
},
Expand Down
28 changes: 19 additions & 9 deletions cdc/scheduler/internal/v3/keyspan/splitter_region_count.go
Original file line number Diff line number Diff line change
Expand Up @@ -65,11 +65,18 @@ func (m *regionCountSplitter) split(
return []tablepb.Span{span}
}

pages := totalCaptures

totalRegions := len(regions)
if totalRegions == 0 {
totalCaptures = 1
pages = 1
}

if totalRegions/spanRegionLimit > pages {
pages = totalRegions / spanRegionLimit
}
stepper := newEvenlySplitStepper(totalCaptures, totalRegions)

stepper := newEvenlySplitStepper(pages, totalRegions)
spans := make([]tablepb.Span, 0, stepper.SpanCount())
start, end := 0, stepper.Step()
for {
Expand Down Expand Up @@ -128,7 +135,8 @@ func (m *regionCountSplitter) split(
zap.Int("spans", len(spans)),
zap.Int("totalCaptures", totalCaptures),
zap.Int("regionCount", len(regions)),
zap.Int("regionThreshold", config.RegionThreshold))
zap.Int("regionThreshold", config.RegionThreshold),
zap.Int("spanRegionLimit", spanRegionLimit))
return spans
}

Expand All @@ -139,23 +147,25 @@ type evenlySplitStepper struct {
remain int
}

func newEvenlySplitStepper(totalCaptures int, totalRegion int) evenlySplitStepper {
func newEvenlySplitStepper(pages int, totalRegion int) evenlySplitStepper {
extraRegionPerSpan := 0
regionPerSpan, remain := totalRegion/totalCaptures, totalRegion%totalCaptures
regionPerSpan, remain := totalRegion/pages, totalRegion%pages
if regionPerSpan == 0 {
regionPerSpan = 1
extraRegionPerSpan = 0
totalCaptures = totalRegion
pages = totalRegion
} else if remain != 0 {
// Evenly distributes the remaining regions.
extraRegionPerSpan = int(math.Ceil(float64(remain) / float64(totalCaptures)))
extraRegionPerSpan = int(math.Ceil(float64(remain) / float64(pages)))
}
return evenlySplitStepper{
res := evenlySplitStepper{
regionPerSpan: regionPerSpan,
spanCount: totalCaptures,
spanCount: pages,
extraRegionPerSpan: extraRegionPerSpan,
remain: remain,
}
log.Info("schedulerv3: evenly split stepper", zap.Any("evenlySplitStepper", res))
return res
}

func (e *evenlySplitStepper) SpanCount() int {
Expand Down
47 changes: 38 additions & 9 deletions cdc/scheduler/internal/v3/keyspan/splitter_write.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,8 @@ import (
"go.uber.org/zap"
)

const regionWrittenKeyBase = 1

type writeSplitter struct {
changefeedID model.ChangeFeedID
pdAPIClient pdutil.PDAPIClient
Expand Down Expand Up @@ -56,15 +58,28 @@ func (m *writeSplitter) split(
zap.Error(err))
return nil
}
if totalCaptures <= 1 {
log.Warn("schedulerv3: only one capture, skip split span",

pages := totalCaptures
if len(regions)/spanRegionLimit > pages {
pages = len(regions) / spanRegionLimit
}

if pages <= 1 {
log.Warn("schedulerv3: only one capture and the regions number less than"+
" the maxSpanRegionLimit, skip split span",
zap.String("namespace", m.changefeedID.Namespace),
zap.String("changefeed", m.changefeedID.ID),
zap.String("span", span.String()),
zap.Error(err))
return []tablepb.Span{span}
}
info := splitRegionsByWrittenKeys(span.TableID, regions, config.WriteKeyThreshold, totalCaptures)

info := splitRegionsByWrittenKeys(span.TableID,
regions,
config.WriteKeyThreshold,
pages,
spanRegionLimit)

log.Info("schedulerv3: split span by written keys",
zap.String("namespace", m.changefeedID.Namespace),
zap.String("changefeed", m.changefeedID.ID),
Expand All @@ -73,7 +88,8 @@ func (m *writeSplitter) split(
zap.Ints("weights", info.Weights),
zap.Int("spans", len(info.Spans)),
zap.Int("totalCaptures", totalCaptures),
zap.Int("writeKeyThreshold", config.WriteKeyThreshold))
zap.Int("writeKeyThreshold", config.WriteKeyThreshold),
zap.Int("spanRegionLimit", spanRegionLimit))
return info.Spans
}

Expand All @@ -84,8 +100,10 @@ type splitRegionsInfo struct {
}

// splitRegionsByWrittenKeys returns a slice of regions that evenly split the range by write keys.
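// pages is the number of spans to split the table into. It is normally the number of
// captures, but the caller raises it when the table has more than spanRegionLimit regions per capture.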
func splitRegionsByWrittenKeys(
tableID model.TableID, regions []pdutil.RegionInfo, writeKeyThreshold int, pages int,
tableID model.TableID, regions []pdutil.RegionInfo,
writeKeyThreshold int, pages int, spanRegionLimit int,
) *splitRegionsInfo {
decodeKey := func(hexkey string) []byte {
key, _ := hex.DecodeString(hexkey)
Expand All @@ -96,8 +114,8 @@ func splitRegionsByWrittenKeys(
for i := range regions {
totalWrite += regions[i].WrittenKeys
// Override 0 to 1 to reflect the baseline cost of a region.
// Also it makes split evenly when there is no write.
regions[i].WrittenKeys++
// Also, it makes split evenly when there is no write.
regions[i].WrittenKeys += regionWrittenKeyBase
totalWriteNormalized += regions[i].WrittenKeys
}
if totalWrite < uint64(writeKeyThreshold) {
Expand All @@ -118,15 +136,21 @@ func splitRegionsByWrittenKeys(
spans := make([]tablepb.Span, 0, pages)
accWrittenKeys, pageWrittenKeys := uint64(0), uint64(0)
pageStartIdx, pageLastIdx := 0, 0
pageRegionsCount := 0
// split the table into pages-1 spans, each span has writtenKeysPerPage written keys.
for i := 1; i < pages; i++ {
for idx := pageStartIdx; idx < len(regions); idx++ {
restPages := pages - i
restRegions := len(regions) - idx
pageLastIdx = idx
currentWrittenKeys := regions[idx].WrittenKeys
// If the current page already holds at least one region, and either the rest regions
// can't fill the rest pages, or accWrittenKeys plus currentWrittenKeys is larger than
// writtenKeysPerPage, or the page has reached spanRegionLimit regions, then use the
// regions from pageStartIdx to idx-1 as a span and start a new page.
if (idx > pageStartIdx) &&
((restPages >= restRegions) ||
(accWrittenKeys+currentWrittenKeys > writtenKeysPerPage)) {
(accWrittenKeys+currentWrittenKeys > writtenKeysPerPage) ||
pageRegionsCount >= spanRegionLimit) {
spans = append(spans, tablepb.Span{
TableID: tableID,
StartKey: tablepb.Key(decodeKey(regions[pageStartIdx].StartKey)),
Expand All @@ -136,15 +160,20 @@ func splitRegionsByWrittenKeys(
weights = append(weights, int(pageWrittenKeys))
pageWrittenKeys = 0
pageStartIdx = idx
// update writtenKeysPerPage to make the rest regions evenly split
// to the rest pages.
writtenKeysPerPage = (totalWriteNormalized - accWrittenKeys) / uint64(restPages)
accWrittenKeys = 0
pageRegionsCount = 0
break
}
pageWrittenKeys += currentWrittenKeys
accWrittenKeys += currentWrittenKeys
pageRegionsCount++
}
}
// Always end with the last region.

// The last span contains the rest regions.
spans = append(spans, tablepb.Span{
TableID: tableID,
StartKey: tablepb.Key(decodeKey(regions[pageLastIdx].StartKey)),
Expand Down
21 changes: 13 additions & 8 deletions cdc/scheduler/internal/v3/keyspan/splitter_write_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,14 +59,14 @@ func TestSplitRegionsByWrittenKeysUniform(t *testing.T) {
regions, startKeys, endKeys := prepareRegionsInfo(
[7]int{100, 100, 100, 100, 100, 100, 100})

info := splitRegionsByWrittenKeys(0, cloneRegions(regions), 0, 1)
info := splitRegionsByWrittenKeys(0, cloneRegions(regions), 0, 1, spanRegionLimit)
re.Len(info.Counts, 1)
re.EqualValues(7, info.Counts[0])
re.Len(info.Spans, 1)
re.EqualValues(startKeys[2], info.Spans[0].StartKey)
re.EqualValues(endKeys[8], info.Spans[0].EndKey)

info = splitRegionsByWrittenKeys(0, cloneRegions(regions), 0, 2) // [2,3,4], [5,6,7,8]
info = splitRegionsByWrittenKeys(0, cloneRegions(regions), 0, 2, spanRegionLimit) // [2,3,4], [5,6,7,8]
re.Len(info.Counts, 2)
re.EqualValues(3, info.Counts[0])
re.EqualValues(4, info.Counts[1])
Expand All @@ -79,7 +79,7 @@ func TestSplitRegionsByWrittenKeysUniform(t *testing.T) {
re.EqualValues(startKeys[5], info.Spans[1].StartKey)
re.EqualValues(endKeys[8], info.Spans[1].EndKey)

info = splitRegionsByWrittenKeys(0, cloneRegions(regions), 0, 3) // [2,3], [4,5,6], [7,8]
info = splitRegionsByWrittenKeys(0, cloneRegions(regions), 0, 3, spanRegionLimit) // [2,3], [4,5,6], [7,8]
re.Len(info.Counts, 3)
re.EqualValues(2, info.Counts[0])
re.EqualValues(2, info.Counts[1])
Expand All @@ -98,7 +98,7 @@ func TestSplitRegionsByWrittenKeysUniform(t *testing.T) {

// Pages >= regions
for p := 7; p <= 10; p++ {
info = splitRegionsByWrittenKeys(0, cloneRegions(regions), 0, p)
info = splitRegionsByWrittenKeys(0, cloneRegions(regions), 0, p, spanRegionLimit)
re.Len(info.Counts, 7)
for _, c := range info.Counts {
re.EqualValues(1, c)
Expand All @@ -113,6 +113,11 @@ func TestSplitRegionsByWrittenKeysUniform(t *testing.T) {
re.EqualValues(endKeys[2+i], r.EndKey)
}
}

// test spanRegionLimit works
info = splitRegionsByWrittenKeys(0, cloneRegions(regions), 0, 2, 3)
re.Len(info.Counts, 2)
re.EqualValues(3, info.Counts[0])
}

func TestSplitRegionsByWrittenKeysHotspot1(t *testing.T) {
Expand All @@ -123,7 +128,7 @@ func TestSplitRegionsByWrittenKeysHotspot1(t *testing.T) {
regions, startKeys, endKeys := prepareRegionsInfo(
[7]int{100, 1, 100, 1, 1, 1, 100})

info := splitRegionsByWrittenKeys(0, regions, 0, 4) // [2], [3,4], [5,6,7], [8]
info := splitRegionsByWrittenKeys(0, regions, 0, 4, spanRegionLimit) // [2], [3,4], [5,6,7], [8]
re.Len(info.Counts, 4)
re.EqualValues(1, info.Counts[0])
re.EqualValues(1, info.Counts[1])
Expand Down Expand Up @@ -153,7 +158,7 @@ func TestSplitRegionsByWrittenKeysHotspot2(t *testing.T) {
regions, startKeys, endKeys := prepareRegionsInfo(
[7]int{1000, 1, 1, 1, 100, 1, 99})

info := splitRegionsByWrittenKeys(0, regions, 0, 4) // [2], [3,4,5], [6,7], [8]
info := splitRegionsByWrittenKeys(0, regions, 0, 4, spanRegionLimit) // [2], [3,4,5], [6,7], [8]
re.Len(info.Spans, 4)
re.EqualValues(startKeys[2], info.Spans[0].StartKey)
re.EqualValues(endKeys[2], info.Spans[0].EndKey)
Expand All @@ -170,7 +175,7 @@ func TestSplitRegionsByWrittenKeysCold(t *testing.T) {
re := require.New(t)

regions, startKeys, endKeys := prepareRegionsInfo([7]int{})
info := splitRegionsByWrittenKeys(0, regions, 0, 3) // [2,3], [4,5], [6,7,8]
info := splitRegionsByWrittenKeys(0, regions, 0, 3, spanRegionLimit) // [2,3], [4,5], [6,7,8]
re.Len(info.Counts, 3)
re.EqualValues(2, info.Counts[0], info)
re.EqualValues(2, info.Counts[1])
Expand All @@ -193,7 +198,7 @@ func TestSplitRegionsByWrittenKeysConfig(t *testing.T) {
re := require.New(t)

regions, startKeys, endKeys := prepareRegionsInfo([7]int{1, 1, 1, 1, 1, 1, 1})
info := splitRegionsByWrittenKeys(1, regions, math.MaxInt, 3) // [2,3,4,5,6,7,8]
info := splitRegionsByWrittenKeys(1, regions, math.MaxInt, 3, spanRegionLimit) // [2,3,4,5,6,7,8]
re.Len(info.Counts, 1)
re.EqualValues(7, info.Counts[0], info)
re.Len(info.Weights, 1)
Expand Down

0 comments on commit 9cde63c

Please sign in to comment.