Skip to content

Commit

Permalink
puller(ticdc): fix resolvedTs get stuck when region split and merge (p…
Browse files Browse the repository at this point in the history
  • Loading branch information
ti-chi-bot authored Jan 18, 2024
1 parent e9d1010 commit b118c02
Show file tree
Hide file tree
Showing 2 changed files with 21 additions and 1 deletion.
3 changes: 2 additions & 1 deletion cdc/puller/frontier/frontier.go
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,7 @@ func (s *spanFrontier) Frontier() uint64 {
func (s *spanFrontier) Forward(regionID uint64, span regionspan.ComparableSpan, ts uint64) {
// it's the fast part to detect if the region is split or merged,
// if not we can update the minTsHeap with use new ts directly
if n, ok := s.cachedRegions[regionID]; ok && n.regionID != fakeRegionID && n.end != nil {
if n, ok := s.cachedRegions[regionID]; ok && n.regionID == regionID && n.end != nil {
if bytes.Equal(n.Key(), span.Start) && bytes.Equal(n.End(), span.End) {
s.minTsHeap.UpdateKey(n.Value(), ts)
return
Expand All @@ -106,6 +106,7 @@ func (s *spanFrontier) insert(regionID uint64, span regionspan.ComparableSpan, t
if next != nil {
if bytes.Equal(seekRes.Node().Key(), span.Start) && bytes.Equal(next.Key(), span.End) {
s.minTsHeap.UpdateKey(seekRes.Node().Value(), ts)
delete(s.cachedRegions, seekRes.Node().regionID)
if regionID != fakeRegionID {
s.cachedRegions[regionID] = seekRes.Node()
s.cachedRegions[regionID].regionID = regionID
Expand Down
19 changes: 19 additions & 0 deletions cdc/puller/frontier/frontier_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ package frontier

import (
"bytes"
"fmt"
"math"
"math/rand"
"sort"
Expand Down Expand Up @@ -437,3 +438,21 @@ func TestFrontierEntries(t *testing.T) {
require.Equal(t, []byte("a"), slowestRange.Start)
require.Equal(t, []byte("b"), slowestRange.End)
}

func TestMergeSpitWithDifferentRegionID(t *testing.T) {
frontier := NewFrontier(100, c, regionspan.ComparableSpan{Start: []byte("a"), End: []byte("c")})
frontier.Forward(1, regionspan.ComparableSpan{Start: []byte("a"), End: []byte("b")}, 1222)
frontier.Forward(2, regionspan.ComparableSpan{Start: []byte("b"), End: []byte("c")}, 102)
frontier.Forward(4, regionspan.ComparableSpan{Start: []byte("b"), End: []byte("c")}, 103)
frontier.Forward(1, regionspan.ComparableSpan{Start: []byte("a"), End: []byte("c")}, 104)
frontier.Forward(1, regionspan.ComparableSpan{Start: []byte("a"), End: []byte("b")}, 1223)
frontier.Forward(3, regionspan.ComparableSpan{Start: []byte("b"), End: []byte("c")}, 105)
frontier.Forward(2, regionspan.ComparableSpan{Start: []byte("b"), End: []byte("c")}, 107)
frontier.(*spanFrontier).spanList.Entries(func(node *skipListNode) bool {
fmt.Printf("%d:[%s: %s) %d\n", node.regionID,
string(node.Key()),
string(node.End()), node.value.key)
return true
})
require.Equal(t, uint64(107), frontier.Frontier())
}

0 comments on commit b118c02

Please sign in to comment.