Skip to content

Commit

Permalink
some updates for code
Browse files Browse the repository at this point in the history
Signed-off-by: qupeng <qupeng@pingcap.com>
  • Loading branch information
hicqu authored and ti-chi-bot committed Sep 8, 2023
1 parent a6365be commit e1c4a1b
Show file tree
Hide file tree
Showing 4 changed files with 96 additions and 9 deletions.
15 changes: 6 additions & 9 deletions cdc/processor/sinkmanager/redo_log_worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -68,22 +68,23 @@ func (w *redoWorker) handleTasks(ctx context.Context, taskChan <-chan *redoTask)
}

func (w *redoWorker) handleTask(ctx context.Context, task *redoTask) (finalErr error) {
advancer := newRedoLogAdvancer(task, w.memQuota, requestMemSize, w.redoDMLManager)
// The task is finished and some required memory isn't used.
defer advancer.cleanup()

lowerBound, upperBound := validateAndAdjustBound(
w.changefeedID,
&task.span,
task.lowerBound,
task.getUpperBound(task.tableSink.getReceivedSorterResolvedTs()),
)
advancer.lastPos = lowerBound.Prev()

var cache *eventAppender
if w.eventCache != nil {
cache = w.eventCache.maybeCreateAppender(task.span, lowerBound)
}

advancer := newRedoLogAdvancer(task, w.memQuota, requestMemSize, w.redoDMLManager)
// The task is finished and some required memory isn't used.
defer advancer.cleanup()

iter := w.sourceManager.FetchByTable(task.span, lowerBound, upperBound, w.memQuota)
allEventCount := 0
cachedSize := uint64(0)
Expand Down Expand Up @@ -127,11 +128,7 @@ func (w *redoWorker) handleTask(ctx context.Context, task *redoTask) (finalErr e
cache.pushBatch(nil, 0, upperBound)
}

return advancer.finish(
ctx,
cachedSize,
upperBound,
)
return advancer.finish(ctx, cachedSize, upperBound)
}

allEventCount += 1
Expand Down
44 changes: 44 additions & 0 deletions cdc/processor/sinkmanager/redo_log_worker_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -283,3 +283,47 @@ func (suite *redoLogWorkerSuite) TestHandleTaskWithSplitTxnAndAdvanceIfNoWorkloa
cancel()
wg.Wait()
}

// When starts to handle a task, advancer.lastPos should be set to a correct position.
// Otherwise if advancer.lastPos isn't updated during scanning, callback will get an
// invalid `advancer.lastPos`.
func (suite *redoLogWorkerSuite) TestHandleTaskWithoutMemory() {
ctx, cancel := context.WithCancel(context.Background())
events := []*model.PolymorphicEvent{
genPolymorphicEvent(1, 3, suite.testSpan),
genPolymorphicResolvedEvent(4),
}
w, e, _ := suite.createWorker(ctx, 0)
defer w.memQuota.Close()
suite.addEventsToSortEngine(events, e)

taskChan := make(chan *redoTask)
var wg sync.WaitGroup
wg.Add(1)
go func() {
defer wg.Done()
err := w.handleTasks(ctx, taskChan)
require.Equal(suite.T(), context.Canceled, err)
}()

wrapper, sink := createTableSinkWrapper(suite.testChangefeedID, suite.testSpan)
defer sink.Close()

chShouldBeClosed := make(chan struct{}, 1)
callback := func(lastWritePos engine.Position) {
require.Equal(suite.T(), genLowerBound().Prev(), lastWritePos)
close(chShouldBeClosed)
}
taskChan <- &redoTask{
span: suite.testSpan,
lowerBound: genLowerBound(),
getUpperBound: genUpperBoundGetter(4),
tableSink: wrapper,
callback: callback,
isCanceled: func() bool { return true },
}

<-chShouldBeClosed
cancel()
wg.Wait()
}
2 changes: 2 additions & 0 deletions cdc/processor/sinkmanager/table_sink_worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -129,6 +129,7 @@ func (w *sinkWorker) handleTask(ctx context.Context, task *sinkTask) (finalErr e
&task.span,
task.lowerBound,
task.getUpperBound(task.tableSink.getUpperBoundTs()))
advancer.lastPos = lowerBound.Prev()

allEventSize := uint64(0)
allEventCount := 0
Expand Down Expand Up @@ -211,6 +212,7 @@ func (w *sinkWorker) handleTask(ctx context.Context, task *sinkTask) (finalErr e
performCallback(lowerBound.Prev())
return nil
}
advancer.lastPos = lowerBound.Prev()
}

// lowerBound and upperBound are both closed intervals.
Expand Down
44 changes: 44 additions & 0 deletions cdc/processor/sinkmanager/table_sink_worker_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -716,3 +716,47 @@ func (suite *tableSinkWorkerSuite) TestFetchFromCacheWithFailure() {
cancel()
wg.Wait()
}

// When starts to handle a task, advancer.lastPos should be set to a correct position.
// Otherwise if advancer.lastPos isn't updated during scanning, callback will get an
// invalid `advancer.lastPos`.
func (suite *tableSinkWorkerSuite) TestHandleTaskWithoutMemory() {
ctx, cancel := context.WithCancel(context.Background())
events := []*model.PolymorphicEvent{
genPolymorphicEvent(1, 3, suite.testSpan),
genPolymorphicResolvedEvent(4),
}
w, e := suite.createWorker(ctx, 0, true)
defer w.sinkMemQuota.Close()
suite.addEventsToSortEngine(events, e)

taskChan := make(chan *sinkTask)
var wg sync.WaitGroup
wg.Add(1)
go func() {
defer wg.Done()
err := w.handleTasks(ctx, taskChan)
require.Equal(suite.T(), context.Canceled, err)
}()

wrapper, sink := createTableSinkWrapper(suite.testChangefeedID, suite.testSpan)
defer sink.Close()

chShouldBeClosed := make(chan struct{}, 1)
callback := func(lastWritePos engine.Position) {
require.Equal(suite.T(), genLowerBound().Prev(), lastWritePos)
close(chShouldBeClosed)
}
taskChan <- &sinkTask{
span: suite.testSpan,
lowerBound: genLowerBound(),
getUpperBound: genUpperBoundGetter(4),
tableSink: wrapper,
callback: callback,
isCanceled: func() bool { return true },
}

<-chShouldBeClosed
cancel()
wg.Wait()
}

0 comments on commit e1c4a1b

Please sign in to comment.