Commit

sorter(ticdc): optimize sorter and fix corner case of updating resolvedTs (#10618)

ref #10457
hongyunyan authored Feb 27, 2024
1 parent d36cc29 commit 25ce29c
Showing 2 changed files with 102 additions and 114 deletions.
6 changes: 3 additions & 3 deletions cdc/owner/changefeed.go
@@ -659,10 +659,10 @@ LOOP2:
			defer c.wg.Done()
			ctx.Throw(c.redoMetaMgr.Run(cancelCtx))
		}()
-		log.Info("owner creates redo manager",
-			zap.String("namespace", c.id.Namespace),
-			zap.String("changefeed", c.id.ID))
	}
+	log.Info("owner creates redo manager",
+		zap.String("namespace", c.id.Namespace),
+		zap.String("changefeed", c.id.ID))

	c.ddlManager = newDDLManager(
		c.id,
210 changes: 99 additions & 111 deletions cdc/processor/sourcemanager/sorter/pebble/event_sorter.go
@@ -37,6 +37,8 @@ var (
	_ sorter.EventIterator = (*EventIter)(nil)
)

+var pebbleWriteOptions = pebble.WriteOptions{Sync: false}
+
// EventSorter is an event sort engine.
type EventSorter struct {
	// Read-only fields.
@@ -58,10 +60,10 @@ type EventSorter struct {

// EventIter implements sorter.EventIterator.
type EventIter struct {
-	tableID  model.TableID
-	iter     *pebble.Iterator
-	headItem *model.PolymorphicEvent
-	serde    encoding.MsgPackGenSerde
+	tableID      model.TableID
+	iter         *pebble.Iterator
+	currentEvent *model.PolymorphicEvent
+	serde        encoding.MsgPackGenSerde

	nextDuration prometheus.Observer
}
@@ -82,23 +84,11 @@ func New(ID model.ChangeFeedID, dbs []*pebble.DB) *EventSorter {
}

	for i := range eventSorter.dbs {
-		fetchTokens := make(chan struct{}, 1)
-		ioTokens := make(chan struct{}, 1)
-		fetchTokens <- struct{}{}
-		ioTokens <- struct{}{}
-
-		// Start 2 goroutines for every db instance. When one goroutine is busy on I/O,
-		// the another one can still keep retrieving events.
-		eventSorter.wg.Add(1)
-		go func(x int, fetchTokens, ioTokens chan struct{}) {
-			defer eventSorter.wg.Done()
-			eventSorter.handleEvents(x, dbs[x], channs[x].Out(), fetchTokens, ioTokens)
-		}(i, fetchTokens, ioTokens)
		eventSorter.wg.Add(1)
-		go func(x int, fetchTokens, ioTokens chan struct{}) {
+		go func(x int) {
			defer eventSorter.wg.Done()
-			eventSorter.handleEvents(x, dbs[x], channs[x].Out(), fetchTokens, ioTokens)
-		}(i, fetchTokens, ioTokens)
+			eventSorter.handleEvents(x, dbs[x], channs[x].Out())
+		}(i)
	}

	return eventSorter
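
For context on what this hunk removes: fetchTokens and ioTokens were 1-slot channels acting as semaphores, so that of the two goroutines per db instance, one could collect the next batch while its peer was blocked in commit I/O. Below is a simplified, standalone sketch of that handshake (hypothetical names, not code from this repo); the commit replaces it with the single-producer pipeline sketched after the handleEvents hunk further down.

```go
package main

import (
	"fmt"
	"time"
)

// Two workers share one "fetch" token and one "io" token. A worker must hold
// the fetch token to collect a batch and the io token to commit it; releasing
// the fetch token right after acquiring the io token lets the peer collect
// the next batch while this worker is busy committing.
func worker(id int, fetchTokens, ioTokens chan struct{}, done chan<- struct{}) {
	for i := 0; i < 2; i++ {
		<-fetchTokens // acquire: only one collector at a time
		fmt.Printf("worker %d: collecting batch\n", id)
		time.Sleep(10 * time.Millisecond)

		<-ioTokens                // acquire: only one committer at a time
		fetchTokens <- struct{}{} // release fetch so collection overlaps our I/O
		fmt.Printf("worker %d: committing batch\n", id)
		time.Sleep(10 * time.Millisecond)
		ioTokens <- struct{}{} // release io
	}
	done <- struct{}{}
}

func main() {
	fetchTokens := make(chan struct{}, 1)
	ioTokens := make(chan struct{}, 1)
	fetchTokens <- struct{}{}
	ioTokens <- struct{}{}

	done := make(chan struct{})
	go worker(1, fetchTokens, ioTokens, done)
	go worker(2, fetchTokens, ioTokens, done)
	<-done
	<-done
}
```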
@@ -317,29 +307,37 @@ func (s *EventSorter) SlotsAndHasher() (slotCount int, hasher func(tablepb.Span,
}

// Next implements sorter.EventIterator.
-func (s *EventIter) Next() (event *model.PolymorphicEvent, pos sorter.Position, err error) {
+// txnFinished indicates whether all events in the current transaction are
+// fetched or not.
+func (s *EventIter) Next() (event *model.PolymorphicEvent, txnFinished sorter.Position, err error) {
	valid := s.iter != nil && s.iter.Valid()
	var value []byte
+	var nextEvent *model.PolymorphicEvent

+	// We need to decide whether the current event is the last event of its transaction.
+	// If it is, txnFinished must be set. Thus, we fetch the next event and compare
+	// its commitTs and startTs with those of the current one.
	for valid {
		nextStart := time.Now()
		value, valid = s.iter.Value(), s.iter.Next()
		s.nextDuration.Observe(time.Since(nextStart).Seconds())

-		event = &model.PolymorphicEvent{}
-		if _, err = s.serde.Unmarshal(event, value); err != nil {
+		nextEvent = &model.PolymorphicEvent{}
+		if _, err = s.serde.Unmarshal(nextEvent, value); err != nil {
			return
		}
-		if s.headItem != nil {
+		if s.currentEvent != nil {
			break
		}
-		s.headItem, event = event, nil
+		s.currentEvent, nextEvent = nextEvent, nil
	}
-	if s.headItem != nil {
-		if event == nil || s.headItem.CRTs != event.CRTs || s.headItem.StartTs != event.StartTs {
-			pos.CommitTs = s.headItem.CRTs
-			pos.StartTs = s.headItem.StartTs
+	if s.currentEvent != nil {
+		if nextEvent == nil || s.currentEvent.CRTs != nextEvent.CRTs || s.currentEvent.StartTs != nextEvent.StartTs {
+			txnFinished.CommitTs = s.currentEvent.CRTs
+			txnFinished.StartTs = s.currentEvent.StartTs
		}
-		event, s.headItem = s.headItem, event
+		event = s.currentEvent
+		s.currentEvent = nextEvent
	}
	return
}
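
The headItem → currentEvent rename makes the one-event lookahead in Next easier to follow: the iterator buffers the current event and peeks at its successor to decide whether the current transaction (identified by its StartTs/CRTs pair) is complete. Here is a minimal sketch of the same pattern over a plain slice, with hypothetical stand-in types for model.PolymorphicEvent and sorter.Position:

```go
package main

import "fmt"

// event stands in for model.PolymorphicEvent: StartTs and CRTs identify a transaction.
type event struct{ StartTs, CRTs uint64 }

// position stands in for sorter.Position; the zero value means "transaction not finished yet".
type position struct{ StartTs, CommitTs uint64 }

// iter walks a sorted slice and reports, for each event, whether it is the
// last one of its transaction.
type iter struct {
	events  []event
	current *event // the buffered lookahead, like EventIter.currentEvent
}

func (it *iter) next() (ev *event, txnFinished position) {
	for len(it.events) > 0 {
		n := it.events[0]
		it.events = it.events[1:]
		if it.current == nil {
			it.current = &n // first call: buffer one event and keep reading
			continue
		}
		// A boundary exists where the buffered event and its successor differ.
		if it.current.CRTs != n.CRTs || it.current.StartTs != n.StartTs {
			txnFinished = position{StartTs: it.current.StartTs, CommitTs: it.current.CRTs}
		}
		ev, it.current = it.current, &n
		return
	}
	// Input exhausted: the buffered event necessarily ends its transaction.
	if it.current != nil {
		ev, it.current = it.current, nil
		txnFinished = position{StartTs: ev.StartTs, CommitTs: ev.CRTs}
	}
	return
}

func main() {
	it := &iter{events: []event{{1, 5}, {1, 5}, {2, 7}}}
	for ev, pos := it.next(); ev != nil; ev, pos = it.next() {
		fmt.Println(*ev, pos) // the second {1 5} and {2 7} report a finished transaction
	}
}
```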
@@ -371,17 +369,80 @@ type tableState struct {
	cleaned sorter.Position
}

-func (s *EventSorter) handleEvents(
-	id int, db *pebble.DB, inputCh <-chan eventWithTableID,
-	fetchTokens, ioTokens chan struct{},
+// DBBatchEvent contains a batch of events and the corresponding resolvedTs info.
+type DBBatchEvent struct {
+	batch         *pebble.Batch
+	batchResolved *spanz.HashMap[model.Ts]
+}
+
+// batchCommitAndUpdateResolvedTs commits batches and updates the resolved ts of the tables.
+func (s *EventSorter) batchCommitAndUpdateResolvedTs(
+	batchCh chan *DBBatchEvent,
+	id int,
) {
	idstr := strconv.Itoa(id + 1)
	writeDuration := sorter.WriteDuration().WithLabelValues(idstr)
	writeBytes := sorter.WriteBytes().WithLabelValues(idstr)

+	for {
+		select {
+		case <-s.closed:
+			s.wg.Done()
+			return
+		case batchEvent := <-batchCh:
+			// do batch commit
+			batch := batchEvent.batch
+			writeBytes.Observe(float64(len(batch.Repr())))
+			start := time.Now()
+			if err := batch.Commit(&pebbleWriteOptions); err != nil {
+				log.Panic("failed to commit pebble batch", zap.Error(err),
+					zap.String("namespace", s.changefeedID.Namespace),
+					zap.String("changefeed", s.changefeedID.ID))
+			}
+			writeDuration.Observe(time.Since(start).Seconds())
+
+			// update the resolved ts only after the commit succeeds
+			batchResolved := batchEvent.batchResolved
+			batchResolved.Range(func(span tablepb.Span, resolved uint64) bool {
+				s.mu.RLock()
+				ts, ok := s.tables.Get(span)
+				s.mu.RUnlock()
+				if !ok {
+					log.Debug("Table is removed, skip updating resolved",
+						zap.String("namespace", s.changefeedID.Namespace),
+						zap.String("changefeed", s.changefeedID.ID),
+						zap.Stringer("span", &span),
+						zap.Uint64("resolved", resolved))
+					return true
+				}
+				ts.sortedResolved.Store(resolved)
+				for _, onResolve := range s.onResolves {
+					onResolve(span, resolved)
+				}
+				return true
+			})
+		}
+	}
+}
+
+// handleEvents encodes events from the channel and writes them into pebble.
+// It commits a batch when the batch size exceeds batchCommitSize or
+// the time since the last commit exceeds batchCommitInterval.
+// It also updates the resolved ts of the tables once a batch is committed.
+// Since commit is a heavy operation, [fetch and decode events] and
+// [commit and update resolved ts] run as two separate goroutines, forming a pipeline.
+func (s *EventSorter) handleEvents(
+	id int, db *pebble.DB, inputCh <-chan eventWithTableID,
+) {
+	// Use a relatively small channel buffer to avoid possible OOM from buffering
+	// too much data; the buffer size currently does not affect commit throughput.
+	batchCh := make(chan *DBBatchEvent, 8)
+	s.wg.Add(1)
+	go s.batchCommitAndUpdateResolvedTs(batchCh, id)
+
	batch := db.NewBatch()
-	writeOpts := &pebble.WriteOptions{Sync: false}
	newResolved := spanz.NewHashMap[model.Ts]()
+	startToCollectBatch := time.Now()

	handleItem := func(item eventWithTableID) {
		if item.event.IsResolved() {
@@ -395,101 +456,28 @@ func (s *EventSorter) handleEvents(
zap.String("namespace", s.changefeedID.Namespace),
zap.String("changefeed", s.changefeedID.ID))
}
-		if err = batch.Set(key, value, writeOpts); err != nil {
+		if err = batch.Set(key, value, &pebbleWriteOptions); err != nil {
log.Panic("failed to update pebble batch", zap.Error(err),
zap.String("namespace", s.changefeedID.Namespace),
zap.String("changefeed", s.changefeedID.ID))
}
}

	for {
-		// Wait for a fetch token.
-		select {
-		case <-fetchTokens:
-		case <-s.closed:
-			return
-		}
-
-		startToCollectBatch := time.Now()
		select {
		case item := <-inputCh:
			handleItem(item)
		case <-s.closed:
			return
		}
	LOOP1: // Keep retrieving events until a batch is collected.
		for len(batch.Repr()) < batchCommitSize && time.Since(startToCollectBatch) < batchCommitInterval {
			select {
			case item := <-inputCh:
				handleItem(item)
			case <-s.closed:
				return
			default:
				break LOOP1
			}
		}
-	LOOP2: // Keep retrieving events until an io token is available.
-		for {
-			if len(batch.Repr()) < batchCommitSize {
-				select {
-				case <-ioTokens:
-					break LOOP2
-				case <-s.closed:
-					return
-				default:
-				}
-				select {
-				case <-ioTokens:
-					break LOOP2
-				case <-s.closed:
-					return
-				case item := <-inputCh:
-					handleItem(item)
-				}
-			} else {
-				select {
-				case <-ioTokens:
-					break LOOP2
-				case <-s.closed:
-					return
-				}
-			}
-		}
+		batchCh <- &DBBatchEvent{batch, newResolved}

-		fetchTokens <- struct{}{}
-		if batch.Count() > 0 {
-			writeBytes.Observe(float64(len(batch.Repr())))
-			start := time.Now()
-			if err := batch.Commit(writeOpts); err != nil {
-				log.Panic("failed to commit pebble batch", zap.Error(err),
-					zap.String("namespace", s.changefeedID.Namespace),
-					zap.String("changefeed", s.changefeedID.ID))
-			}
-			writeDuration.Observe(time.Since(start).Seconds())
-			batch = db.NewBatch()
-		}
-
-		newResolved.Range(func(span tablepb.Span, resolved uint64) bool {
-			s.mu.RLock()
-			ts, ok := s.tables.Get(span)
-			if !ok {
-				log.Debug("Table is removed, skip updating resolved",
-					zap.String("namespace", s.changefeedID.Namespace),
-					zap.String("changefeed", s.changefeedID.ID),
-					zap.Stringer("span", &span),
-					zap.Uint64("resolved", resolved))
-				s.mu.RUnlock()
-				return false
-			}
-			ts.sortedResolved.Store(resolved)
-			for _, onResolve := range s.onResolves {
-				onResolve(span, resolved)
-			}
-			s.mu.RUnlock()
-			return true
-		})
+		batch = db.NewBatch()
		newResolved = spanz.NewHashMap[model.Ts]()
-		ioTokens <- struct{}{}
+		startToCollectBatch = time.Now()
	}
}
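
The rewritten handleEvents is now a bounded producer/consumer pipeline: the goroutine above collects events into a batch until it is large enough or old enough, then hands the batch plus its resolved-ts watermark to batchCommitAndUpdateResolvedTs over the 8-slot channel. Note the resolvedTs corner-case fix visible in the diff: the old Range callback returned false when it found a removed table, which aborted the iteration and silently skipped the resolved-ts updates of every remaining table in the batch, while the new callback returns true and skips only that table. Below is a stripped-down sketch of the pipeline shape (hypothetical names, no pebble involved), where the watermark is published only after the batch "commits":

```go
package main

import (
	"fmt"
	"time"
)

// batchEvent mirrors DBBatchEvent: a blob of writes plus the resolved-ts
// watermark that may only be published after the blob is durably committed.
type batchEvent struct {
	writes   []string
	resolved uint64
}

func main() {
	input := make(chan string)
	// A small buffer, like the batchCh of size 8: it bounds memory while still
	// letting collection run ahead of a slow commit.
	batchCh := make(chan *batchEvent, 8)
	done := make(chan struct{})

	// Consumer: commit each batch, then publish its watermark.
	go func() {
		defer close(done)
		for b := range batchCh {
			fmt.Printf("commit %d writes\n", len(b.writes)) // stands in for batch.Commit
			fmt.Printf("advance resolved ts to %d\n", b.resolved)
		}
	}()

	// Producer: collect until the batch is big enough or stale enough,
	// then hand it off and start a new one.
	const maxBatch = 4
	const maxWait = 50 * time.Millisecond
	go func() {
		defer close(batchCh)
		batch := &batchEvent{}
		start := time.Now()
		for w := range input {
			batch.writes = append(batch.writes, w)
			batch.resolved++ // pretend every event advances the watermark
			if len(batch.writes) >= maxBatch || time.Since(start) >= maxWait {
				batchCh <- batch
				batch = &batchEvent{resolved: batch.resolved}
				start = time.Now()
			}
		}
		if len(batch.writes) > 0 {
			batchCh <- batch
		}
	}()

	for i := 0; i < 10; i++ {
		input <- fmt.Sprintf("event-%d", i)
	}
	close(input)
	<-done
}
```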

@@ -518,7 +506,7 @@ func (s *EventSorter) cleanTable(
	end = encoding.EncodeTsKey(state.uniqueID, uint64(span.TableID), toCleanNext.CommitTs, toCleanNext.StartTs)

	db := s.dbs[getDB(span, len(s.dbs))]
-	err := db.DeleteRange(start, end, &pebble.WriteOptions{Sync: false})
+	err := db.DeleteRange(start, end, &pebbleWriteOptions)
	if err != nil {
		log.Info("clean stale table range fails",
			zap.String("namespace", s.changefeedID.Namespace),
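
Both write paths now share the package-level pebbleWriteOptions introduced at the top of this file instead of allocating a fresh pebble.WriteOptions{Sync: false} at each call site. A minimal sketch of the idiom follows (hypothetical wrapper functions, not code from this repo); with Sync: false a write is acknowledged before an fsync, a durability trade-off the sorter can presumably afford because its data can be re-pulled from upstream:

```go
package main

import "github.com/cockroachdb/pebble"

// One shared, immutable options value. Pebble only reads WriteOptions,
// so a single package-level instance can safely be passed to every call.
var writeOpts = pebble.WriteOptions{Sync: false}

// set and deleteRange are hypothetical wrappers showing call sites sharing the options.
func set(db *pebble.DB, key, value []byte) error {
	// Sync: false skips the fsync before acknowledging; buffered data may be
	// lost on a crash, in exchange for much lower write latency.
	return db.Set(key, value, &writeOpts)
}

func deleteRange(db *pebble.DB, start, end []byte) error {
	return db.DeleteRange(start, end, &writeOpts)
}

func main() {
	db, err := pebble.Open("demo-db", &pebble.Options{})
	if err != nil {
		panic(err)
	}
	defer db.Close()
	if err := set(db, []byte("k"), []byte("v")); err != nil {
		panic(err)
	}
	if err := deleteRange(db, []byte("a"), []byte("z")); err != nil {
		panic(err)
	}
}
```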
