From b118c02aa86af407357b34ebcd3e17e2dc33e8e1 Mon Sep 17 00:00:00 2001 From: Ti Chi Robot Date: Thu, 18 Jan 2024 16:55:47 +0800 Subject: [PATCH] puller(ticdc): fix resolvedTs get stuck when region split and merge (#10488) (#10494) ref pingcap/tiflow#10157 --- cdc/puller/frontier/frontier.go | 3 ++- cdc/puller/frontier/frontier_test.go | 19 +++++++++++++++++++ 2 files changed, 21 insertions(+), 1 deletion(-) diff --git a/cdc/puller/frontier/frontier.go b/cdc/puller/frontier/frontier.go index 46a18e547f1..983808ad598 100644 --- a/cdc/puller/frontier/frontier.go +++ b/cdc/puller/frontier/frontier.go @@ -84,7 +84,7 @@ func (s *spanFrontier) Frontier() uint64 { func (s *spanFrontier) Forward(regionID uint64, span regionspan.ComparableSpan, ts uint64) { // it's the fast part to detect if the region is split or merged, // if not we can update the minTsHeap with use new ts directly - if n, ok := s.cachedRegions[regionID]; ok && n.regionID != fakeRegionID && n.end != nil { + if n, ok := s.cachedRegions[regionID]; ok && n.regionID == regionID && n.end != nil { if bytes.Equal(n.Key(), span.Start) && bytes.Equal(n.End(), span.End) { s.minTsHeap.UpdateKey(n.Value(), ts) return @@ -106,6 +106,7 @@ func (s *spanFrontier) insert(regionID uint64, span regionspan.ComparableSpan, t if next != nil { if bytes.Equal(seekRes.Node().Key(), span.Start) && bytes.Equal(next.Key(), span.End) { s.minTsHeap.UpdateKey(seekRes.Node().Value(), ts) + delete(s.cachedRegions, seekRes.Node().regionID) if regionID != fakeRegionID { s.cachedRegions[regionID] = seekRes.Node() s.cachedRegions[regionID].regionID = regionID diff --git a/cdc/puller/frontier/frontier_test.go b/cdc/puller/frontier/frontier_test.go index ca1daefb377..d621f463b50 100644 --- a/cdc/puller/frontier/frontier_test.go +++ b/cdc/puller/frontier/frontier_test.go @@ -15,6 +15,7 @@ package frontier import ( "bytes" + "fmt" "math" "math/rand" "sort" @@ -437,3 +438,21 @@ func TestFrontierEntries(t *testing.T) { require.Equal(t, []byte("a"), slowestRange.Start) require.Equal(t, []byte("b"), slowestRange.End) } + +func TestMergeSpitWithDifferentRegionID(t *testing.T) { + frontier := NewFrontier(100, c, regionspan.ComparableSpan{Start: []byte("a"), End: []byte("c")}) + frontier.Forward(1, regionspan.ComparableSpan{Start: []byte("a"), End: []byte("b")}, 1222) + frontier.Forward(2, regionspan.ComparableSpan{Start: []byte("b"), End: []byte("c")}, 102) + frontier.Forward(4, regionspan.ComparableSpan{Start: []byte("b"), End: []byte("c")}, 103) + frontier.Forward(1, regionspan.ComparableSpan{Start: []byte("a"), End: []byte("c")}, 104) + frontier.Forward(1, regionspan.ComparableSpan{Start: []byte("a"), End: []byte("b")}, 1223) + frontier.Forward(3, regionspan.ComparableSpan{Start: []byte("b"), End: []byte("c")}, 105) + frontier.Forward(2, regionspan.ComparableSpan{Start: []byte("b"), End: []byte("c")}, 107) + frontier.(*spanFrontier).spanList.Entries(func(node *skipListNode) bool { + fmt.Printf("%d:[%s: %s) %d\n", node.regionID, + string(node.Key()), + string(node.End()), node.value.key) + return true + }) + require.Equal(t, uint64(107), frontier.Frontier()) +}