Skip to content

Commit

Permalink
ticdc: Fix CPU surge problem in sorter (#10739)
Browse files Browse the repository at this point in the history
close #10738
  • Loading branch information
hongyunyan authored Mar 11, 2024
1 parent e0bdf8c commit d19833e
Showing 1 changed file with 36 additions and 20 deletions.
56 changes: 36 additions & 20 deletions cdc/processor/sourcemanager/sorter/pebble/event_sorter.go
Original file line number Diff line number Diff line change
Expand Up @@ -392,14 +392,16 @@ func (s *EventSorter) batchCommitAndUpdateResolvedTs(
case batchEvent := <-batchCh:
// do batch commit
batch := batchEvent.batch
writeBytes.Observe(float64(len(batch.Repr())))
start := time.Now()
if err := batch.Commit(&pebbleWriteOptions); err != nil {
log.Panic("failed to commit pebble batch", zap.Error(err),
zap.String("namespace", s.changefeedID.Namespace),
zap.String("changefeed", s.changefeedID.ID))
if !batch.Empty() {
writeBytes.Observe(float64(len(batch.Repr())))
start := time.Now()
if err := batch.Commit(&pebbleWriteOptions); err != nil {
log.Panic("failed to commit pebble batch", zap.Error(err),
zap.String("namespace", s.changefeedID.Namespace),
zap.String("changefeed", s.changefeedID.ID))
}
writeDuration.Observe(time.Since(start).Seconds())
}
writeDuration.Observe(time.Since(start).Seconds())

// update resolved ts after commit successfully
batchResolved := batchEvent.batchResolved
Expand Down Expand Up @@ -440,11 +442,10 @@ func (s *EventSorter) handleEvents(
s.wg.Add(1)
go s.batchCommitAndUpdateResolvedTs(batchCh, id)

batch := db.NewBatch()
newResolved := spanz.NewHashMap[model.Ts]()
startToCollectBatch := time.Now()
ticker := time.NewTicker(batchCommitInterval / 2)
defer ticker.Stop()

handleItem := func(item eventWithTableID) {
encodeItemAndBatch := func(batch *pebble.Batch, newResolved *spanz.HashMap[model.Ts], item eventWithTableID) {
if item.event.IsResolved() {
newResolved.ReplaceOrInsert(item.span, item.event.CRTs)
return
Expand All @@ -463,21 +464,36 @@ func (s *EventSorter) handleEvents(
}
}

for {
for len(batch.Repr()) < batchCommitSize && time.Since(startToCollectBatch) < batchCommitInterval {
// Batch item and commit until batch size is larger than batchCommitSize,
// or the time since the last commit is larger than batchCommitInterval.
// Only return false when the sorter is closed.
doBatching := func() (*DBBatchEvent, bool) {
batch := db.NewBatch()
newResolved := spanz.NewHashMap[model.Ts]()
startToBatch := time.Now()
for {
select {
case item := <-inputCh:
handleItem(item)
encodeItemAndBatch(batch, newResolved, item)
if len(batch.Repr()) >= batchCommitSize {
return &DBBatchEvent{batch, newResolved}, true
}
case <-s.closed:
return
default:
return nil, false
case <-ticker.C:
if time.Since(startToBatch) >= batchCommitInterval {
return &DBBatchEvent{batch, newResolved}, true
}
}
}
batchCh <- &DBBatchEvent{batch, newResolved}
}

batch = db.NewBatch()
newResolved = spanz.NewHashMap[model.Ts]()
startToCollectBatch = time.Now()
for {
batchEvent, ok := doBatching()
if !ok {
return
}
batchCh <- batchEvent
}
}

Expand Down

0 comments on commit d19833e

Please sign in to comment.