Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

sink(refactor): Refine functions and improve event splitting code #2367

Merged
merged 10 commits into from
Jul 28, 2021
121 changes: 76 additions & 45 deletions cdc/processor/pipeline/sink.go
Original file line number Diff line number Diff line change
Expand Up @@ -154,54 +154,16 @@ func (n *sinkNode) emitEvent(ctx pipeline.NodeContext, event *model.PolymorphicE
// and after enable old value internally by default(but disable in the configuration).
// We need to handle the update event to be compatible with the old format.
if !config.EnableOldValue && colLen != 0 && preColLen != 0 && colLen == preColLen {
handleKeyCount := 0
equivalentHandleKeyCount := 0
for i := range event.Row.Columns {
if event.Row.Columns[i].Flag.IsHandleKey() && event.Row.PreColumns[i].Flag.IsHandleKey() {
handleKeyCount++
colValueString := model.ColumnValueString(event.Row.Columns[i].Value)
preColValueString := model.ColumnValueString(event.Row.PreColumns[i].Value)
if colValueString == preColValueString {
equivalentHandleKeyCount++
}
if shouldSplitUpdateEvent(event) {
splitEvents, err := splitUpdateEvent(event)
if err != nil {
return errors.Trace(err)
}
}

// If the handle key columns are not updated, PreColumns is directly ignored.
if handleKeyCount == equivalentHandleKeyCount {
n.eventBuffer = append(n.eventBuffer, splitEvents...)
} else {
// If the handle key columns are not updated, PreColumns is directly ignored.
event.Row.PreColumns = nil
n.eventBuffer = append(n.eventBuffer, event)
} else {
// If there is an update to handle key columns,
// we need to split the event into two events to be compatible with the old format.
// NOTICE: Here we don't need a full deep copy because our two events need Columns and PreColumns respectively,
// so it won't have an impact and no more full deep copy wastes memory.
deleteEvent := *event
deleteEventRow := *event.Row
deleteEventRowKV := *event.RawKV
deleteEvent.Row = &deleteEventRow
deleteEvent.RawKV = &deleteEventRowKV

deleteEvent.Row.Columns = nil
for i := range deleteEvent.Row.PreColumns {
// NOTICE: Only the handle key pre column is retained in the delete event.
if !deleteEvent.Row.PreColumns[i].Flag.IsHandleKey() {
deleteEvent.Row.PreColumns[i] = nil
}
}
// Align with the old format if old value disabled.
deleteEvent.Row.TableInfoVersion = 0
n.eventBuffer = append(n.eventBuffer, &deleteEvent)

insertEvent := *event
insertEventRow := *event.Row
insertEventRowKV := *event.RawKV
insertEvent.Row = &insertEventRow
insertEvent.RawKV = &insertEventRowKV

// NOTICE: clean up pre cols for insert event.
insertEvent.Row.PreColumns = nil
n.eventBuffer = append(n.eventBuffer, &insertEvent)
}
} else {
n.eventBuffer = append(n.eventBuffer, event)
Expand All @@ -215,6 +177,75 @@ func (n *sinkNode) emitEvent(ctx pipeline.NodeContext, event *model.PolymorphicE
return nil
}

// shouldSplitUpdateEvent determines if the split event is needed to align the old format based on
// whether the handle key column has been modified.
// If the handle key column is modified,
// we need to use splitUpdateEvent to split the update event into a delete and an insert event.
func shouldSplitUpdateEvent(updateEvent *model.PolymorphicEvent) bool {
// nil event will never be split.
if updateEvent == nil {
return false
}

handleKeyCount := 0
equivalentHandleKeyCount := 0
for i := range updateEvent.Row.Columns {
if updateEvent.Row.Columns[i].Flag.IsHandleKey() && updateEvent.Row.PreColumns[i].Flag.IsHandleKey() {
handleKeyCount++
colValueString := model.ColumnValueString(updateEvent.Row.Columns[i].Value)
preColValueString := model.ColumnValueString(updateEvent.Row.PreColumns[i].Value)
if colValueString == preColValueString {
equivalentHandleKeyCount++
}
}
}

// If the handle key columns are not updated, so we do **not** need to split the event row.
return !(handleKeyCount == equivalentHandleKeyCount)
}

// splitUpdateEvent splits an update event into a delete and an insert event.
func splitUpdateEvent(updateEvent *model.PolymorphicEvent) ([]*model.PolymorphicEvent, error) {
Rustin170506 marked this conversation as resolved.
Show resolved Hide resolved
if updateEvent == nil {
return nil, errors.New("nil event cannot be split")
}

var result []*model.PolymorphicEvent
Rustin170506 marked this conversation as resolved.
Show resolved Hide resolved

// If there is an update to handle key columns,
// we need to split the event into two events to be compatible with the old format.
// NOTICE: Here we don't need a full deep copy because our two events need Columns and PreColumns respectively,
// so it won't have an impact and no more full deep copy wastes memory.
deleteEvent := *updateEvent
deleteEventRow := *updateEvent.Row
deleteEventRowKV := *updateEvent.RawKV
deleteEvent.Row = &deleteEventRow
deleteEvent.RawKV = &deleteEventRowKV

deleteEvent.Row.Columns = nil
for i := range deleteEvent.Row.PreColumns {
// NOTICE: Only the handle key pre column is retained in the delete event.
if !deleteEvent.Row.PreColumns[i].Flag.IsHandleKey() {
deleteEvent.Row.PreColumns[i] = nil
}
}
// Align with the old format if old value disabled.
deleteEvent.Row.TableInfoVersion = 0
result = append(result, &deleteEvent)

insertEvent := *updateEvent
insertEventRow := *updateEvent.Row
insertEventRowKV := *updateEvent.RawKV
insertEvent.Row = &insertEventRow
insertEvent.RawKV = &insertEventRowKV

// NOTICE: clean up pre cols for insert event.
insertEvent.Row.PreColumns = nil
result = append(result, &insertEvent)

return result, nil
}

func (n *sinkNode) flushRow2Sink(ctx pipeline.NodeContext) error {
for _, ev := range n.eventBuffer {
err := ev.WaitPrepare(ctx)
Expand Down