Skip to content

Commit

Permalink
This is an automated cherry-pick of pingcap#2367
Browse files Browse the repository at this point in the history
Signed-off-by: ti-chi-bot <ti-community-prow-bot@tidb.io>
  • Loading branch information
Rustin170506 authored and ti-chi-bot committed Jul 28, 2021
1 parent 32b34a0 commit e21f1dc
Showing 1 changed file with 95 additions and 0 deletions.
95 changes: 95 additions & 0 deletions cdc/processor/pipeline/sink.go
Original file line number Diff line number Diff line change
Expand Up @@ -142,7 +142,38 @@ func (n *sinkNode) flushSink(ctx pipeline.NodeContext, resolvedTs model.Ts) (err
}

func (n *sinkNode) emitEvent(ctx pipeline.NodeContext, event *model.PolymorphicEvent) error {
<<<<<<< HEAD
n.eventBuffer = append(n.eventBuffer, event)
=======
if event == nil || event.Row == nil {
return nil
}

colLen := len(event.Row.Columns)
preColLen := len(event.Row.PreColumns)
config := ctx.ChangefeedVars().Info.Config

// This indicates that it is an update event,
// and after enable old value internally by default(but disable in the configuration).
// We need to handle the update event to be compatible with the old format.
if !config.EnableOldValue && colLen != 0 && preColLen != 0 && colLen == preColLen {
if shouldSplitUpdateEvent(event) {
deleteEvent, insertEvent, err := splitUpdateEvent(event)
if err != nil {
return errors.Trace(err)
}
// NOTICE: Please do not change the order, the delete event always comes before the insert event.
n.eventBuffer = append(n.eventBuffer, deleteEvent, insertEvent)
} else {
// If the handle key columns are not updated, PreColumns is directly ignored.
event.Row.PreColumns = nil
n.eventBuffer = append(n.eventBuffer, event)
}
} else {
n.eventBuffer = append(n.eventBuffer, event)
}

>>>>>>> f2526c1e (sink(refactor): Refine functions and improve event splitting code (#2367))
if len(n.eventBuffer) >= defaultSyncResolvedBatch {
if err := n.flushRow2Sink(ctx); err != nil {
return errors.Trace(err)
Expand All @@ -151,6 +182,70 @@ func (n *sinkNode) emitEvent(ctx pipeline.NodeContext, event *model.PolymorphicE
return nil
}

// shouldSplitUpdateEvent determines if the split event is needed to align the old format based on
// whether the handle key column has been modified.
// If the handle key column is modified,
// we need to use splitUpdateEvent to split the update event into a delete and an insert event.
func shouldSplitUpdateEvent(updateEvent *model.PolymorphicEvent) bool {
// nil event will never be split.
if updateEvent == nil {
return false
}

handleKeyCount := 0
equivalentHandleKeyCount := 0
for i := range updateEvent.Row.Columns {
if updateEvent.Row.Columns[i].Flag.IsHandleKey() && updateEvent.Row.PreColumns[i].Flag.IsHandleKey() {
handleKeyCount++
colValueString := model.ColumnValueString(updateEvent.Row.Columns[i].Value)
preColValueString := model.ColumnValueString(updateEvent.Row.PreColumns[i].Value)
if colValueString == preColValueString {
equivalentHandleKeyCount++
}
}
}

// If the handle key columns are not updated, so we do **not** need to split the event row.
return !(handleKeyCount == equivalentHandleKeyCount)
}

// splitUpdateEvent splits an update event into a delete and an insert event.
func splitUpdateEvent(updateEvent *model.PolymorphicEvent) (*model.PolymorphicEvent, *model.PolymorphicEvent, error) {
if updateEvent == nil {
return nil, nil, errors.New("nil event cannot be split")
}

// If there is an update to handle key columns,
// we need to split the event into two events to be compatible with the old format.
// NOTICE: Here we don't need a full deep copy because our two events need Columns and PreColumns respectively,
// so it won't have an impact and no more full deep copy wastes memory.
deleteEvent := *updateEvent
deleteEventRow := *updateEvent.Row
deleteEventRowKV := *updateEvent.RawKV
deleteEvent.Row = &deleteEventRow
deleteEvent.RawKV = &deleteEventRowKV

deleteEvent.Row.Columns = nil
for i := range deleteEvent.Row.PreColumns {
// NOTICE: Only the handle key pre column is retained in the delete event.
if !deleteEvent.Row.PreColumns[i].Flag.IsHandleKey() {
deleteEvent.Row.PreColumns[i] = nil
}
}
// Align with the old format if old value disabled.
deleteEvent.Row.TableInfoVersion = 0

insertEvent := *updateEvent
insertEventRow := *updateEvent.Row
insertEventRowKV := *updateEvent.RawKV
insertEvent.Row = &insertEventRow
insertEvent.RawKV = &insertEventRowKV
// NOTICE: clean up pre cols for insert event.
insertEvent.Row.PreColumns = nil

return &deleteEvent, &insertEvent, nil
}

func (n *sinkNode) flushRow2Sink(ctx pipeline.NodeContext) error {
for _, ev := range n.eventBuffer {
err := ev.WaitPrepare(ctx)
Expand Down

0 comments on commit e21f1dc

Please sign in to comment.