Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

sinkv2(ticdc): directly advance checkpoint ts if only resolved ts #7876

Merged
merged 12 commits into from
Dec 12, 2022
3 changes: 2 additions & 1 deletion cdc/processor/sourcemanager/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -71,10 +71,11 @@ func New(

// AddTable adds a table to the source manager. It registers the table with the
// engine and then starts a puller for it.
//
// ctx is handed to the puller when it is started. tableID and tableName
// identify the table, and startTs is passed to the puller wrapper on creation.
// The started puller is stored in m.pullers under tableID.
func (m *SourceManager) AddTable(ctx cdccontext.Context, tableID model.TableID, tableName string, startTs model.Ts) {
	// Add table to the engine first, so that the engine can receive the events from the puller.
	// The table is registered exactly once; a second registration after the
	// puller has started would be redundant and contradict this ordering.
	m.engine.AddTable(tableID)
	p := pullerwrapper.NewPullerWrapper(m.changefeedID, tableID, tableName, startTs, m.bdrMode)
	p.Start(ctx, m.up, m.engine, m.errChan)
	m.pullers.Store(tableID, p)
}

// RemoveTable removes a table from the source manager. Stop puller and unregister table from the engine.
Expand Down
28 changes: 26 additions & 2 deletions cdc/sinkv2/tablesink/progress_tracker.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,9 @@ const (
// warnDuration is the duration to warn the progress tracker is not closed.
warnDuration = 3 * time.Minute
// A progressTracker contains several internal fixed-length buffers.
defaultBufferSize uint64 = 1024 * 1024
// NOTICE: the buffer size must be aligned to 8 bytes.
// It shouldn't be too large, otherwise it will consume too much memory.
defaultBufferSize uint64 = 4096
)

// A pendingResolvedTs is received by progressTracker but hasn't been flushed yet.
Expand Down Expand Up @@ -143,11 +145,31 @@ func (r *progressTracker) addResolvedTs(resolvedTs model.ResolvedTs) {
r.mu.Lock()
defer r.mu.Unlock()

if r.nextEventID == 0 {
// NOTICE: We should **NOT** update the `lastMinResolvedTs` when the tracker is closed or frozen.
// So there is no need to try to append the resolved ts to `resolvedTsCache`.
if r.frozen || r.closed {
return
}

// If there is no event or all events are flushed, we can update the resolved ts directly.
hicqu marked this conversation as resolved.
Show resolved Hide resolved
if r.nextEventID == 0 || r.nextToResolvePos >= r.nextEventID {
// Update the checkpoint ts.
r.lastMinResolvedTs = resolvedTs
Rustin170506 marked this conversation as resolved.
Show resolved Hide resolved
return
}

// Sometimes, if there are no events for a long time and a lot of resolved ts are received,
// we can overwrite the last cached resolved ts directly instead of appending a new one.
tsCacheLen := len(r.resolvedTsCache)
if tsCacheLen > 0 {
// The offset of the last resolved ts is the last event ID.
// It means no event has been added since then, so we can update the resolved ts directly.
if r.resolvedTsCache[tsCacheLen-1].offset+1 == r.nextEventID {
r.resolvedTsCache[tsCacheLen-1].resolvedTs = resolvedTs
return
}
}

r.resolvedTsCache = append(r.resolvedTsCache, pendingResolvedTs{
offset: r.nextEventID - 1,
resolvedTs: resolvedTs,
Expand Down Expand Up @@ -219,6 +241,8 @@ func (r *progressTracker) advance() model.ResolvedTs {
// If a buffer is finished, release it.
for r.nextToResolvePos-r.nextToReleasePos >= r.bufferSize*64 {
r.nextToReleasePos += r.bufferSize * 64
// Use zero value to release the memory.
r.pendingEvents[0] = nil
r.pendingEvents = r.pendingEvents[1:]
if len(r.pendingEvents) == 0 {
r.pendingEvents = nil
Expand Down
58 changes: 58 additions & 0 deletions cdc/sinkv2/tablesink/progress_tracker_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,13 @@ import (
"github.com/stretchr/testify/require"
)

// pendingResolvedTsEventsCount reports how many entries are currently held in
// resolvedTsCache. The length is read while holding r.mu.
// Only for test.
func (r *progressTracker) pendingResolvedTsEventsCount() int {
	r.mu.Lock()
	cached := len(r.resolvedTsCache)
	r.mu.Unlock()
	return cached
}

func TestNewProgressTracker(t *testing.T) {
t.Parallel()

Expand Down Expand Up @@ -231,3 +238,54 @@ func TestClosedTrackerDoNotAdvanceCheckpointTs(t *testing.T) {
}, 3*time.Second, 100*time.Millisecond, "all events should be removed")
require.Equal(t, currentTs, tracker.advance(), "checkpointTs should not be advanced")
}

// TestOnlyResolvedTsShouldDirectlyAdvanceCheckpointTs checks that, once every
// tracked event has been completed, further resolved ts are not queued in the
// resolved ts cache and the value returned by advance follows them directly.
func TestOnlyResolvedTsShouldDirectlyAdvanceCheckpointTs(t *testing.T) {
	t.Parallel()

	pt := newProgressTracker(1, defaultBufferSize)

	// Interleave three events with resolved ts 1 through 5.
	doneFirst := pt.addEvent()
	pt.addResolvedTs(model.NewResolvedTs(1))
	doneSecond := pt.addEvent()
	pt.addResolvedTs(model.NewResolvedTs(2))
	pt.addResolvedTs(model.NewResolvedTs(3))
	doneThird := pt.addEvent()
	pt.addResolvedTs(model.NewResolvedTs(4))
	pt.addResolvedTs(model.NewResolvedTs(5))
	tracked := pt.trackingCount()
	require.Equal(t, 3, tracked, "Events should be added")

	// Complete the first two events through their callbacks, then feed one
	// more resolved ts while the third event is still outstanding.
	doneFirst()
	doneSecond()
	pt.addResolvedTs(model.NewResolvedTs(6))
	checkpoint := pt.advance()
	require.Equal(t, uint64(3), checkpoint.Ts, "CheckpointTs should be advanced")
	tracked = pt.trackingCount()
	require.Equal(t, 1, tracked, "Only one event should be left")
	checkpoint = pt.advance()
	require.Equal(t, uint64(3), checkpoint.Ts, "CheckpointTs still should be 3")

	// Completing the last event lets the checkpoint reach the newest resolved ts.
	doneThird()
	checkpoint = pt.advance()
	require.Equal(t, uint64(6), checkpoint.Ts, "CheckpointTs should be advanced")

	// With nothing outstanding, resolved ts must bypass the cache entirely.
	pt.addResolvedTs(model.NewResolvedTs(7))
	pt.addResolvedTs(model.NewResolvedTs(8))
	pt.addResolvedTs(model.NewResolvedTs(9))
	cached := pt.pendingResolvedTsEventsCount()
	require.Equal(t, 0, cached, "ResolvedTsCache should be empty")
	checkpoint = pt.advance()
	require.Equal(t, uint64(9), checkpoint.Ts, "CheckpointTs should be advanced")
}

// TestShouldDirectlyUpdateResolvedTsIfNoMoreEvents checks that consecutive
// resolved ts received without an event in between occupy a single entry in
// the resolved ts cache, and that the cache drains as events are completed.
func TestShouldDirectlyUpdateResolvedTsIfNoMoreEvents(t *testing.T) {
	t.Parallel()

	pt := newProgressTracker(1, defaultBufferSize)

	// Two events, the second followed by two back-to-back resolved ts.
	doneFirst := pt.addEvent()
	pt.addResolvedTs(model.NewResolvedTs(1))
	doneSecond := pt.addEvent()
	pt.addResolvedTs(model.NewResolvedTs(2))
	pt.addResolvedTs(model.NewResolvedTs(3))
	cached := pt.pendingResolvedTsEventsCount()
	require.Equal(t, 2, cached, "ResolvedTsCache should only have 2 events")

	// A third event followed by three back-to-back resolved ts.
	doneThird := pt.addEvent()
	pt.addResolvedTs(model.NewResolvedTs(4))
	pt.addResolvedTs(model.NewResolvedTs(5))
	pt.addResolvedTs(model.NewResolvedTs(6))

	// Complete the first two events; only the entry behind the third remains.
	doneFirst()
	doneSecond()
	checkpoint := pt.advance()
	require.Equal(t, uint64(3), checkpoint.Ts, "CheckpointTs should be advanced")
	cached = pt.pendingResolvedTsEventsCount()
	require.Equal(t, 1, cached, "ResolvedTsCache should only have one event")

	// Completing the last event releases the final cached resolved ts.
	doneThird()
	checkpoint = pt.advance()
	require.Equal(t, uint64(6), checkpoint.Ts, "CheckpointTs should be advanced")
	cached = pt.pendingResolvedTsEventsCount()
	require.Equal(t, 0, cached, "ResolvedTsCache should be empty")
}