From 35f804f316ff0b1a2a765655624367f78f35b117 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E4=BA=8C=E6=89=8B=E6=8E=89=E5=8C=85=E5=B7=A5=E7=A8=8B?= =?UTF-8?q?=E5=B8=88?= Date: Wed, 28 Jul 2021 15:16:17 +0800 Subject: [PATCH] This is an automated cherry-pick of #2367 Signed-off-by: ti-chi-bot --- cdc/processor/pipeline/sink.go | 95 ++++++++++++++++++++++++++++++++++ 1 file changed, 95 insertions(+) diff --git a/cdc/processor/pipeline/sink.go b/cdc/processor/pipeline/sink.go index fa492118e20..912199f5faa 100644 --- a/cdc/processor/pipeline/sink.go +++ b/cdc/processor/pipeline/sink.go @@ -142,7 +142,38 @@ func (n *sinkNode) flushSink(ctx pipeline.NodeContext, resolvedTs model.Ts) (err } func (n *sinkNode) emitEvent(ctx pipeline.NodeContext, event *model.PolymorphicEvent) error { +<<<<<<< HEAD n.eventBuffer = append(n.eventBuffer, event) +======= + if event == nil || event.Row == nil { + return nil + } + + colLen := len(event.Row.Columns) + preColLen := len(event.Row.PreColumns) + config := ctx.ChangefeedVars().Info.Config + + // This indicates that it is an update event, + // and after enable old value internally by default(but disable in the configuration). + // We need to handle the update event to be compatible with the old format. + if !config.EnableOldValue && colLen != 0 && preColLen != 0 && colLen == preColLen { + if shouldSplitUpdateEvent(event) { + deleteEvent, insertEvent, err := splitUpdateEvent(event) + if err != nil { + return errors.Trace(err) + } + // NOTICE: Please do not change the order, the delete event always comes before the insert event. + n.eventBuffer = append(n.eventBuffer, deleteEvent, insertEvent) + } else { + // If the handle key columns are not updated, PreColumns is directly ignored. + event.Row.PreColumns = nil + n.eventBuffer = append(n.eventBuffer, event) + } + } else { + n.eventBuffer = append(n.eventBuffer, event) + } + +>>>>>>> f2526c1e (sink(refactor): Refine functions and improve event splitting code (#2367)) if len(n.eventBuffer) >= defaultSyncResolvedBatch { if err := n.flushRow2Sink(ctx); err != nil { return errors.Trace(err) @@ -151,6 +182,70 @@ func (n *sinkNode) emitEvent(ctx pipeline.NodeContext, event *model.PolymorphicE return nil } +// shouldSplitUpdateEvent determines if the split event is needed to align the old format based on +// whether the handle key column has been modified. +// If the handle key column is modified, +// we need to use splitUpdateEvent to split the update event into a delete and an insert event. +func shouldSplitUpdateEvent(updateEvent *model.PolymorphicEvent) bool { + // nil event will never be split. + if updateEvent == nil { + return false + } + + handleKeyCount := 0 + equivalentHandleKeyCount := 0 + for i := range updateEvent.Row.Columns { + if updateEvent.Row.Columns[i].Flag.IsHandleKey() && updateEvent.Row.PreColumns[i].Flag.IsHandleKey() { + handleKeyCount++ + colValueString := model.ColumnValueString(updateEvent.Row.Columns[i].Value) + preColValueString := model.ColumnValueString(updateEvent.Row.PreColumns[i].Value) + if colValueString == preColValueString { + equivalentHandleKeyCount++ + } + } + } + + // If the handle key columns are not updated, so we do **not** need to split the event row. + return !(handleKeyCount == equivalentHandleKeyCount) +} + +// splitUpdateEvent splits an update event into a delete and an insert event. +func splitUpdateEvent(updateEvent *model.PolymorphicEvent) (*model.PolymorphicEvent, *model.PolymorphicEvent, error) { + if updateEvent == nil { + return nil, nil, errors.New("nil event cannot be split") + } + + // If there is an update to handle key columns, + // we need to split the event into two events to be compatible with the old format. + // NOTICE: Here we don't need a full deep copy because our two events need Columns and PreColumns respectively, + // so it won't have an impact and no more full deep copy wastes memory. + deleteEvent := *updateEvent + deleteEventRow := *updateEvent.Row + deleteEventRowKV := *updateEvent.RawKV + deleteEvent.Row = &deleteEventRow + deleteEvent.RawKV = &deleteEventRowKV + + deleteEvent.Row.Columns = nil + for i := range deleteEvent.Row.PreColumns { + // NOTICE: Only the handle key pre column is retained in the delete event. + if !deleteEvent.Row.PreColumns[i].Flag.IsHandleKey() { + deleteEvent.Row.PreColumns[i] = nil + } + } + // Align with the old format if old value disabled. + deleteEvent.Row.TableInfoVersion = 0 + + insertEvent := *updateEvent + insertEventRow := *updateEvent.Row + insertEventRowKV := *updateEvent.RawKV + insertEvent.Row = &insertEventRow + insertEvent.RawKV = &insertEventRowKV + // NOTICE: clean up pre cols for insert event. + insertEvent.Row.PreColumns = nil + + return &deleteEvent, &insertEvent, nil +} + func (n *sinkNode) flushRow2Sink(ctx pipeline.NodeContext) error { for _, ev := range n.eventBuffer { err := ev.WaitPrepare(ctx)