diff --git a/cdc/kv/client.go b/cdc/kv/client.go index 7f1ebdc6903..4c8694300a0 100644 --- a/cdc/kv/client.go +++ b/cdc/kv/client.go @@ -878,10 +878,15 @@ func (s *eventFeedSession) dispatchRequest(ctx context.Context) error { // After this resolved ts event is sent, we don't need to send one more // resolved ts event when the region starts to work. resolvedEv := model.RegionFeedEvent{ - Resolved: []*model.ResolvedSpan{{ - Span: sri.span, + Resolved: &model.ResolvedSpans{ + Spans: []model.RegionComparableSpan{ + { + Span: sri.span, + Region: sri.verID.GetID(), + }, + }, ResolvedTs: sri.ts, - }}, + }, } select { case s.eventCh <- resolvedEv: diff --git a/cdc/kv/client_test.go b/cdc/kv/client_test.go index f58030e177d..3185a10dbad 100644 --- a/cdc/kv/client_test.go +++ b/cdc/kv/client_test.go @@ -351,7 +351,7 @@ func TestConnectOfflineTiKV(t *testing.T) { } checkEvent := func(event model.RegionFeedEvent, ts uint64) { - require.Equal(t, ts, event.Resolved[0].ResolvedTs) + require.Equal(t, ts, event.Resolved.ResolvedTs) } initialized := mockInitializedEvent(3 /* regionID */, currentRequestID()) @@ -605,7 +605,7 @@ consumePreResolvedTs: select { case event = <-eventCh: require.NotNil(t, event.Resolved) - require.Equal(t, uint64(100), event.Resolved[0].ResolvedTs) + require.Equal(t, uint64(100), event.Resolved.ResolvedTs) case <-time.After(time.Second): break consumePreResolvedTs } @@ -641,7 +641,7 @@ consumePreResolvedTs: require.FailNow(t, "reconnection not succeed in 3 seconds") } require.NotNil(t, event.Resolved) - require.Equal(t, uint64(120), event.Resolved[0].ResolvedTs) + require.Equal(t, uint64(120), event.Resolved.ResolvedTs) cancel() } @@ -1080,10 +1080,12 @@ func testHandleFeedEvent(t *testing.T) { expected := []model.RegionFeedEvent{ { - Resolved: []*model.ResolvedSpan{{ - Span: regionspan.ComparableSpan{Start: []byte("a"), End: []byte("b")}, - ResolvedTs: 100, - }}, + Resolved: &model.ResolvedSpans{ + Spans: []model.RegionComparableSpan{{ + Span: regionspan.ComparableSpan{Start: []byte("a"), End: []byte("b")}, + Region: 3, + }}, ResolvedTs: 100, + }, }, { Val: &model.RawKVEntry{ @@ -1151,25 +1153,32 @@ func testHandleFeedEvent(t *testing.T) { RegionID: 3, }, { - Resolved: []*model.ResolvedSpan{{ - Span: regionspan.ComparableSpan{Start: []byte("a"), End: []byte("b")}, - ResolvedTs: 135, - }}, + Resolved: &model.ResolvedSpans{ + Spans: []model.RegionComparableSpan{{ + Span: regionspan.ComparableSpan{Start: []byte("a"), End: []byte("b")}, + Region: 3, + }}, ResolvedTs: 135, + }, }, { - Resolved: []*model.ResolvedSpan{{ - Span: regionspan.ComparableSpan{Start: []byte("a"), End: []byte("b")}, - ResolvedTs: 145, - }}, + Resolved: &model.ResolvedSpans{ + Spans: []model.RegionComparableSpan{{ + Span: regionspan.ComparableSpan{Start: []byte("a"), End: []byte("b")}, + Region: 3, + }}, ResolvedTs: 145, + }, }, } multipleExpected := model.RegionFeedEvent{ - Resolved: make([]*model.ResolvedSpan, multiSize), - } - for i := range multipleExpected.Resolved { - multipleExpected.Resolved[i] = &model.ResolvedSpan{ - Span: regionspan.ComparableSpan{Start: []byte("a"), End: []byte("b")}, + Resolved: &model.ResolvedSpans{ + Spans: make([]model.RegionComparableSpan, multiSize), ResolvedTs: 160, + }, + } + for i := range multipleExpected.Resolved.Spans { + multipleExpected.Resolved.Spans[i] = model.RegionComparableSpan{ + Span: regionspan.ComparableSpan{Start: []byte("a"), End: []byte("b")}, + Region: 3, } } @@ -1327,7 +1336,7 @@ func TestStreamSendWithError(t *testing.T) { select { case event := <-eventCh: require.NotNil(t, event.Resolved) - require.Equal(t, 1, len(event.Resolved)) + require.Equal(t, 1, len(event.Resolved.Spans)) require.NotNil(t, 0, event.RegionID) case <-time.After(time.Second): require.Fail(t, fmt.Sprintf("expected events are not receive, received: %v", initRegions)) @@ -1431,16 +1440,20 @@ func testStreamRecvWithError(t *testing.T, failpointStr string) { expected := []model.RegionFeedEvent{ { - Resolved: []*model.ResolvedSpan{{ - Span: regionspan.ComparableSpan{Start: []byte("a"), End: []byte("b")}, - ResolvedTs: 120, - }}, + Resolved: &model.ResolvedSpans{ + Spans: []model.RegionComparableSpan{{ + Span: regionspan.ComparableSpan{Start: []byte("a"), End: []byte("b")}, + Region: regionID, + }}, ResolvedTs: 120, + }, }, { - Resolved: []*model.ResolvedSpan{{ - Span: regionspan.ComparableSpan{Start: []byte("a"), End: []byte("b")}, - ResolvedTs: 120, - }}, + Resolved: &model.ResolvedSpans{ + Spans: []model.RegionComparableSpan{{ + Span: regionspan.ComparableSpan{Start: []byte("a"), End: []byte("b")}, + Region: regionID, + }}, ResolvedTs: 120, + }, }, } @@ -1449,7 +1462,7 @@ eventLoop: for { select { case ev := <-eventCh: - if ev.Resolved[0].ResolvedTs != uint64(100) { + if ev.Resolved.ResolvedTs != uint64(100) { events = append(events, ev) } case <-time.After(time.Second): @@ -1617,7 +1630,7 @@ ReceiveLoop: break ReceiveLoop } received = append(received, event) - if event.Resolved[0].ResolvedTs == 130 { + if event.Resolved.ResolvedTs == 130 { break ReceiveLoop } case <-time.After(time.Second): @@ -1626,7 +1639,7 @@ ReceiveLoop: } var lastResolvedTs uint64 for _, e := range received { - if lastResolvedTs > e.Resolved[0].ResolvedTs { + if lastResolvedTs > e.Resolved.ResolvedTs { require.Fail(t, fmt.Sprintf("the resolvedTs is back off %#v", resolved)) } } @@ -1753,7 +1766,7 @@ func TestIncompatibleTiKV(t *testing.T) { ch1 <- initialized select { case event := <-eventCh: - require.Equal(t, 1, len(event.Resolved)) + require.Equal(t, 1, len(event.Resolved.Spans)) case <-time.After(time.Second): require.Fail(t, "expected events are not receive") } @@ -1823,7 +1836,7 @@ func TestNoPendingRegionError(t *testing.T) { ch1 <- initialized ev := <-eventCh require.NotNil(t, ev.Resolved) - require.Equal(t, uint64(100), ev.Resolved[0].ResolvedTs) + require.Equal(t, uint64(100), ev.Resolved.ResolvedTs) resolved := &cdcpb.ChangeDataEvent{Events: []*cdcpb.Event{ { @@ -1835,7 +1848,7 @@ func TestNoPendingRegionError(t *testing.T) { ch1 <- resolved ev = <-eventCh require.NotNil(t, ev.Resolved) - require.Equal(t, uint64(200), ev.Resolved[0].ResolvedTs) + require.Equal(t, uint64(200), ev.Resolved.ResolvedTs) cancel() } @@ -1910,22 +1923,28 @@ func TestDropStaleRequest(t *testing.T) { }} expected := []model.RegionFeedEvent{ { - Resolved: []*model.ResolvedSpan{{ - Span: regionspan.ComparableSpan{Start: []byte("a"), End: []byte("b")}, - ResolvedTs: 100, - }}, + Resolved: &model.ResolvedSpans{ + Spans: []model.RegionComparableSpan{{ + Span: regionspan.ComparableSpan{Start: []byte("a"), End: []byte("b")}, + Region: regionID, + }}, ResolvedTs: 100, + }, }, { - Resolved: []*model.ResolvedSpan{{ - Span: regionspan.ComparableSpan{Start: []byte("a"), End: []byte("b")}, - ResolvedTs: 120, - }}, + Resolved: &model.ResolvedSpans{ + Spans: []model.RegionComparableSpan{{ + Span: regionspan.ComparableSpan{Start: []byte("a"), End: []byte("b")}, + Region: regionID, + }}, ResolvedTs: 120, + }, }, { - Resolved: []*model.ResolvedSpan{{ - Span: regionspan.ComparableSpan{Start: []byte("a"), End: []byte("b")}, - ResolvedTs: 130, - }}, + Resolved: &model.ResolvedSpans{ + Spans: []model.RegionComparableSpan{{ + Span: regionspan.ComparableSpan{Start: []byte("a"), End: []byte("b")}, + Region: regionID, + }}, ResolvedTs: 130, + }, }, } @@ -2010,16 +2029,20 @@ func TestResolveLock(t *testing.T) { }} expected := []model.RegionFeedEvent{ { - Resolved: []*model.ResolvedSpan{{ - Span: regionspan.ComparableSpan{Start: []byte("a"), End: []byte("b")}, - ResolvedTs: 100, - }}, + Resolved: &model.ResolvedSpans{ + Spans: []model.RegionComparableSpan{{ + Span: regionspan.ComparableSpan{Start: []byte("a"), End: []byte("b")}, + Region: regionID, + }}, ResolvedTs: 100, + }, }, { - Resolved: []*model.ResolvedSpan{{ - Span: regionspan.ComparableSpan{Start: []byte("a"), End: []byte("b")}, - ResolvedTs: tso, - }}, + Resolved: &model.ResolvedSpans{ + Spans: []model.RegionComparableSpan{{ + Span: regionspan.ComparableSpan{Start: []byte("a"), End: []byte("b")}, + Region: regionID, + }}, ResolvedTs: tso, + }, }, } ch1 <- resolved @@ -2342,17 +2365,21 @@ func testEventAfterFeedStop(t *testing.T) { expected := []model.RegionFeedEvent{ { - Resolved: []*model.ResolvedSpan{{ - Span: regionspan.ComparableSpan{Start: []byte("a"), End: []byte("b")}, - ResolvedTs: 100, - }}, + Resolved: &model.ResolvedSpans{ + Spans: []model.RegionComparableSpan{{ + Span: regionspan.ComparableSpan{Start: []byte("a"), End: []byte("b")}, + Region: regionID, + }}, ResolvedTs: 100, + }, RegionID: regionID, }, { - Resolved: []*model.ResolvedSpan{{ - Span: regionspan.ComparableSpan{Start: []byte("a"), End: []byte("b")}, - ResolvedTs: 100, - }}, + Resolved: &model.ResolvedSpans{ + Spans: []model.RegionComparableSpan{{ + Span: regionspan.ComparableSpan{Start: []byte("a"), End: []byte("b")}, + Region: regionID, + }}, ResolvedTs: 100, + }, RegionID: regionID, }, { @@ -2367,10 +2394,12 @@ func testEventAfterFeedStop(t *testing.T) { RegionID: 3, }, { - Resolved: []*model.ResolvedSpan{{ - Span: regionspan.ComparableSpan{Start: []byte("a"), End: []byte("b")}, - ResolvedTs: 120, - }}, + Resolved: &model.ResolvedSpans{ + Spans: []model.RegionComparableSpan{{ + Span: regionspan.ComparableSpan{Start: []byte("a"), End: []byte("b")}, + Region: regionID, + }}, ResolvedTs: 120, + }, RegionID: regionID, }, } @@ -2542,10 +2571,12 @@ func TestOutOfRegionRangeEvent(t *testing.T) { expected := []model.RegionFeedEvent{ { - Resolved: []*model.ResolvedSpan{{ - Span: regionspan.ComparableSpan{Start: []byte("a"), End: []byte("b")}, - ResolvedTs: 100, - }}, + Resolved: &model.ResolvedSpans{ + Spans: []model.RegionComparableSpan{{ + Span: regionspan.ComparableSpan{Start: []byte("a"), End: []byte("b")}, + Region: 3, + }}, ResolvedTs: 100, + }, }, { Val: &model.RawKVEntry{ @@ -2570,10 +2601,12 @@ func TestOutOfRegionRangeEvent(t *testing.T) { RegionID: 3, }, { - Resolved: []*model.ResolvedSpan{{ - Span: regionspan.ComparableSpan{Start: []byte("a"), End: []byte("b")}, - ResolvedTs: 145, - }}, + Resolved: &model.ResolvedSpans{ + Spans: []model.RegionComparableSpan{{ + Span: regionspan.ComparableSpan{Start: []byte("a"), End: []byte("b")}, + Region: 3, + }}, ResolvedTs: 145, + }, }, } @@ -3025,17 +3058,19 @@ func testKVClientForceReconnect(t *testing.T) { ch2 <- resolved expected := model.RegionFeedEvent{ - Resolved: []*model.ResolvedSpan{{ - Span: regionspan.ComparableSpan{Start: []byte("a"), End: []byte("c")}, - ResolvedTs: 135, - }}, + Resolved: &model.ResolvedSpans{ + Spans: []model.RegionComparableSpan{{ + Span: regionspan.ComparableSpan{Start: []byte("a"), End: []byte("c")}, + Region: regionID3, + }}, ResolvedTs: 135, + }, } eventLoop: for { select { case ev := <-eventCh: - if ev.Resolved != nil && ev.Resolved[0].ResolvedTs == uint64(100) { + if ev.Resolved != nil && ev.Resolved.ResolvedTs == uint64(100) { continue } require.Equal(t, expected, ev) @@ -3263,10 +3298,12 @@ func TestEvTimeUpdate(t *testing.T) { expected := []model.RegionFeedEvent{ { - Resolved: []*model.ResolvedSpan{{ - Span: regionspan.ComparableSpan{Start: []byte("a"), End: []byte("b")}, - ResolvedTs: 100, - }}, + Resolved: &model.ResolvedSpans{ + Spans: []model.RegionComparableSpan{{ + Span: regionspan.ComparableSpan{Start: []byte("a"), End: []byte("b")}, + Region: 3, + }}, ResolvedTs: 100, + }, }, { Val: &model.RawKVEntry{ diff --git a/cdc/kv/region_worker.go b/cdc/kv/region_worker.go index f70e43ec282..73db668b318 100644 --- a/cdc/kv/region_worker.go +++ b/cdc/kv/region_worker.go @@ -785,7 +785,7 @@ func (w *regionWorker) handleResolvedTs( revents *resolvedTsEvent, ) error { resolvedTs := revents.resolvedTs - resolvedSpans := make([]*model.ResolvedSpan, 0, len(revents.regions)) + resolvedSpans := make([]model.RegionComparableSpan, 0, len(revents.regions)) regions := make([]uint64, 0, len(revents.regions)) for _, state := range revents.regions { @@ -805,9 +805,9 @@ func (w *regionWorker) handleResolvedTs( continue } // emit a checkpointTs - resolvedSpans = append(resolvedSpans, &model.ResolvedSpan{ - Span: state.sri.span, - ResolvedTs: resolvedTs, + resolvedSpans = append(resolvedSpans, model.RegionComparableSpan{ + Span: state.sri.span, + Region: regionID, }) } if len(resolvedSpans) == 0 { @@ -828,7 +828,7 @@ func (w *regionWorker) handleResolvedTs( state.lock.Unlock() } // emit a checkpointTs - revent := model.RegionFeedEvent{Resolved: resolvedSpans} + revent := model.RegionFeedEvent{Resolved: &model.ResolvedSpans{ResolvedTs: resolvedTs, Spans: resolvedSpans}} select { case w.outputCh <- revent: w.metrics.metricSendEventResolvedCounter.Add(float64(len(resolvedSpans))) diff --git a/cdc/model/kv.go b/cdc/model/kv.go index 00919cf6fb9..566cad317f9 100644 --- a/cdc/model/kv.go +++ b/cdc/model/kv.go @@ -38,7 +38,7 @@ const ( //msgp:ignore RegionFeedEvent type RegionFeedEvent struct { Val *RawKVEntry - Resolved []*ResolvedSpan + Resolved *ResolvedSpans // Additional debug info, not used RegionID uint64 @@ -55,18 +55,26 @@ func (e *RegionFeedEvent) GetValue() interface{} { } } -// ResolvedSpan guarantees all the KV value event +// ResolvedSpans guarantees all the KV value event // with commit ts less than ResolvedTs has been emitted. // -//msgp:ignore ResolvedSpan -type ResolvedSpan struct { - Span regionspan.ComparableSpan +//msgp:ignore ResolvedSpans +type ResolvedSpans struct { + Spans []RegionComparableSpan ResolvedTs uint64 } // String implements fmt.Stringer interface. -func (rs *ResolvedSpan) String() string { - return fmt.Sprintf("span: %s, resolved-ts: %d", rs.Span, rs.ResolvedTs) +func (rs *ResolvedSpans) String() string { + return fmt.Sprintf("span: %v, resolved-ts: %d", rs.Spans, rs.ResolvedTs) +} + +// RegionComparableSpan contains a comparable span and a region id of that span +// +//msgp:ignore RegionComparableSpan +type RegionComparableSpan struct { + Span regionspan.ComparableSpan + Region uint64 } // RawKVEntry notify the KV operator diff --git a/cdc/model/kv_test.go b/cdc/model/kv_test.go index bef23dd13e5..567247f6ecc 100644 --- a/cdc/model/kv_test.go +++ b/cdc/model/kv_test.go @@ -27,9 +27,10 @@ func TestRegionFeedEvent(t *testing.T) { CRTs: 1, OpType: OpTypePut, } - resolved := &ResolvedSpan{ - Span: regionspan.ComparableSpan{Start: []byte("a"), End: []byte("b")}, - ResolvedTs: 111, + resolved := &ResolvedSpans{ + Spans: []RegionComparableSpan{{ + Span: regionspan.ComparableSpan{Start: []byte("a"), End: []byte("b")}, + }}, ResolvedTs: 111, } ev := &RegionFeedEvent{} @@ -38,10 +39,10 @@ func TestRegionFeedEvent(t *testing.T) { ev = &RegionFeedEvent{Val: raw} require.Equal(t, raw, ev.GetValue()) - ev = &RegionFeedEvent{Resolved: []*ResolvedSpan{resolved}} - require.Equal(t, resolved, ev.GetValue().([]*ResolvedSpan)[0]) + ev = &RegionFeedEvent{Resolved: resolved} + require.Equal(t, resolved, ev.GetValue().(*ResolvedSpans)) - require.Equal(t, "span: [61, 62), resolved-ts: 111", resolved.String()) + require.Equal(t, "span: [{[61, 62) 0}], resolved-ts: 111", resolved.String()) } func TestRawKVEntry(t *testing.T) { diff --git a/cdc/puller/frontier/frontier.go b/cdc/puller/frontier/frontier.go index 08d51266397..33ef73d9444 100644 --- a/cdc/puller/frontier/frontier.go +++ b/cdc/puller/frontier/frontier.go @@ -20,11 +20,16 @@ import ( "strings" "github.com/pingcap/tiflow/pkg/regionspan" + "github.com/prometheus/client_golang/prometheus" ) +// fakeRegionID when the frontier is initializing, there is no region ID +// use fakeRegionID ,so this span will be cached +const fakeRegionID = 0 + // Frontier checks resolved event of spans and moves the global resolved ts ahead type Frontier interface { - Forward(span regionspan.ComparableSpan, ts uint64) + Forward(regionID uint64, span regionspan.ComparableSpan, ts uint64) Frontier() uint64 String() string } @@ -33,15 +38,27 @@ type Frontier interface { type spanFrontier struct { spanList skipList minTsHeap fibonacciHeap + + seekTempResult []*skipListNode + + cachedRegions map[uint64]*skipListNode + metricResolvedRegionMissedCounter prometheus.Counter } // NewFrontier creates Frontier from the given spans. // spanFrontier don't support use Nil as the maximum key of End range // So we use set it as util.UpperBoundKey, the means the real use case *should not* have an // End key bigger than util.UpperBoundKey -func NewFrontier(checkpointTs uint64, spans ...regionspan.ComparableSpan) Frontier { +func NewFrontier(checkpointTs uint64, + metricResolvedRegionMissedCounter prometheus.Counter, + spans ...regionspan.ComparableSpan, +) Frontier { s := &spanFrontier{ - spanList: *newSpanList(), + spanList: *newSpanList(), + seekTempResult: make(seekResult, maxHeight), + cachedRegions: map[uint64]*skipListNode{}, + + metricResolvedRegionMissedCounter: metricResolvedRegionMissedCounter, } firstSpan := true for _, span := range spans { @@ -51,7 +68,7 @@ func NewFrontier(checkpointTs uint64, spans ...regionspan.ComparableSpan) Fronti firstSpan = false continue } - s.insert(span, checkpointTs) + s.insert(0, span, checkpointTs) } return s @@ -63,33 +80,50 @@ func (s *spanFrontier) Frontier() uint64 { } // Forward advances the timestamp for a span. -func (s *spanFrontier) Forward(span regionspan.ComparableSpan, ts uint64) { - s.insert(span, ts) +func (s *spanFrontier) Forward(regionID uint64, span regionspan.ComparableSpan, ts uint64) { + // it's the fast part to detect if the region is split or merged, + // if not we can update the minTsHeap with use new ts directly + if n, ok := s.cachedRegions[regionID]; ok && n.regionID != fakeRegionID && n.end != nil { + if bytes.Equal(n.Key(), span.Start) && bytes.Equal(n.End(), span.End) { + s.minTsHeap.UpdateKey(n.Value(), ts) + return + } + } + s.metricResolvedRegionMissedCounter.Inc() + s.insert(regionID, span, ts) } -func (s *spanFrontier) insert(span regionspan.ComparableSpan, ts uint64) { - seekRes := s.spanList.Seek(span.Start) - +func (s *spanFrontier) insert(regionID uint64, span regionspan.ComparableSpan, ts uint64) { + // clear the seek result + for i := 0; i < len(s.seekTempResult); i++ { + s.seekTempResult[i] = nil + } + seekRes := s.spanList.Seek(span.Start, s.seekTempResult) // if there is no change in the region span // We just need to update the ts corresponding to the span in list next := seekRes.Node().Next() if next != nil { - cmpStart := bytes.Compare(seekRes.Node().Key(), span.Start) - cmpEnd := bytes.Compare(next.Key(), span.End) - if cmpStart == 0 && cmpEnd == 0 { + if bytes.Equal(seekRes.Node().Key(), span.Start) && bytes.Equal(next.Key(), span.End) { s.minTsHeap.UpdateKey(seekRes.Node().Value(), ts) + if regionID != fakeRegionID { + s.cachedRegions[regionID] = seekRes.Node() + s.cachedRegions[regionID].regionID = regionID + s.cachedRegions[regionID].end = next.key + } return } } // regions are merged or split, overwrite span into list node := seekRes.Node() + delete(s.cachedRegions, node.regionID) lastNodeTs := uint64(math.MaxUint64) shouldInsertStartNode := true if node.Value() != nil { lastNodeTs = node.Value().key } for ; node != nil; node = node.Next() { + delete(s.cachedRegions, node.regionID) cmpStart := bytes.Compare(node.Key(), span.Start) if cmpStart < 0 { continue diff --git a/cdc/puller/frontier/frontier_bench_test.go b/cdc/puller/frontier/frontier_bench_test.go index 158122129b1..4aa69965d02 100644 --- a/cdc/puller/frontier/frontier_bench_test.go +++ b/cdc/puller/frontier/frontier_bench_test.go @@ -49,12 +49,12 @@ func BenchmarkSpanFrontier(b *testing.B) { spans = append(spans, span) } - f := NewFrontier(0, spans...) + f := NewFrontier(0, c, spans...) b.ResetTimer() for i := 0; i < b.N; i++ { - f.Forward(spans[i%n], uint64(i)) + f.Forward(0, spans[i%n], uint64(i)) } }) } @@ -91,12 +91,12 @@ func BenchmarkSpanFrontierOverlap(b *testing.B) { }) } - f := NewFrontier(0, spans...) + f := NewFrontier(0, c, spans...) b.ResetTimer() for i := 0; i < b.N; i++ { - f.Forward(forward[i%n], uint64(i)) + f.Forward(0, forward[i%n], uint64(i)) } }) } diff --git a/cdc/puller/frontier/frontier_test.go b/cdc/puller/frontier/frontier_test.go index 6d753db2b67..70f8903915b 100644 --- a/cdc/puller/frontier/frontier_test.go +++ b/cdc/puller/frontier/frontier_test.go @@ -20,6 +20,7 @@ import ( "testing" "github.com/pingcap/tiflow/pkg/regionspan" + "github.com/prometheus/client_golang/prometheus" "github.com/stretchr/testify/require" ) @@ -37,14 +38,14 @@ func TestSpanFrontier(t *testing.T) { spBD := regionspan.ComparableSpan{Start: keyB, End: keyD} spCD := regionspan.ComparableSpan{Start: keyC, End: keyD} - f := NewFrontier(5, spAD).(*spanFrontier) + f := NewFrontier(5, c, spAD).(*spanFrontier) require.Equal(t, uint64(5), f.Frontier()) require.Equal(t, `[a @ 5] [d @ Max] `, f.String()) checkFrontier(t, f) f.Forward( - regionspan.ComparableSpan{Start: []byte("d"), End: []byte("e")}, + 0, regionspan.ComparableSpan{Start: []byte("d"), End: []byte("e")}, 100, ) require.Equal(t, uint64(5), f.Frontier()) @@ -52,7 +53,7 @@ func TestSpanFrontier(t *testing.T) { checkFrontier(t, f) f.Forward( - regionspan.ComparableSpan{Start: []byte("g"), End: []byte("h")}, + 0, regionspan.ComparableSpan{Start: []byte("g"), End: []byte("h")}, 200, ) require.Equal(t, uint64(5), f.Frontier()) @@ -61,7 +62,7 @@ func TestSpanFrontier(t *testing.T) { // Forward the tracked span space. f.Forward( - regionspan.ComparableSpan{Start: []byte("a"), End: []byte("d")}, + 0, regionspan.ComparableSpan{Start: []byte("a"), End: []byte("d")}, 1, ) require.Equal(t, uint64(1), f.Frontier()) @@ -70,7 +71,7 @@ func TestSpanFrontier(t *testing.T) { // // Forward it again f.Forward( - regionspan.ComparableSpan{Start: []byte("a"), End: []byte("d")}, + 0, regionspan.ComparableSpan{Start: []byte("a"), End: []byte("d")}, 2, ) require.Equal(t, uint64(2), f.Frontier()) @@ -79,7 +80,7 @@ func TestSpanFrontier(t *testing.T) { // // Forward to smaller ts f.Forward( - regionspan.ComparableSpan{Start: []byte("a"), End: []byte("d")}, + 0, regionspan.ComparableSpan{Start: []byte("a"), End: []byte("d")}, 1, ) require.Equal(t, uint64(1), f.Frontier()) @@ -87,70 +88,70 @@ func TestSpanFrontier(t *testing.T) { checkFrontier(t, f) // // Forward b-c - f.Forward(spBC, 3) + f.Forward(0, spBC, 3) require.Equal(t, uint64(1), f.Frontier()) require.Equal(t, `[a @ 1] [b @ 3] [c @ 1] [d @ 100] [e @ Max] [g @ 200] [h @ Max] `, f.String()) checkFrontier(t, f) // Forward b-c more to be 4 - f.Forward(spBC, 4) + f.Forward(0, spBC, 4) require.Equal(t, uint64(1), f.Frontier()) require.Equal(t, `[a @ 1] [b @ 4] [c @ 1] [d @ 100] [e @ Max] [g @ 200] [h @ Max] `, f.String()) checkFrontier(t, f) // Forward all to at least 3 - f.Forward(spAD, 3) + f.Forward(0, spAD, 3) require.Equal(t, uint64(3), f.Frontier()) require.Equal(t, `[a @ 3] [d @ 100] [e @ Max] [g @ 200] [h @ Max] `, f.String()) checkFrontier(t, f) // Forward AB and CD to be 5, keep BC at 4 - f.Forward(spAB, 5) + f.Forward(0, spAB, 5) require.Equal(t, uint64(3), f.Frontier()) require.Equal(t, `[a @ 5] [b @ 3] [d @ 100] [e @ Max] [g @ 200] [h @ Max] `, f.String()) checkFrontier(t, f) - f.Forward(spCD, 5) + f.Forward(0, spCD, 5) require.Equal(t, uint64(3), f.Frontier()) require.Equal(t, `[a @ 5] [b @ 3] [c @ 5] [d @ 100] [e @ Max] [g @ 200] [h @ Max] `, f.String()) checkFrontier(t, f) // Catch BC to be 5 too - f.Forward(spBC, 5) + f.Forward(0, spBC, 5) require.Equal(t, uint64(5), f.Frontier()) require.Equal(t, `[a @ 5] [b @ 5] [c @ 5] [d @ 100] [e @ Max] [g @ 200] [h @ Max] `, f.String()) checkFrontier(t, f) // Forward all to be 6 - f.Forward(spAD, 6) + f.Forward(0, spAD, 6) require.Equal(t, uint64(6), f.Frontier()) require.Equal(t, `[a @ 6] [d @ 100] [e @ Max] [g @ 200] [h @ Max] `, f.String()) checkFrontier(t, f) // Forward ac to 7 - f.Forward(spAC, 7) + f.Forward(0, spAC, 7) require.Equal(t, uint64(6), f.Frontier()) require.Equal(t, `[a @ 7] [c @ 6] [d @ 100] [e @ Max] [g @ 200] [h @ Max] `, f.String()) checkFrontier(t, f) // Forward bd to 8 - f.Forward(spBD, 8) + f.Forward(0, spBD, 8) require.Equal(t, uint64(7), f.Frontier()) require.Equal(t, `[a @ 7] [b @ 8] [d @ 100] [e @ Max] [g @ 200] [h @ Max] `, f.String()) checkFrontier(t, f) // Forward ab to 8 - f.Forward(spAB, 8) + f.Forward(0, spAB, 8) require.Equal(t, uint64(8), f.Frontier()) require.Equal(t, `[a @ 8] [b @ 8] [d @ 100] [e @ Max] [g @ 200] [h @ Max] `, f.String()) checkFrontier(t, f) - f.Forward(regionspan.ComparableSpan{Start: []byte("1"), End: []byte("g")}, 9) + f.Forward(0, regionspan.ComparableSpan{Start: []byte("1"), End: []byte("g")}, 9) require.Equal(t, uint64(9), f.Frontier()) require.Equal(t, `[1 @ 9] [g @ 200] [h @ Max] `, f.String()) checkFrontier(t, f) - f.Forward(regionspan.ComparableSpan{Start: []byte("g"), End: []byte("i")}, 10) + f.Forward(0, regionspan.ComparableSpan{Start: []byte("g"), End: []byte("i")}, 10) require.Equal(t, uint64(9), f.Frontier()) require.Equal(t, `[1 @ 9] [g @ 10] [i @ Max] `, f.String()) checkFrontier(t, f) @@ -169,10 +170,10 @@ func TestSpanFrontierFallback(t *testing.T) { spCD := regionspan.ComparableSpan{Start: keyC, End: keyD} spDE := regionspan.ComparableSpan{Start: keyD, End: keyE} - f := NewFrontier(20, spAB).(*spanFrontier) - f.Forward(spBC, 20) - f.Forward(spCD, 10) - f.Forward(spDE, 20) + f := NewFrontier(20, c, spAB).(*spanFrontier) + f.Forward(0, spBC, 20) + f.Forward(0, spCD, 10) + f.Forward(0, spDE, 20) // [A, B) [B, C) [C, D) [D, E) // 20 20 10 20 @@ -187,7 +188,7 @@ func TestSpanFrontierFallback(t *testing.T) { // [A, B) [B, C) [C, D) [D, E) // 20 10 10 20 // [B, C) does not forward, because of merge into [A, C) immediately - f.Forward(spCD, 20) + f.Forward(0, spCD, 20) require.Equal(t, uint64(20), f.Frontier()) // the frontier stoes [A, B) and [B, C) but they are not correct exactly require.Equal(t, `[a @ 20] [b @ 20] [c @ 20] [d @ 20] [e @ Max] `, f.String()) @@ -211,27 +212,27 @@ func TestMinMax(t *testing.T) { spMinMax := regionspan.ComparableSpan{Start: keyMin, End: keyMax} spMinMax = spMinMax.Hack() - f := NewFrontier(0, spMinMax) + f := NewFrontier(0, c, spMinMax) require.Equal(t, uint64(0), f.Frontier()) require.Equal(t, "[ @ 0] [\xff\xff\xff\xff\xff @ Max] ", f.String()) checkFrontier(t, f) - f.Forward(spMinMax, 1) + f.Forward(0, spMinMax, 1) require.Equal(t, uint64(1), f.Frontier()) require.Equal(t, "[ @ 1] [\xff\xff\xff\xff\xff @ Max] ", f.String()) checkFrontier(t, f) - f.Forward(spMinMid, 2) + f.Forward(0, spMinMid, 2) require.Equal(t, uint64(1), f.Frontier()) require.Equal(t, "[ @ 2] [m @ 1] [\xff\xff\xff\xff\xff @ Max] ", f.String()) checkFrontier(t, f) - f.Forward(spMidMax, 2) + f.Forward(0, spMidMax, 2) require.Equal(t, uint64(2), f.Frontier()) require.Equal(t, "[ @ 2] [m @ 2] [\xff\xff\xff\xff\xff @ Max] ", f.String()) checkFrontier(t, f) - f.Forward(spMinMax, 3) + f.Forward(0, spMinMax, 3) require.Equal(t, uint64(3), f.Frontier()) require.Equal(t, "[ @ 3] [\xff\xff\xff\xff\xff @ Max] ", f.String()) checkFrontier(t, f) @@ -256,58 +257,60 @@ func TestSpanFrontierDisjoinSpans(t *testing.T) { sp12 := regionspan.ComparableSpan{Start: key1, End: key2} sp1F := regionspan.ComparableSpan{Start: key1, End: keyF} - f := NewFrontier(0, spAB, spCE) + f := NewFrontier(0, c, spAB, spCE) require.Equal(t, uint64(0), f.Frontier()) require.Equal(t, `[a @ 0] [b @ Max] [c @ 0] [e @ Max] `, f.String()) checkFrontier(t, f) // Advance the tracked spans - f.Forward(spAB, 1) + f.Forward(0, spAB, 1) require.Equal(t, uint64(0), f.Frontier()) require.Equal(t, `[a @ 1] [b @ Max] [c @ 0] [e @ Max] `, f.String()) checkFrontier(t, f) - f.Forward(spCE, 1) + f.Forward(0, spCE, 1) require.Equal(t, uint64(1), f.Frontier()) require.Equal(t, `[a @ 1] [b @ Max] [c @ 1] [e @ Max] `, f.String()) checkFrontier(t, f) // Advance d-e split c-e to c-d and d-e - f.Forward(spDE, 2) + f.Forward(0, spDE, 2) require.Equal(t, uint64(1), f.Frontier()) require.Equal(t, `[a @ 1] [b @ Max] [c @ 1] [d @ 2] [e @ Max] `, f.String()) checkFrontier(t, f) // Advance a-d cover a-b and c-d - f.Forward(spAD, 3) + f.Forward(0, spAD, 3) require.Equal(t, uint64(2), f.Frontier()) require.Equal(t, `[a @ 3] [d @ 2] [e @ Max] `, f.String()) checkFrontier(t, f) // Advance one cover all 3 span - f.Forward(spAE, 4) + f.Forward(0, spAE, 4) require.Equal(t, uint64(4), f.Frontier()) require.Equal(t, `[a @ 4] [e @ Max] `, f.String()) checkFrontier(t, f) // Advance all with a larger span - f.Forward(sp1F, 5) + f.Forward(0, sp1F, 5) require.Equal(t, uint64(5), f.Frontier()) require.Equal(t, `[1 @ 5] [f @ Max] `, f.String()) checkFrontier(t, f) // Advance span smaller than all tracked spans - f.Forward(sp12, 6) + f.Forward(0, sp12, 6) require.Equal(t, uint64(5), f.Frontier()) require.Equal(t, `[1 @ 6] [2 @ 5] [f @ Max] `, f.String()) checkFrontier(t, f) } +var c = prometheus.NewCounterVec(prometheus.CounterOpts{}, []string{"type"}).WithLabelValues("a") + func TestSpanFrontierRandomly(t *testing.T) { t.Parallel() var keyMin []byte var keyMax []byte spMinMax := regionspan.ComparableSpan{Start: keyMin, End: keyMax} - f := NewFrontier(0, spMinMax) + f := NewFrontier(0, c, spMinMax) var spans []regionspan.ComparableSpan for len(spans) < 500000 { @@ -328,7 +331,7 @@ func TestSpanFrontierRandomly(t *testing.T) { ts := rand.Uint64() - f.Forward(span, ts) + f.Forward(0, span, ts) checkFrontier(t, f) } } @@ -350,3 +353,43 @@ func checkFrontier(t *testing.T, f Frontier) { require.Equal(t, tsInHeap, tsInList) require.Equal(t, tsInList[0], f.Frontier()) } + +func TestMinMaxWithRegionSplitMerge(t *testing.T) { + t.Parallel() + + ab := regionspan.ComparableSpan{Start: []byte("a"), End: []byte("b")} + bc := regionspan.ComparableSpan{Start: []byte("b"), End: []byte("c")} + cd := regionspan.ComparableSpan{Start: []byte("c"), End: []byte("d")} + de := regionspan.ComparableSpan{Start: []byte("d"), End: []byte("e")} + ef := regionspan.ComparableSpan{Start: []byte("e"), End: []byte("f")} + af := regionspan.ComparableSpan{Start: []byte("a"), End: []byte("f")} + + f := NewFrontier(0, c, af) + require.Equal(t, uint64(0), f.Frontier()) + f.Forward(1, ab, 1) + require.Equal(t, uint64(0), f.Frontier()) + f.Forward(2, bc, 1) + require.Equal(t, uint64(0), f.Frontier()) + f.Forward(3, cd, 1) + require.Equal(t, uint64(0), f.Frontier()) + f.Forward(4, de, 1) + require.Equal(t, uint64(0), f.Frontier()) + f.Forward(5, ef, 1) + require.Equal(t, uint64(1), f.Frontier()) + f.Forward(6, regionspan.ComparableSpan{Start: []byte("a"), End: []byte("d")}, 6) + require.Equal(t, uint64(1), f.Frontier()) + f.Forward(7, regionspan.ComparableSpan{Start: []byte("d"), End: []byte("f")}, 2) + require.Equal(t, uint64(2), f.Frontier()) + f.Forward(7, regionspan.ComparableSpan{Start: []byte("d"), End: []byte("f")}, 3) + require.Equal(t, uint64(3), f.Frontier()) + f.Forward(7, regionspan.ComparableSpan{Start: []byte("d"), End: []byte("f")}, 4) + require.Equal(t, uint64(4), f.Frontier()) + f.Forward(8, regionspan.ComparableSpan{Start: []byte("d"), End: []byte("e")}, 4) + require.Equal(t, uint64(4), f.Frontier()) + f.Forward(9, regionspan.ComparableSpan{Start: []byte("e"), End: []byte("f")}, 4) + require.Equal(t, uint64(4), f.Frontier()) + f.Forward(9, regionspan.ComparableSpan{Start: []byte("e"), End: []byte("f")}, 7) + require.Equal(t, uint64(4), f.Frontier()) + f.Forward(8, regionspan.ComparableSpan{Start: []byte("d"), End: []byte("e")}, 5) + require.Equal(t, uint64(5), f.Frontier()) +} diff --git a/cdc/puller/frontier/list.go b/cdc/puller/frontier/list.go index 812c5778e1e..ce8c0713f99 100644 --- a/cdc/puller/frontier/list.go +++ b/cdc/puller/frontier/list.go @@ -31,6 +31,9 @@ type skipListNode struct { key []byte value *fibonacciHeapNode + end []byte + regionID uint64 + nexts []*skipListNode } @@ -39,6 +42,10 @@ func (s *skipListNode) Key() []byte { return s.key } +func (s *skipListNode) End() []byte { + return s.end +} + // Value is the value of the node func (s *skipListNode) Value() *fibonacciHeapNode { return s.value @@ -51,7 +58,7 @@ func (s *skipListNode) Next() *skipListNode { type seekResult []*skipListNode -// Next points to the next seek result +// Next points to the next seek seekTempResult func (s seekResult) Next() { next := s.Node().Next() for i := range next.nexts { @@ -59,7 +66,7 @@ func (s seekResult) Next() { } } -// Node returns the node point by the seek result +// Node returns the node point by the seek seekTempResult func (s seekResult) Node() *skipListNode { if len(s) == 0 { return nil @@ -89,13 +96,12 @@ func (l *skipList) randomHeight() int { //go:linkname fastrand runtime.fastrand func fastrand() uint32 -// Seek returns the seek result -// the seek result is a slice of nodes, +// Seek returns the seek seekTempResult +// the seek seekTempResult is a slice of nodes, // Each element in the slice represents the nearest(left) node to the target value at each level of the skip list. -func (l *skipList) Seek(key []byte) seekResult { +func (l *skipList) Seek(key []byte, result []*skipListNode) seekResult { head := &l.head current := head - result := make(seekResult, maxHeight) LevelLoop: for level := l.height - 1; level >= 0; level-- { @@ -123,7 +129,7 @@ LevelLoop: return result } -// InsertNextToNode insert the specified node after the seek result +// InsertNextToNode insert the specified node after the seek seekTempResult func (l *skipList) InsertNextToNode(seekR seekResult, key []byte, value *fibonacciHeapNode) { if seekR.Node() != nil && !nextTo(seekR.Node(), key) { log.Panic("the InsertNextToNode function can only append node to the seek result.") @@ -151,11 +157,11 @@ func (l *skipList) InsertNextToNode(seekR seekResult, key []byte, value *fibonac // Insert inserts the specified node func (l *skipList) Insert(key []byte, value *fibonacciHeapNode) { - seekR := l.Seek(key) + seekR := l.Seek(key, make(seekResult, maxHeight)) l.InsertNextToNode(seekR, key, value) } -// Remove removes the specified node after the seek result +// Remove removes the specified node after the seek seekTempResult func (l *skipList) Remove(seekR seekResult, toRemove *skipListNode) { seekCurrent := seekR.Node() if seekCurrent == nil || seekCurrent.Next() != toRemove { diff --git a/cdc/puller/frontier/list_test.go b/cdc/puller/frontier/list_test.go index e575ae2a725..39dfb996c5e 100644 --- a/cdc/puller/frontier/list_test.go +++ b/cdc/puller/frontier/list_test.go @@ -40,7 +40,7 @@ func TestInsertAndRemove(t *testing.T) { // check all the keys are exist in list for _, k := range keys { - a := list.Seek(k).Node().Key() + a := list.Seek(k, make(seekResult, maxHeight)).Node().Key() cmp := bytes.Compare(a, k) require.Equal(t, 0, cmp) } @@ -48,7 +48,7 @@ func TestInsertAndRemove(t *testing.T) { for i := 0; i < 10000; i++ { indexToRemove := rand.Intn(10000) - seekRes := list.Seek(keys[indexToRemove]) + seekRes := list.Seek(keys[indexToRemove], make(seekResult, maxHeight)) if seekRes.Node().Next() == nil { break } @@ -56,7 +56,7 @@ func TestInsertAndRemove(t *testing.T) { list.Remove(seekRes, seekRes.Node().Next()) // check the node is already removed - a := list.Seek(removedKey).Node().Key() + a := list.Seek(removedKey, make(seekResult, maxHeight)).Node().Key() cmp := bytes.Compare(a, removedKey) require.LessOrEqual(t, cmp, 0) } @@ -105,53 +105,53 @@ func TestSeek(t *testing.T) { list := newSpanList() - require.Nil(t, list.Seek(keyA).Node()) + require.Nil(t, list.Seek(keyA, make(seekResult, maxHeight)).Node()) // insert keyA to keyH insertIntoList(list, keyC, keyF, keyE, keyH, keyG, keyD, keyA, keyB) // Point to the first node, if seek key is smaller than the first key in list. - require.Nil(t, list.Seek(key1).Node().Key()) + require.Nil(t, list.Seek(key1, make(seekResult, maxHeight)).Node().Key()) // Point to the last node with key smaller than seek key. - require.Equal(t, keyH, list.Seek(keyH).Node().key) + require.Equal(t, keyH, list.Seek(keyH, make(seekResult, maxHeight)).Node().key) // Point to itself. - require.Equal(t, keyG, list.Seek(keyG).Node().key) + require.Equal(t, keyG, list.Seek(keyG, make(seekResult, maxHeight)).Node().key) // Ensure there is no problem to seek a larger key. - require.Equal(t, keyH, list.Seek(keyZ).Node().key) - - require.Equal(t, keyA, list.Seek([]byte("b0")).Node().key) - require.Equal(t, keyB, list.Seek([]byte("c0")).Node().key) - require.Equal(t, keyC, list.Seek([]byte("d0")).Node().key) - require.Equal(t, keyD, list.Seek([]byte("e0")).Node().key) - require.Equal(t, keyE, list.Seek([]byte("f0")).Node().key) - require.Equal(t, keyF, list.Seek([]byte("g0")).Node().key) - require.Equal(t, keyG, list.Seek([]byte("h0")).Node().key) - require.Equal(t, keyH, list.Seek([]byte("i0")).Node().key) + require.Equal(t, keyH, list.Seek(keyZ, make(seekResult, maxHeight)).Node().key) + + require.Equal(t, keyA, list.Seek([]byte("b0"), make(seekResult, maxHeight)).Node().key) + require.Equal(t, keyB, list.Seek([]byte("c0"), make(seekResult, maxHeight)).Node().key) + require.Equal(t, keyC, list.Seek([]byte("d0"), make(seekResult, maxHeight)).Node().key) + require.Equal(t, keyD, list.Seek([]byte("e0"), make(seekResult, maxHeight)).Node().key) + require.Equal(t, keyE, list.Seek([]byte("f0"), make(seekResult, maxHeight)).Node().key) + require.Equal(t, keyF, list.Seek([]byte("g0"), make(seekResult, maxHeight)).Node().key) + require.Equal(t, keyG, list.Seek([]byte("h0"), make(seekResult, maxHeight)).Node().key) + require.Equal(t, keyH, list.Seek([]byte("i0"), make(seekResult, maxHeight)).Node().key) require.Equal(t, "[a5] [b5] [c5] [d5] [e5] [f5] [g5] [h5] ", list.String()) checkList(t, list) // remove c5 - seekRes := list.Seek([]byte("c0")) + seekRes := list.Seek([]byte("c0"), make(seekResult, maxHeight)) list.Remove(seekRes, seekRes.Node().Next()) - require.Equal(t, keyB, list.Seek([]byte("c0")).Node().key) - require.Equal(t, keyB, list.Seek([]byte("d0")).Node().key) - require.Equal(t, keyD, list.Seek([]byte("e0")).Node().key) + require.Equal(t, keyB, list.Seek([]byte("c0"), make(seekResult, maxHeight)).Node().key) + require.Equal(t, keyB, list.Seek([]byte("d0"), make(seekResult, maxHeight)).Node().key) + require.Equal(t, keyD, list.Seek([]byte("e0"), make(seekResult, maxHeight)).Node().key) require.Equal(t, "[a5] [b5] [d5] [e5] [f5] [g5] [h5] ", list.String()) checkList(t, list) // remove d5 list.Remove(seekRes, seekRes.Node().Next()) - require.Equal(t, keyB, list.Seek([]byte("d0")).Node().key) - require.Equal(t, keyB, list.Seek([]byte("e0")).Node().key) - require.Equal(t, keyE, list.Seek([]byte("f0")).Node().key) + require.Equal(t, keyB, list.Seek([]byte("d0"), make(seekResult, maxHeight)).Node().key) + require.Equal(t, keyB, list.Seek([]byte("e0"), make(seekResult, maxHeight)).Node().key) + require.Equal(t, keyE, list.Seek([]byte("f0"), make(seekResult, maxHeight)).Node().key) require.Equal(t, "[a5] [b5] [e5] [f5] [g5] [h5] ", list.String()) checkList(t, list) // remove the first node - seekRes = list.Seek([]byte("10")) + seekRes = list.Seek([]byte("10"), make(seekResult, maxHeight)) list.Remove(seekRes, seekRes.Node().Next()) require.Equal(t, "[b5] [e5] [f5] [g5] [h5] ", list.String()) checkList(t, list) diff --git a/cdc/puller/metrics.go b/cdc/puller/metrics.go index b05e039312a..c91e1a0f480 100644 --- a/cdc/puller/metrics.go +++ b/cdc/puller/metrics.go @@ -25,6 +25,13 @@ var ( Name: "txn_collect_event_count", Help: "The number of events received from txn collector", }, []string{"namespace", "changefeed", "type"}) + missedRegionCollectCounter = prometheus.NewCounterVec( + prometheus.CounterOpts{ + Namespace: "ticdc", + Subsystem: "puller", + Name: "region_resolved_missed_count", + Help: "The number of regions not cached when forward resolved ts", + }, []string{"namespace", "changefeed", "type"}) pullerResolvedTsGauge = prometheus.NewGaugeVec( prometheus.GaugeOpts{ Namespace: "ticdc", @@ -67,6 +74,7 @@ var ( // InitMetrics registers all metrics in this file func InitMetrics(registry *prometheus.Registry) { registry.MustRegister(txnCollectCounter) + registry.MustRegister(missedRegionCollectCounter) registry.MustRegister(pullerResolvedTsGauge) registry.MustRegister(memBufferSizeGauge) registry.MustRegister(outputChanSizeHistogram) diff --git a/cdc/puller/puller.go b/cdc/puller/puller.go index 60951313211..a0267c82897 100644 --- a/cdc/puller/puller.go +++ b/cdc/puller/puller.go @@ -88,7 +88,13 @@ func New(ctx context.Context, // To make puller level resolved ts initialization distinguishable, we set // the initial ts for frontier to 0. Once the puller level resolved ts // initialized, the ts should advance to a non-zero value. - tsTracker := frontier.NewFrontier(0, comparableSpans...) + pullerType := "dml" + if len(spans) > 1 { + pullerType = "ddl" + } + metricMissedRegionCollectCounter := missedRegionCollectCounter. + WithLabelValues(changefeed.Namespace, changefeed.ID, pullerType) + tsTracker := frontier.NewFrontier(0, metricMissedRegionCollectCounter, comparableSpans...) kvCli := kv.NewCDCKVClient( ctx, pdCli, grpcPool, regionCache, pdClock, cfg, changefeed, tableID, tableName) p := &pullerImpl{ @@ -195,8 +201,8 @@ func (p *pullerImpl) Run(ctx context.Context) error { } if e.Resolved != nil { - metricTxnCollectCounterResolved.Add(float64(len(e.Resolved))) - for _, resolvedSpan := range e.Resolved { + metricTxnCollectCounterResolved.Add(float64(len(e.Resolved.Spans))) + for _, resolvedSpan := range e.Resolved.Spans { if !regionspan.IsSubSpan(resolvedSpan.Span, p.spans...) { log.Panic("the resolved span is not in the total span", zap.String("namespace", p.changefeed.Namespace), @@ -208,34 +214,34 @@ func (p *pullerImpl) Run(ctx context.Context) error { ) } // Forward is called in a single thread - p.tsTracker.Forward(resolvedSpan.Span, resolvedSpan.ResolvedTs) - resolvedTs := p.tsTracker.Frontier() - if resolvedTs > 0 && !initialized { - initialized = true - - spans := make([]string, 0, len(p.spans)) - for i := range p.spans { - spans = append(spans, p.spans[i].String()) - } - log.Info("puller is initialized", - zap.String("namespace", p.changefeed.Namespace), - zap.String("changefeed", p.changefeed.ID), - zap.Int64("tableID", p.tableID), - zap.String("tableName", p.tableName), - zap.Uint64("resolvedTs", resolvedTs), - zap.Duration("duration", time.Since(start)), - zap.Strings("spans", spans)) - } - if !initialized || resolvedTs == lastResolvedTs { - continue - } - lastResolvedTs = resolvedTs - err := output(&model.RawKVEntry{CRTs: resolvedTs, OpType: model.OpTypeResolved, RegionID: e.RegionID}) - if err != nil { - return errors.Trace(err) + p.tsTracker.Forward(resolvedSpan.Region, resolvedSpan.Span, e.Resolved.ResolvedTs) + } + resolvedTs := p.tsTracker.Frontier() + if resolvedTs > 0 && !initialized { + initialized = true + + spans := make([]string, 0, len(p.spans)) + for i := range p.spans { + spans = append(spans, p.spans[i].String()) } - atomic.StoreUint64(&p.resolvedTs, resolvedTs) + log.Info("puller is initialized", + zap.String("namespace", p.changefeed.Namespace), + zap.String("changefeed", p.changefeed.ID), + zap.Int64("tableID", p.tableID), + zap.String("tableName", p.tableName), + zap.Uint64("resolvedTs", resolvedTs), + zap.Duration("duration", time.Since(start)), + zap.Strings("spans", spans)) + } + if !initialized || resolvedTs == lastResolvedTs { + continue + } + lastResolvedTs = resolvedTs + err := output(&model.RawKVEntry{CRTs: resolvedTs, OpType: model.OpTypeResolved, RegionID: e.RegionID}) + if err != nil { + return errors.Trace(err) } + atomic.StoreUint64(&p.resolvedTs, resolvedTs) } } }) diff --git a/cdc/puller/puller_test.go b/cdc/puller/puller_test.go index 6b9a65c2049..347d7864ae5 100644 --- a/cdc/puller/puller_test.go +++ b/cdc/puller/puller_test.go @@ -158,22 +158,25 @@ func TestPullerResolvedForward(t *testing.T) { plr, cancel, wg, store := newPullerForTest(t, spans, checkpointTs) plr.cli.Returns(model.RegionFeedEvent{ - Resolved: []*model.ResolvedSpan{{ - Span: regionspan.ToComparableSpan(regionspan.Span{Start: []byte("t_a"), End: []byte("t_c")}), - ResolvedTs: uint64(1001), - }}, + Resolved: &model.ResolvedSpans{ + Spans: []model.RegionComparableSpan{{ + Span: regionspan.ToComparableSpan(regionspan.Span{Start: []byte("t_a"), End: []byte("t_c")}), + }}, ResolvedTs: uint64(1001), + }, }) plr.cli.Returns(model.RegionFeedEvent{ - Resolved: []*model.ResolvedSpan{{ - Span: regionspan.ToComparableSpan(regionspan.Span{Start: []byte("t_c"), End: []byte("t_d")}), - ResolvedTs: uint64(1002), - }}, + Resolved: &model.ResolvedSpans{ + Spans: []model.RegionComparableSpan{{ + Span: regionspan.ToComparableSpan(regionspan.Span{Start: []byte("t_c"), End: []byte("t_d")}), + }}, ResolvedTs: uint64(1002), + }, }) plr.cli.Returns(model.RegionFeedEvent{ - Resolved: []*model.ResolvedSpan{{ - Span: regionspan.ToComparableSpan(regionspan.Span{Start: []byte("t_d"), End: []byte("t_e")}), - ResolvedTs: uint64(1000), - }}, + Resolved: &model.ResolvedSpans{ + Spans: []model.RegionComparableSpan{{ + Span: regionspan.ToComparableSpan(regionspan.Span{Start: []byte("t_d"), End: []byte("t_e")}), + }}, ResolvedTs: uint64(1000), + }, }) ev := <-plr.Output() require.Equal(t, model.OpTypeResolved, ev.OpType)