Skip to content

Commit

Permalink
sink(refactor): Refine functions and improve event splitting code (#2367
Browse files Browse the repository at this point in the history
) (#2398)
  • Loading branch information
ti-chi-bot authored Aug 2, 2021
1 parent 125cf54 commit 75b4d65
Showing 1 changed file with 72 additions and 45 deletions.
117 changes: 72 additions & 45 deletions cdc/processor/pipeline/sink.go
Original file line number Diff line number Diff line change
Expand Up @@ -154,54 +154,17 @@ func (n *sinkNode) emitEvent(ctx pipeline.NodeContext, event *model.PolymorphicE
// and after enable old value internally by default(but disable in the configuration).
// We need to handle the update event to be compatible with the old format.
if !config.EnableOldValue && colLen != 0 && preColLen != 0 && colLen == preColLen {
handleKeyCount := 0
equivalentHandleKeyCount := 0
for i := range event.Row.Columns {
if event.Row.Columns[i].Flag.IsHandleKey() && event.Row.PreColumns[i].Flag.IsHandleKey() {
handleKeyCount++
colValueString := model.ColumnValueString(event.Row.Columns[i].Value)
preColValueString := model.ColumnValueString(event.Row.PreColumns[i].Value)
if colValueString == preColValueString {
equivalentHandleKeyCount++
}
if shouldSplitUpdateEvent(event) {
deleteEvent, insertEvent, err := splitUpdateEvent(event)
if err != nil {
return errors.Trace(err)
}
}

// If the handle key columns are not updated, PreColumns is directly ignored.
if handleKeyCount == equivalentHandleKeyCount {
// NOTICE: Please do not change the order, the delete event always comes before the insert event.
n.eventBuffer = append(n.eventBuffer, deleteEvent, insertEvent)
} else {
// If the handle key columns are not updated, PreColumns is directly ignored.
event.Row.PreColumns = nil
n.eventBuffer = append(n.eventBuffer, event)
} else {
// If there is an update to handle key columns,
// we need to split the event into two events to be compatible with the old format.
// NOTICE: Here we don't need a full deep copy because our two events need Columns and PreColumns respectively,
// so it won't have an impact and no more full deep copy wastes memory.
deleteEvent := *event
deleteEventRow := *event.Row
deleteEventRowKV := *event.RawKV
deleteEvent.Row = &deleteEventRow
deleteEvent.RawKV = &deleteEventRowKV

deleteEvent.Row.Columns = nil
for i := range deleteEvent.Row.PreColumns {
// NOTICE: Only the handle key pre column is retained in the delete event.
if !deleteEvent.Row.PreColumns[i].Flag.IsHandleKey() {
deleteEvent.Row.PreColumns[i] = nil
}
}
// Align with the old format if old value disabled.
deleteEvent.Row.TableInfoVersion = 0
n.eventBuffer = append(n.eventBuffer, &deleteEvent)

insertEvent := *event
insertEventRow := *event.Row
insertEventRowKV := *event.RawKV
insertEvent.Row = &insertEventRow
insertEvent.RawKV = &insertEventRowKV

// NOTICE: clean up pre cols for insert event.
insertEvent.Row.PreColumns = nil
n.eventBuffer = append(n.eventBuffer, &insertEvent)
}
} else {
n.eventBuffer = append(n.eventBuffer, event)
Expand All @@ -215,6 +178,70 @@ func (n *sinkNode) emitEvent(ctx pipeline.NodeContext, event *model.PolymorphicE
return nil
}

// shouldSplitUpdateEvent determines if the split event is needed to align the old format based on
// whether the handle key column has been modified.
// If the handle key column is modified,
// we need to use splitUpdateEvent to split the update event into a delete and an insert event.
func shouldSplitUpdateEvent(updateEvent *model.PolymorphicEvent) bool {
// nil event will never be split.
if updateEvent == nil {
return false
}

handleKeyCount := 0
equivalentHandleKeyCount := 0
for i := range updateEvent.Row.Columns {
if updateEvent.Row.Columns[i].Flag.IsHandleKey() && updateEvent.Row.PreColumns[i].Flag.IsHandleKey() {
handleKeyCount++
colValueString := model.ColumnValueString(updateEvent.Row.Columns[i].Value)
preColValueString := model.ColumnValueString(updateEvent.Row.PreColumns[i].Value)
if colValueString == preColValueString {
equivalentHandleKeyCount++
}
}
}

// If the handle key columns are not updated, so we do **not** need to split the event row.
return !(handleKeyCount == equivalentHandleKeyCount)
}

// splitUpdateEvent splits an update event into a delete and an insert event.
func splitUpdateEvent(updateEvent *model.PolymorphicEvent) (*model.PolymorphicEvent, *model.PolymorphicEvent, error) {
if updateEvent == nil {
return nil, nil, errors.New("nil event cannot be split")
}

// If there is an update to handle key columns,
// we need to split the event into two events to be compatible with the old format.
// NOTICE: Here we don't need a full deep copy because our two events need Columns and PreColumns respectively,
// so it won't have an impact and no more full deep copy wastes memory.
deleteEvent := *updateEvent
deleteEventRow := *updateEvent.Row
deleteEventRowKV := *updateEvent.RawKV
deleteEvent.Row = &deleteEventRow
deleteEvent.RawKV = &deleteEventRowKV

deleteEvent.Row.Columns = nil
for i := range deleteEvent.Row.PreColumns {
// NOTICE: Only the handle key pre column is retained in the delete event.
if !deleteEvent.Row.PreColumns[i].Flag.IsHandleKey() {
deleteEvent.Row.PreColumns[i] = nil
}
}
// Align with the old format if old value disabled.
deleteEvent.Row.TableInfoVersion = 0

insertEvent := *updateEvent
insertEventRow := *updateEvent.Row
insertEventRowKV := *updateEvent.RawKV
insertEvent.Row = &insertEventRow
insertEvent.RawKV = &insertEventRowKV
// NOTICE: clean up pre cols for insert event.
insertEvent.Row.PreColumns = nil

return &deleteEvent, &insertEvent, nil
}

func (n *sinkNode) flushRow2Sink(ctx pipeline.NodeContext) error {
for _, ev := range n.eventBuffer {
err := ev.WaitPrepare(ctx)
Expand Down

0 comments on commit 75b4d65

Please sign in to comment.