Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Frointier improve #10

Open
wants to merge 32 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
32 commits
Select commit Hold shift + click to select a range
85d1197
add log rate limit for table actor
sdojjy Sep 22, 2022
0eac0e7
Merge branch 'master' into add-log-rate-limit-for-table-actro
ti-chi-bot Sep 22, 2022
6701809
Merge branch 'master' into add-log-rate-limit-for-table-actro
ti-chi-bot Sep 22, 2022
5670c40
Merge branch 'master' into add-log-rate-limit-for-table-actro
ti-chi-bot Sep 22, 2022
97db214
Merge branch 'master' into add-log-rate-limit-for-table-actro
ti-chi-bot Sep 22, 2022
d8fff7a
Merge branch 'master' into add-log-rate-limit-for-table-actro
ti-chi-bot Sep 22, 2022
72fa8a6
Merge branch 'master' into add-log-rate-limit-for-table-actro
sdojjy Sep 28, 2022
f9e9107
add rate limiter for resolve lock
sdojjy Sep 28, 2022
19a00da
improve frontier
sdojjy Sep 29, 2022
c33dc39
improve frontier
sdojjy Sep 29, 2022
9ef6307
improve frontier
sdojjy Sep 29, 2022
69a013c
add metrics
sdojjy Sep 29, 2022
d837620
add ut
sdojjy Sep 29, 2022
39e2938
add ut
sdojjy Sep 29, 2022
253c3c0
add ut
sdojjy Sep 29, 2022
89338c2
add ut
sdojjy Sep 29, 2022
0a0e268
add ut
sdojjy Sep 29, 2022
d52391b
Merge remote-tracking branch 'upstream/master' into frointier-improve
sdojjy Sep 30, 2022
02ec537
fix lint
sdojjy Sep 30, 2022
0c3b4b0
fix lint
sdojjy Sep 30, 2022
2fffbb4
Merge branch 'master' into frointier-improve
sdojjy Sep 30, 2022
fba5f39
Merge branch 'master' into frointier-improve
sdojjy Oct 6, 2022
98f0790
fix lint
sdojjy Oct 6, 2022
603fe1c
fix lint
sdojjy Oct 6, 2022
813b72d
fix lint
sdojjy Oct 6, 2022
68bf5da
fix ut
sdojjy Oct 7, 2022
2985826
Merge branch 'master' into frointier-improve
sdojjy Oct 7, 2022
9e365db
fix ut
sdojjy Oct 8, 2022
c9ffa3d
Merge remote-tracking branch 'origin/frointier-improve' into frointie…
sdojjy Oct 8, 2022
e3a81a0
Merge branch 'master' into frointier-improve
sdojjy Oct 9, 2022
9f7b12b
address comments
sdojjy Oct 9, 2022
43b9b4f
Merge remote-tracking branch 'origin/frointier-improve' into frointie…
sdojjy Oct 9, 2022
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 8 additions & 3 deletions cdc/kv/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -878,10 +878,15 @@ func (s *eventFeedSession) dispatchRequest(ctx context.Context) error {
// After this resolved ts event is sent, we don't need to send one more
// resolved ts event when the region starts to work.
resolvedEv := model.RegionFeedEvent{
Resolved: []*model.ResolvedSpan{{
Span: sri.span,
Resolved: &model.ResolvedSpans{
Spans: []model.RegionComparableSpan{
{
Span: sri.span,
Region: sri.verID.GetID(),
},
},
ResolvedTs: sri.ts,
}},
},
}
select {
case s.eventCh <- resolvedEv:
Expand Down
205 changes: 121 additions & 84 deletions cdc/kv/client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -351,7 +351,7 @@ func TestConnectOfflineTiKV(t *testing.T) {
}

checkEvent := func(event model.RegionFeedEvent, ts uint64) {
require.Equal(t, ts, event.Resolved[0].ResolvedTs)
require.Equal(t, ts, event.Resolved.ResolvedTs)
}

initialized := mockInitializedEvent(3 /* regionID */, currentRequestID())
Expand Down Expand Up @@ -605,7 +605,7 @@ consumePreResolvedTs:
select {
case event = <-eventCh:
require.NotNil(t, event.Resolved)
require.Equal(t, uint64(100), event.Resolved[0].ResolvedTs)
require.Equal(t, uint64(100), event.Resolved.ResolvedTs)
case <-time.After(time.Second):
break consumePreResolvedTs
}
Expand Down Expand Up @@ -641,7 +641,7 @@ consumePreResolvedTs:
require.FailNow(t, "reconnection not succeed in 3 seconds")
}
require.NotNil(t, event.Resolved)
require.Equal(t, uint64(120), event.Resolved[0].ResolvedTs)
require.Equal(t, uint64(120), event.Resolved.ResolvedTs)

cancel()
}
Expand Down Expand Up @@ -1080,10 +1080,12 @@ func testHandleFeedEvent(t *testing.T) {

expected := []model.RegionFeedEvent{
{
Resolved: []*model.ResolvedSpan{{
Span: regionspan.ComparableSpan{Start: []byte("a"), End: []byte("b")},
ResolvedTs: 100,
}},
Resolved: &model.ResolvedSpans{
Spans: []model.RegionComparableSpan{{
Span: regionspan.ComparableSpan{Start: []byte("a"), End: []byte("b")},
Region: 3,
}}, ResolvedTs: 100,
},
},
{
Val: &model.RawKVEntry{
Expand Down Expand Up @@ -1151,25 +1153,32 @@ func testHandleFeedEvent(t *testing.T) {
RegionID: 3,
},
{
Resolved: []*model.ResolvedSpan{{
Span: regionspan.ComparableSpan{Start: []byte("a"), End: []byte("b")},
ResolvedTs: 135,
}},
Resolved: &model.ResolvedSpans{
Spans: []model.RegionComparableSpan{{
Span: regionspan.ComparableSpan{Start: []byte("a"), End: []byte("b")},
Region: 3,
}}, ResolvedTs: 135,
},
},
{
Resolved: []*model.ResolvedSpan{{
Span: regionspan.ComparableSpan{Start: []byte("a"), End: []byte("b")},
ResolvedTs: 145,
}},
Resolved: &model.ResolvedSpans{
Spans: []model.RegionComparableSpan{{
Span: regionspan.ComparableSpan{Start: []byte("a"), End: []byte("b")},
Region: 3,
}}, ResolvedTs: 145,
},
},
}
multipleExpected := model.RegionFeedEvent{
Resolved: make([]*model.ResolvedSpan, multiSize),
}
for i := range multipleExpected.Resolved {
multipleExpected.Resolved[i] = &model.ResolvedSpan{
Span: regionspan.ComparableSpan{Start: []byte("a"), End: []byte("b")},
Resolved: &model.ResolvedSpans{
Spans: make([]model.RegionComparableSpan, multiSize),
ResolvedTs: 160,
},
}
for i := range multipleExpected.Resolved.Spans {
multipleExpected.Resolved.Spans[i] = model.RegionComparableSpan{
Span: regionspan.ComparableSpan{Start: []byte("a"), End: []byte("b")},
Region: 3,
}
}

Expand Down Expand Up @@ -1327,7 +1336,7 @@ func TestStreamSendWithError(t *testing.T) {
select {
case event := <-eventCh:
require.NotNil(t, event.Resolved)
require.Equal(t, 1, len(event.Resolved))
require.Equal(t, 1, len(event.Resolved.Spans))
require.NotNil(t, 0, event.RegionID)
case <-time.After(time.Second):
require.Fail(t, fmt.Sprintf("expected events are not receive, received: %v", initRegions))
Expand Down Expand Up @@ -1431,16 +1440,20 @@ func testStreamRecvWithError(t *testing.T, failpointStr string) {

expected := []model.RegionFeedEvent{
{
Resolved: []*model.ResolvedSpan{{
Span: regionspan.ComparableSpan{Start: []byte("a"), End: []byte("b")},
ResolvedTs: 120,
}},
Resolved: &model.ResolvedSpans{
Spans: []model.RegionComparableSpan{{
Span: regionspan.ComparableSpan{Start: []byte("a"), End: []byte("b")},
Region: regionID,
}}, ResolvedTs: 120,
},
},
{
Resolved: []*model.ResolvedSpan{{
Span: regionspan.ComparableSpan{Start: []byte("a"), End: []byte("b")},
ResolvedTs: 120,
}},
Resolved: &model.ResolvedSpans{
Spans: []model.RegionComparableSpan{{
Span: regionspan.ComparableSpan{Start: []byte("a"), End: []byte("b")},
Region: regionID,
}}, ResolvedTs: 120,
},
},
}

Expand All @@ -1449,7 +1462,7 @@ eventLoop:
for {
select {
case ev := <-eventCh:
if ev.Resolved[0].ResolvedTs != uint64(100) {
if ev.Resolved.ResolvedTs != uint64(100) {
events = append(events, ev)
}
case <-time.After(time.Second):
Expand Down Expand Up @@ -1617,7 +1630,7 @@ ReceiveLoop:
break ReceiveLoop
}
received = append(received, event)
if event.Resolved[0].ResolvedTs == 130 {
if event.Resolved.ResolvedTs == 130 {
break ReceiveLoop
}
case <-time.After(time.Second):
Expand All @@ -1626,7 +1639,7 @@ ReceiveLoop:
}
var lastResolvedTs uint64
for _, e := range received {
if lastResolvedTs > e.Resolved[0].ResolvedTs {
if lastResolvedTs > e.Resolved.ResolvedTs {
require.Fail(t, fmt.Sprintf("the resolvedTs is back off %#v", resolved))
}
}
Expand Down Expand Up @@ -1753,7 +1766,7 @@ func TestIncompatibleTiKV(t *testing.T) {
ch1 <- initialized
select {
case event := <-eventCh:
require.Equal(t, 1, len(event.Resolved))
require.Equal(t, 1, len(event.Resolved.Spans))
case <-time.After(time.Second):
require.Fail(t, "expected events are not receive")
}
Expand Down Expand Up @@ -1823,7 +1836,7 @@ func TestNoPendingRegionError(t *testing.T) {
ch1 <- initialized
ev := <-eventCh
require.NotNil(t, ev.Resolved)
require.Equal(t, uint64(100), ev.Resolved[0].ResolvedTs)
require.Equal(t, uint64(100), ev.Resolved.ResolvedTs)

resolved := &cdcpb.ChangeDataEvent{Events: []*cdcpb.Event{
{
Expand All @@ -1835,7 +1848,7 @@ func TestNoPendingRegionError(t *testing.T) {
ch1 <- resolved
ev = <-eventCh
require.NotNil(t, ev.Resolved)
require.Equal(t, uint64(200), ev.Resolved[0].ResolvedTs)
require.Equal(t, uint64(200), ev.Resolved.ResolvedTs)

cancel()
}
Expand Down Expand Up @@ -1910,22 +1923,28 @@ func TestDropStaleRequest(t *testing.T) {
}}
expected := []model.RegionFeedEvent{
{
Resolved: []*model.ResolvedSpan{{
Span: regionspan.ComparableSpan{Start: []byte("a"), End: []byte("b")},
ResolvedTs: 100,
}},
Resolved: &model.ResolvedSpans{
Spans: []model.RegionComparableSpan{{
Span: regionspan.ComparableSpan{Start: []byte("a"), End: []byte("b")},
Region: regionID,
}}, ResolvedTs: 100,
},
},
{
Resolved: []*model.ResolvedSpan{{
Span: regionspan.ComparableSpan{Start: []byte("a"), End: []byte("b")},
ResolvedTs: 120,
}},
Resolved: &model.ResolvedSpans{
Spans: []model.RegionComparableSpan{{
Span: regionspan.ComparableSpan{Start: []byte("a"), End: []byte("b")},
Region: regionID,
}}, ResolvedTs: 120,
},
},
{
Resolved: []*model.ResolvedSpan{{
Span: regionspan.ComparableSpan{Start: []byte("a"), End: []byte("b")},
ResolvedTs: 130,
}},
Resolved: &model.ResolvedSpans{
Spans: []model.RegionComparableSpan{{
Span: regionspan.ComparableSpan{Start: []byte("a"), End: []byte("b")},
Region: regionID,
}}, ResolvedTs: 130,
},
},
}

Expand Down Expand Up @@ -2010,16 +2029,20 @@ func TestResolveLock(t *testing.T) {
}}
expected := []model.RegionFeedEvent{
{
Resolved: []*model.ResolvedSpan{{
Span: regionspan.ComparableSpan{Start: []byte("a"), End: []byte("b")},
ResolvedTs: 100,
}},
Resolved: &model.ResolvedSpans{
Spans: []model.RegionComparableSpan{{
Span: regionspan.ComparableSpan{Start: []byte("a"), End: []byte("b")},
Region: regionID,
}}, ResolvedTs: 100,
},
},
{
Resolved: []*model.ResolvedSpan{{
Span: regionspan.ComparableSpan{Start: []byte("a"), End: []byte("b")},
ResolvedTs: tso,
}},
Resolved: &model.ResolvedSpans{
Spans: []model.RegionComparableSpan{{
Span: regionspan.ComparableSpan{Start: []byte("a"), End: []byte("b")},
Region: regionID,
}}, ResolvedTs: tso,
},
},
}
ch1 <- resolved
Expand Down Expand Up @@ -2342,17 +2365,21 @@ func testEventAfterFeedStop(t *testing.T) {

expected := []model.RegionFeedEvent{
{
Resolved: []*model.ResolvedSpan{{
Span: regionspan.ComparableSpan{Start: []byte("a"), End: []byte("b")},
ResolvedTs: 100,
}},
Resolved: &model.ResolvedSpans{
Spans: []model.RegionComparableSpan{{
Span: regionspan.ComparableSpan{Start: []byte("a"), End: []byte("b")},
Region: regionID,
}}, ResolvedTs: 100,
},
RegionID: regionID,
},
{
Resolved: []*model.ResolvedSpan{{
Span: regionspan.ComparableSpan{Start: []byte("a"), End: []byte("b")},
ResolvedTs: 100,
}},
Resolved: &model.ResolvedSpans{
Spans: []model.RegionComparableSpan{{
Span: regionspan.ComparableSpan{Start: []byte("a"), End: []byte("b")},
Region: regionID,
}}, ResolvedTs: 100,
},
RegionID: regionID,
},
{
Expand All @@ -2367,10 +2394,12 @@ func testEventAfterFeedStop(t *testing.T) {
RegionID: 3,
},
{
Resolved: []*model.ResolvedSpan{{
Span: regionspan.ComparableSpan{Start: []byte("a"), End: []byte("b")},
ResolvedTs: 120,
}},
Resolved: &model.ResolvedSpans{
Spans: []model.RegionComparableSpan{{
Span: regionspan.ComparableSpan{Start: []byte("a"), End: []byte("b")},
Region: regionID,
}}, ResolvedTs: 120,
},
RegionID: regionID,
},
}
Expand Down Expand Up @@ -2542,10 +2571,12 @@ func TestOutOfRegionRangeEvent(t *testing.T) {

expected := []model.RegionFeedEvent{
{
Resolved: []*model.ResolvedSpan{{
Span: regionspan.ComparableSpan{Start: []byte("a"), End: []byte("b")},
ResolvedTs: 100,
}},
Resolved: &model.ResolvedSpans{
Spans: []model.RegionComparableSpan{{
Span: regionspan.ComparableSpan{Start: []byte("a"), End: []byte("b")},
Region: 3,
}}, ResolvedTs: 100,
},
},
{
Val: &model.RawKVEntry{
Expand All @@ -2570,10 +2601,12 @@ func TestOutOfRegionRangeEvent(t *testing.T) {
RegionID: 3,
},
{
Resolved: []*model.ResolvedSpan{{
Span: regionspan.ComparableSpan{Start: []byte("a"), End: []byte("b")},
ResolvedTs: 145,
}},
Resolved: &model.ResolvedSpans{
Spans: []model.RegionComparableSpan{{
Span: regionspan.ComparableSpan{Start: []byte("a"), End: []byte("b")},
Region: 3,
}}, ResolvedTs: 145,
},
},
}

Expand Down Expand Up @@ -3025,17 +3058,19 @@ func testKVClientForceReconnect(t *testing.T) {
ch2 <- resolved

expected := model.RegionFeedEvent{
Resolved: []*model.ResolvedSpan{{
Span: regionspan.ComparableSpan{Start: []byte("a"), End: []byte("c")},
ResolvedTs: 135,
}},
Resolved: &model.ResolvedSpans{
Spans: []model.RegionComparableSpan{{
Span: regionspan.ComparableSpan{Start: []byte("a"), End: []byte("c")},
Region: regionID3,
}}, ResolvedTs: 135,
},
}

eventLoop:
for {
select {
case ev := <-eventCh:
if ev.Resolved != nil && ev.Resolved[0].ResolvedTs == uint64(100) {
if ev.Resolved != nil && ev.Resolved.ResolvedTs == uint64(100) {
continue
}
require.Equal(t, expected, ev)
Expand Down Expand Up @@ -3263,10 +3298,12 @@ func TestEvTimeUpdate(t *testing.T) {

expected := []model.RegionFeedEvent{
{
Resolved: []*model.ResolvedSpan{{
Span: regionspan.ComparableSpan{Start: []byte("a"), End: []byte("b")},
ResolvedTs: 100,
}},
Resolved: &model.ResolvedSpans{
Spans: []model.RegionComparableSpan{{
Span: regionspan.ComparableSpan{Start: []byte("a"), End: []byte("b")},
Region: 3,
}}, ResolvedTs: 100,
},
},
{
Val: &model.RawKVEntry{
Expand Down
Loading