Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

sinkv2(ticdc): directly advance checkpoint ts if only resolved ts #7876

Merged
merged 12 commits into from
Dec 12, 2022
14 changes: 11 additions & 3 deletions cdc/sinkv2/tablesink/progress_tracker.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ const (
// warnDuration is the duration to warn the progress tracker is not closed.
warnDuration = 3 * time.Minute
// A progressTracker contains several internal fixed-length buffers.
defaultBufferSize uint64 = 1024 * 1024
defaultBufferSize uint64 = 256
)

// A pendingResolvedTs is received by progressTracker but hasn't been flushed yet.
Expand Down Expand Up @@ -143,8 +143,12 @@ func (r *progressTracker) addResolvedTs(resolvedTs model.ResolvedTs) {
r.mu.Lock()
defer r.mu.Unlock()

if r.nextEventID == 0 {
r.lastMinResolvedTs = resolvedTs
// If there is no event or all events are flushed, we can update the resolved ts directly.
hicqu marked this conversation as resolved.
Show resolved Hide resolved
if r.nextEventID == 0 || r.nextToResolvePos >= r.nextEventID {
if !r.frozen && !r.closed {
hicqu marked this conversation as resolved.
Show resolved Hide resolved
// Update the checkpoint ts.
r.lastMinResolvedTs = resolvedTs
}
return
}

Expand Down Expand Up @@ -206,6 +210,8 @@ func (r *progressTracker) advance() model.ResolvedTs {
if !r.frozen && !r.closed {
r.lastMinResolvedTs = cached.resolvedTs
}
// Use zero value to release the memory.
r.resolvedTsCache[0] = pendingResolvedTs{}
r.resolvedTsCache = r.resolvedTsCache[1:]
if len(r.resolvedTsCache) == 0 {
r.resolvedTsCache = nil
Expand All @@ -219,6 +225,8 @@ func (r *progressTracker) advance() model.ResolvedTs {
// If a buffer is finished, release it.
for r.nextToResolvePos-r.nextToReleasePos >= r.bufferSize*64 {
r.nextToReleasePos += r.bufferSize * 64
// Use zero value to release the memory.
r.pendingEvents[0] = nil
r.pendingEvents = r.pendingEvents[1:]
if len(r.pendingEvents) == 0 {
r.pendingEvents = nil
Expand Down
31 changes: 31 additions & 0 deletions cdc/sinkv2/tablesink/progress_tracker_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,13 @@ import (
"github.com/stretchr/testify/require"
)

// Only for test.
func (r *progressTracker) pendingResolvedTsEventsCount() int {
r.mu.Lock()
defer r.mu.Unlock()
return len(r.resolvedTsCache)
}

func TestNewProgressTracker(t *testing.T) {
t.Parallel()

Expand Down Expand Up @@ -231,3 +238,27 @@ func TestClosedTrackerDoNotAdvanceCheckpointTs(t *testing.T) {
}, 3*time.Second, 100*time.Millisecond, "all events should be removed")
require.Equal(t, currentTs, tracker.advance(), "checkpointTs should not be advanced")
}

func TestOnlyResolvedTsShouldDirectlyAdvanceCheckpointTs(t *testing.T) {
t.Parallel()

tracker := newProgressTracker(1, defaultBufferSize)
cb1 := tracker.addEvent()
tracker.addResolvedTs(model.NewResolvedTs(1))
cb2 := tracker.addEvent()
tracker.addResolvedTs(model.NewResolvedTs(2))
cb3 := tracker.addEvent()
tracker.addResolvedTs(model.NewResolvedTs(3))
require.Equal(t, 3, tracker.trackingCount(), "Events should be added")
cb1()
cb2()
cb3()
require.Equal(t, uint64(3), tracker.advance().Ts, "CheckpointTs should be advanced")
require.Equal(t, 0, tracker.trackingCount(), "All events should be removed")
require.Equal(t, 0, tracker.pendingResolvedTsEventsCount(), "All resolvedTs events should be removed")
tracker.addResolvedTs(model.NewResolvedTs(4))
tracker.addResolvedTs(model.NewResolvedTs(5))
tracker.addResolvedTs(model.NewResolvedTs(6))
require.Equal(t, 0, tracker.pendingResolvedTsEventsCount(), "ResolvedTsCache should be empty")
require.Equal(t, uint64(6), tracker.advance().Ts, "CheckpointTs should be advanced")
}