From ba52965fd7e286172570e462104342fa66678345 Mon Sep 17 00:00:00 2001 From: hi-rustin Date: Tue, 13 Jul 2021 13:11:09 +0800 Subject: [PATCH 01/21] puller,mounter,processor: always pull the old value internally --- cdc/entry/mounter.go | 18 +++++++++++++++++- cdc/processor.go | 2 +- cdc/processor/pipeline/puller.go | 3 +-- 3 files changed, 19 insertions(+), 4 deletions(-) diff --git a/cdc/entry/mounter.go b/cdc/entry/mounter.go index af50a6185e6..fe098d2cbc2 100644 --- a/cdc/entry/mounter.go +++ b/cdc/entry/mounter.go @@ -454,13 +454,29 @@ func (m *mounterImpl) mountRowKVEntry(tableInfo *model.TableInfo, row *rowKVEntr var err error // Decode previous columns. var preCols []*model.Column - if row.PreRowExist { + // Since we now always use old value internally, + // we need to control the output(sink will use the PreColumns field to determine whether to output old value). + // Normally old value is output when only enableOldValue is on, + // but for the Delete event, when the old value feature is off, + // the HandleKey column needs to be included as well. So we need to do the following filtering. + if (row.PreRowExist && m.enableOldValue) || (row.PreRowExist && row.Delete) { // FIXME(leoppro): using pre table info to mounter pre column datum // the pre column and current column in one event may using different table info preCols, err = datum2Column(tableInfo, row.PreRow, m.enableOldValue) if err != nil { return nil, errors.Trace(err) } + + // NOTICE: When the old Value feature is off, + // the Delete event only needs to keep the handle key column. + if row.Delete && !m.enableOldValue { + for i, _ := range preCols { + col := preCols[i] + if col.Flag.IsHandleKey() { + preCols[i] = nil + } + } + } } var cols []*model.Column diff --git a/cdc/processor.go b/cdc/processor.go index 06b23636a8f..528916d036f 100644 --- a/cdc/processor.go +++ b/cdc/processor.go @@ -820,7 +820,7 @@ func (p *oldProcessor) addTable(ctx context.Context, tableID int64, replicaInfo } plr := puller.NewPuller(ctx, p.pdCli, p.credential, kvStorage, replicaInfo.StartTs, []regionspan.Span{span}, p.limitter, - enableOldValue) + true) go func() { err := plr.Run(ctx) if errors.Cause(err) != context.Canceled { diff --git a/cdc/processor/pipeline/puller.go b/cdc/processor/pipeline/puller.go index 7e82cea50b5..15daabb61f5 100644 --- a/cdc/processor/pipeline/puller.go +++ b/cdc/processor/pipeline/puller.go @@ -65,12 +65,11 @@ func (n *pullerNode) tableSpan(ctx cdcContext.Context) []regionspan.Span { func (n *pullerNode) Init(ctx pipeline.NodeContext) error { metricTableResolvedTsGauge := tableResolvedTsGauge.WithLabelValues(ctx.ChangefeedVars().ID, ctx.GlobalVars().CaptureInfo.AdvertiseAddr, n.tableName) globalConfig := config.GetGlobalServerConfig() - config := ctx.ChangefeedVars().Info.Config ctxC, cancel := context.WithCancel(ctx) ctxC = util.PutTableInfoInCtx(ctxC, n.tableID, n.tableName) ctxC = util.PutChangefeedIDInCtx(ctxC, ctx.ChangefeedVars().ID) plr := puller.NewPuller(ctxC, ctx.GlobalVars().PDClient, globalConfig.Security, ctx.GlobalVars().KVStorage, - n.replicaInfo.StartTs, n.tableSpan(ctx), n.limitter, config.EnableOldValue) + n.replicaInfo.StartTs, n.tableSpan(ctx), n.limitter, true) n.wg.Go(func() error { ctx.Throw(errors.Trace(plr.Run(ctxC))) return nil From d1b480cd96accfa0e927ad534abbbaf385041864 Mon Sep 17 00:00:00 2001 From: hi-rustin Date: Tue, 13 Jul 2021 13:21:05 +0800 Subject: [PATCH 02/21] mounter: fix lint --- cdc/entry/mounter.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/cdc/entry/mounter.go b/cdc/entry/mounter.go index fe098d2cbc2..bf54c61ab6e 100644 --- a/cdc/entry/mounter.go +++ b/cdc/entry/mounter.go @@ -470,7 +470,7 @@ func (m *mounterImpl) mountRowKVEntry(tableInfo *model.TableInfo, row *rowKVEntr // NOTICE: When the old Value feature is off, // the Delete event only needs to keep the handle key column. if row.Delete && !m.enableOldValue { - for i, _ := range preCols { + for i := range preCols { col := preCols[i] if col.Flag.IsHandleKey() { preCols[i] = nil From fd57499219475900f13b7f3552c2169cd0b37c58 Mon Sep 17 00:00:00 2001 From: hi-rustin Date: Tue, 13 Jul 2021 13:24:15 +0800 Subject: [PATCH 03/21] puller: add comments --- cdc/processor.go | 2 ++ cdc/processor/pipeline/puller.go | 2 ++ 2 files changed, 4 insertions(+) diff --git a/cdc/processor.go b/cdc/processor.go index 528916d036f..82b5b954b87 100644 --- a/cdc/processor.go +++ b/cdc/processor.go @@ -818,6 +818,8 @@ func (p *oldProcessor) addTable(ctx context.Context, tableID int64, replicaInfo p.sendError(err) return nil } + // NOTICE: always pull the old value internally + // See also: TODO(hi-rustin): add issue link here. plr := puller.NewPuller(ctx, p.pdCli, p.credential, kvStorage, replicaInfo.StartTs, []regionspan.Span{span}, p.limitter, true) diff --git a/cdc/processor/pipeline/puller.go b/cdc/processor/pipeline/puller.go index 15daabb61f5..df3c170d7fe 100644 --- a/cdc/processor/pipeline/puller.go +++ b/cdc/processor/pipeline/puller.go @@ -68,6 +68,8 @@ func (n *pullerNode) Init(ctx pipeline.NodeContext) error { ctxC, cancel := context.WithCancel(ctx) ctxC = util.PutTableInfoInCtx(ctxC, n.tableID, n.tableName) ctxC = util.PutChangefeedIDInCtx(ctxC, ctx.ChangefeedVars().ID) + // NOTICE: always pull the old value internally + // See also: TODO(hi-rustin): add issue link here. plr := puller.NewPuller(ctxC, ctx.GlobalVars().PDClient, globalConfig.Security, ctx.GlobalVars().KVStorage, n.replicaInfo.StartTs, n.tableSpan(ctx), n.limitter, true) n.wg.Go(func() error { From 018022633e2a54c665865351e8e5f873acc5e2d9 Mon Sep 17 00:00:00 2001 From: hi-rustin Date: Tue, 13 Jul 2021 13:30:31 +0800 Subject: [PATCH 04/21] puller: fix typo --- cdc/entry/mounter.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/cdc/entry/mounter.go b/cdc/entry/mounter.go index bf54c61ab6e..b6f1c3cfdf8 100644 --- a/cdc/entry/mounter.go +++ b/cdc/entry/mounter.go @@ -472,7 +472,7 @@ func (m *mounterImpl) mountRowKVEntry(tableInfo *model.TableInfo, row *rowKVEntr if row.Delete && !m.enableOldValue { for i := range preCols { col := preCols[i] - if col.Flag.IsHandleKey() { + if !col.Flag.IsHandleKey() { preCols[i] = nil } } From f06fad87734983da1e8bc2f3188c86302d90da62 Mon Sep 17 00:00:00 2001 From: hi-rustin Date: Tue, 13 Jul 2021 14:39:23 +0800 Subject: [PATCH 05/21] puller: fix test --- cdc/entry/mounter.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/cdc/entry/mounter.go b/cdc/entry/mounter.go index b6f1c3cfdf8..8ba3f958c87 100644 --- a/cdc/entry/mounter.go +++ b/cdc/entry/mounter.go @@ -472,7 +472,7 @@ func (m *mounterImpl) mountRowKVEntry(tableInfo *model.TableInfo, row *rowKVEntr if row.Delete && !m.enableOldValue { for i := range preCols { col := preCols[i] - if !col.Flag.IsHandleKey() { + if col != nil && !col.Flag.IsHandleKey() { preCols[i] = nil } } From 0acdf474278e4de340338d0a6b634392308a8e01 Mon Sep 17 00:00:00 2001 From: hi-rustin Date: Tue, 13 Jul 2021 21:32:52 +0800 Subject: [PATCH 06/21] puller: always pulls kv events with old-value enabled --- cdc/processor.go | 3 +-- cdc/processor/pipeline/puller.go | 4 ++-- 2 files changed, 3 insertions(+), 4 deletions(-) diff --git a/cdc/processor.go b/cdc/processor.go index 82b5b954b87..317791f0372 100644 --- a/cdc/processor.go +++ b/cdc/processor.go @@ -811,8 +811,7 @@ func (p *oldProcessor) addTable(ctx context.Context, tableID int64, replicaInfo startPuller := func(tableID model.TableID, pResolvedTs *uint64, pCheckpointTs *uint64) sink.Sink { // start table puller - enableOldValue := p.changefeed.Config.EnableOldValue - span := regionspan.GetTableSpan(tableID, enableOldValue) + span := regionspan.GetTableSpan(tableID, true) kvStorage, err := util.KVStorageFromCtx(ctx) if err != nil { p.sendError(err) diff --git a/cdc/processor/pipeline/puller.go b/cdc/processor/pipeline/puller.go index df3c170d7fe..ac97b5c5353 100644 --- a/cdc/processor/pipeline/puller.go +++ b/cdc/processor/pipeline/puller.go @@ -54,10 +54,10 @@ func (n *pullerNode) tableSpan(ctx cdcContext.Context) []regionspan.Span { // start table puller config := ctx.ChangefeedVars().Info.Config spans := make([]regionspan.Span, 0, 4) - spans = append(spans, regionspan.GetTableSpan(n.tableID, config.EnableOldValue)) + spans = append(spans, regionspan.GetTableSpan(n.tableID, true)) if config.Cyclic.IsEnabled() && n.replicaInfo.MarkTableID != 0 { - spans = append(spans, regionspan.GetTableSpan(n.replicaInfo.MarkTableID, config.EnableOldValue)) + spans = append(spans, regionspan.GetTableSpan(n.replicaInfo.MarkTableID, true)) } return spans } From 39e09adb8b36b4aa3029eda776eea0baaa9affbd Mon Sep 17 00:00:00 2001 From: hi-rustin Date: Tue, 13 Jul 2021 21:43:03 +0800 Subject: [PATCH 07/21] mounter: stop subscribe index update --- cdc/entry/codec.go | 19 ----- cdc/entry/mounter.go | 175 +------------------------------------------ 2 files changed, 1 insertion(+), 193 deletions(-) diff --git a/cdc/entry/codec.go b/cdc/entry/codec.go index f8803125e3b..0aed19266c4 100644 --- a/cdc/entry/codec.go +++ b/cdc/entry/codec.go @@ -31,7 +31,6 @@ import ( var ( tablePrefix = []byte{'t'} recordPrefix = []byte("_r") - indexPrefix = []byte("_i") metaPrefix = []byte("m") ) @@ -39,11 +38,9 @@ var ( intLen = 8 tablePrefixLen = len(tablePrefix) recordPrefixLen = len(recordPrefix) - indexPrefixLen = len(indexPrefix) metaPrefixLen = len(metaPrefix) prefixTableIDLen = tablePrefixLen + intLen /*tableID*/ prefixRecordIDLen = recordPrefixLen + intLen /*recordID*/ - prefixIndexLen = indexPrefixLen + intLen /*indexID*/ ) // MetaType is for data structure meta/data flag. @@ -120,22 +117,6 @@ func decodeRecordID(key []byte) (rest []byte, recordID int64, err error) { return } -func decodeIndexKey(key []byte) (indexID int64, indexValue []types.Datum, err error) { - if len(key) < prefixIndexLen || !bytes.HasPrefix(key, indexPrefix) { - return 0, nil, cerror.ErrInvalidRecordKey.GenWithStackByArgs(key) - } - key = key[indexPrefixLen:] - key, indexID, err = codec.DecodeInt(key) - if err != nil { - return 0, nil, cerror.WrapError(cerror.ErrCodecDecode, err) - } - indexValue, err = codec.Decode(key, 2) - if err != nil { - return 0, nil, cerror.WrapError(cerror.ErrCodecDecode, err) - } - return -} - func decodeMetaKey(ek []byte) (meta, error) { if !bytes.HasPrefix(ek, metaPrefix) { return nil, cerror.ErrInvalidRecordKey.GenWithStackByArgs(ek) diff --git a/cdc/entry/mounter.go b/cdc/entry/mounter.go index 8ba3f958c87..5048089e4f5 100644 --- a/cdc/entry/mounter.go +++ b/cdc/entry/mounter.go @@ -16,7 +16,6 @@ package entry import ( "bytes" "context" - "encoding/binary" "encoding/json" "fmt" "math" @@ -64,66 +63,6 @@ type rowKVEntry struct { PreRowExist bool } -type indexKVEntry struct { - baseKVEntry - IndexID int64 - IndexValue []types.Datum -} - -func (idx *indexKVEntry) unflatten(tableInfo *model.TableInfo, tz *time.Location) error { - if tableInfo.ID != idx.PhysicalTableID { - isPartition := false - if pi := tableInfo.GetPartitionInfo(); pi != nil { - for _, p := range pi.Definitions { - if p.ID == idx.PhysicalTableID { - isPartition = true - break - } - } - } - if !isPartition { - return cerror.ErrWrongTableInfo.GenWithStackByArgs(tableInfo.ID, idx.PhysicalTableID) - } - } - index, exist := tableInfo.GetIndexInfo(idx.IndexID) - if !exist { - return cerror.ErrIndexKeyTableNotFound.GenWithStackByArgs(idx.IndexID) - } - if !isDistinct(index, idx.IndexValue) { - idx.RecordID = idx.baseKVEntry.RecordID - if idx.baseKVEntry.RecordID.IsInt() { - idx.IndexValue = idx.IndexValue[:len(idx.IndexValue)-1] - } else { - idx.IndexValue = idx.IndexValue[:len(idx.IndexValue)-idx.RecordID.NumCols()] - } - } - for i, v := range idx.IndexValue { - colOffset := index.Columns[i].Offset - fieldType := &tableInfo.Columns[colOffset].FieldType - datum, err := unflatten(v, fieldType, tz) - if err != nil { - return errors.Trace(err) - } - idx.IndexValue[i] = datum - } - return nil -} - -func isDistinct(index *timodel.IndexInfo, indexValue []types.Datum) bool { - if index.Primary { - return true - } - if index.Unique { - for _, value := range indexValue { - if value.IsNull() { - return false - } - } - return true - } - return false -} - // Mounter is used to parse SQL events from KV events type Mounter interface { Run(ctx context.Context) error @@ -258,8 +197,7 @@ func (m *mounterImpl) unmarshalAndMountRowChanged(ctx context.Context, raw *mode } return nil, cerror.ErrSnapshotTableNotFound.GenWithStackByArgs(physicalTableID) } - switch { - case bytes.HasPrefix(key, recordPrefix): + if bytes.HasPrefix(key, recordPrefix) { rowKV, err := m.unmarshalRowKVEntry(tableInfo, raw.Key, raw.Value, raw.OldValue, baseInfo) if err != nil { return nil, errors.Trace(err) @@ -268,15 +206,6 @@ func (m *mounterImpl) unmarshalAndMountRowChanged(ctx context.Context, raw *mode return nil, nil } return m.mountRowKVEntry(tableInfo, rowKV, raw.ApproximateSize()) - case bytes.HasPrefix(key, indexPrefix): - indexKV, err := m.unmarshalIndexKVEntry(key, raw.Value, raw.OldValue, baseInfo) - if err != nil { - return nil, errors.Trace(err) - } - if indexKV == nil { - return nil, nil - } - return m.mountIndexKVEntry(tableInfo, indexKV, raw.ApproximateSize()) } return nil, nil }() @@ -331,46 +260,6 @@ func (m *mounterImpl) unmarshalRowKVEntry(tableInfo *model.TableInfo, rawKey []b }, nil } -func (m *mounterImpl) unmarshalIndexKVEntry(restKey []byte, rawValue []byte, rawOldValue []byte, base baseKVEntry) (*indexKVEntry, error) { - // Skip set index KV. - // By default we cannot get the old value of a deleted row, then we must get the value of unique key - // or primary key for seeking the deleted row through its index key. - // After the old value was enabled, we can skip the index key. - if !base.Delete || m.enableOldValue { - return nil, nil - } - - indexID, indexValue, err := decodeIndexKey(restKey) - if err != nil { - return nil, errors.Trace(err) - } - var handle kv.Handle - - if len(rawValue) == 8 { - // primary key or unique index - var recordID int64 - buf := bytes.NewBuffer(rawValue) - err = binary.Read(buf, binary.BigEndian, &recordID) - if err != nil { - return nil, errors.Trace(err) - } - handle = kv.IntHandle(recordID) - } else if len(rawValue) > 0 && rawValue[0] == tablecodec.CommonHandleFlag { - handleLen := uint16(rawValue[1])<<8 + uint16(rawValue[2]) - handleEndOff := 3 + handleLen - handle, err = kv.NewCommonHandle(rawValue[3:handleEndOff]) - if err != nil { - return nil, errors.Trace(err) - } - } - base.RecordID = handle - return &indexKVEntry{ - baseKVEntry: base, - IndexID: indexID, - IndexValue: indexValue, - }, nil -} - const ( ddlJobListKey = "DDLJobList" ddlAddIndexJobListKey = "DDLJobAddIdxList" @@ -511,68 +400,6 @@ func (m *mounterImpl) mountRowKVEntry(tableInfo *model.TableInfo, row *rowKVEntr }, nil } -func (m *mounterImpl) mountIndexKVEntry(tableInfo *model.TableInfo, idx *indexKVEntry, dataSize int64) (*model.RowChangedEvent, error) { - // skip set index KV - if !idx.Delete || m.enableOldValue { - return nil, nil - } - // skip any index that is not the handle - if idx.IndexID != tableInfo.HandleIndexID { - return nil, nil - } - - indexInfo, exist := tableInfo.GetIndexInfo(idx.IndexID) - if !exist { - log.Warn("index info not found", zap.Int64("indexID", idx.IndexID)) - return nil, nil - } - - if !tableInfo.IsIndexUnique(indexInfo) { - return nil, nil - } - - err := idx.unflatten(tableInfo, m.tz) - if err != nil { - return nil, errors.Trace(err) - } - - preCols := make([]*model.Column, len(tableInfo.RowColumnsOffset)) - for i, idxCol := range indexInfo.Columns { - colInfo := tableInfo.Columns[idxCol.Offset] - value, warn, err := formatColVal(idx.IndexValue[i], colInfo.Tp) - if err != nil { - return nil, errors.Trace(err) - } - if warn != "" { - log.Warn(warn, zap.String("table", tableInfo.TableName.String()), zap.String("column", colInfo.Name.String())) - } - preCols[tableInfo.RowColumnsOffset[colInfo.ID]] = &model.Column{ - Name: colInfo.Name.O, - Type: colInfo.Tp, - Value: value, - Flag: tableInfo.ColumnsFlag[colInfo.ID], - } - } - var intRowID int64 - if idx.RecordID != nil && idx.RecordID.IsInt() { - intRowID = idx.RecordID.IntValue() - } - return &model.RowChangedEvent{ - StartTs: idx.StartTs, - CommitTs: idx.CRTs, - RowID: intRowID, - Table: &model.TableName{ - Schema: tableInfo.TableName.Schema, - Table: tableInfo.TableName.Table, - TableID: idx.PhysicalTableID, - IsPartition: tableInfo.GetPartitionInfo() != nil, - }, - PreColumns: preCols, - IndexColumns: tableInfo.IndexColumnsOffset, - ApproximateSize: dataSize, - }, nil -} - var emptyBytes = make([]byte, 0) func formatColVal(datum types.Datum, tp byte) (value interface{}, warn string, err error) { From 905d1eaa20c22ff18f341d93db34205b02dcb1e8 Mon Sep 17 00:00:00 2001 From: hi-rustin Date: Tue, 13 Jul 2021 22:45:56 +0800 Subject: [PATCH 08/21] mounter: fix test --- cdc/entry/mounter.go | 9 --------- 1 file changed, 9 deletions(-) diff --git a/cdc/entry/mounter.go b/cdc/entry/mounter.go index 5048089e4f5..ae7059c9649 100644 --- a/cdc/entry/mounter.go +++ b/cdc/entry/mounter.go @@ -331,15 +331,6 @@ func datum2Column(tableInfo *model.TableInfo, datums map[int64]types.Datum, fill } func (m *mounterImpl) mountRowKVEntry(tableInfo *model.TableInfo, row *rowKVEntry, dataSize int64) (*model.RowChangedEvent, error) { - // if m.enableOldValue == true, go into this function - // if m.enableNewValue == false and row.Delete == false, go into this function - // if m.enableNewValue == false and row.Delete == true and use explict row id, go into this function - // only if m.enableNewValue == false and row.Delete == true and use implicit row id(_tidb_rowid), skip this function - useImplicitTiDBRowID := !tableInfo.PKIsHandle && !tableInfo.IsCommonHandle - if !m.enableOldValue && row.Delete && useImplicitTiDBRowID { - return nil, nil - } - var err error // Decode previous columns. var preCols []*model.Column From 51671942a87770eb809f6f880d1c875ccb0d994d Mon Sep 17 00:00:00 2001 From: hi-rustin Date: Wed, 14 Jul 2021 10:09:38 +0800 Subject: [PATCH 09/21] refactor(mounter): refine GetTableSpan --- cdc/entry/mounter_test.go | 2 +- cdc/processor.go | 2 +- cdc/processor/pipeline/puller.go | 4 ++-- pkg/regionspan/span.go | 13 ++++--------- pkg/regionspan/span_test.go | 12 ++---------- 5 files changed, 10 insertions(+), 23 deletions(-) diff --git a/cdc/entry/mounter_test.go b/cdc/entry/mounter_test.go index 4eb16eb9576..523bf8d1015 100644 --- a/cdc/entry/mounter_test.go +++ b/cdc/entry/mounter_test.go @@ -385,7 +385,7 @@ func walkTableSpanInStore(c *check.C, store tidbkv.Storage, tableID int64, f fun txn, err := store.Begin() c.Assert(err, check.IsNil) defer txn.Rollback() //nolint:errcheck - tableSpan := regionspan.GetTableSpan(tableID, false) + tableSpan := regionspan.GetTableSpan(tableID) kvIter, err := txn.Iter(tableSpan.Start, tableSpan.End) c.Assert(err, check.IsNil) defer kvIter.Close() diff --git a/cdc/processor.go b/cdc/processor.go index 317791f0372..96a83238b2d 100644 --- a/cdc/processor.go +++ b/cdc/processor.go @@ -811,7 +811,7 @@ func (p *oldProcessor) addTable(ctx context.Context, tableID int64, replicaInfo startPuller := func(tableID model.TableID, pResolvedTs *uint64, pCheckpointTs *uint64) sink.Sink { // start table puller - span := regionspan.GetTableSpan(tableID, true) + span := regionspan.GetTableSpan(tableID) kvStorage, err := util.KVStorageFromCtx(ctx) if err != nil { p.sendError(err) diff --git a/cdc/processor/pipeline/puller.go b/cdc/processor/pipeline/puller.go index ac97b5c5353..a5e535ce603 100644 --- a/cdc/processor/pipeline/puller.go +++ b/cdc/processor/pipeline/puller.go @@ -54,10 +54,10 @@ func (n *pullerNode) tableSpan(ctx cdcContext.Context) []regionspan.Span { // start table puller config := ctx.ChangefeedVars().Info.Config spans := make([]regionspan.Span, 0, 4) - spans = append(spans, regionspan.GetTableSpan(n.tableID, true)) + spans = append(spans, regionspan.GetTableSpan(n.tableID)) if config.Cyclic.IsEnabled() && n.replicaInfo.MarkTableID != 0 { - spans = append(spans, regionspan.GetTableSpan(n.replicaInfo.MarkTableID, true)) + spans = append(spans, regionspan.GetTableSpan(n.replicaInfo.MarkTableID)) } return spans } diff --git a/pkg/regionspan/span.go b/pkg/regionspan/span.go index 7d06c482117..79fcb5eef7e 100644 --- a/pkg/regionspan/span.go +++ b/pkg/regionspan/span.go @@ -83,19 +83,14 @@ func hackSpan(originStart []byte, originEnd []byte) (start []byte, end []byte) { } // GetTableSpan returns the span to watch for the specified table -func GetTableSpan(tableID int64, exceptIndexSpan bool) Span { +func GetTableSpan(tableID int64) Span { sep := byte('_') recordMarker := byte('r') tablePrefix := tablecodec.GenTablePrefix(tableID) var start, end kv.Key - // ignore index keys if we don't need them - if exceptIndexSpan { - start = append(tablePrefix, sep, recordMarker) - end = append(tablePrefix, sep, recordMarker+1) - } else { - start = append(tablePrefix, sep) - end = append(tablePrefix, sep+1) - } + // ignore index keys. + start = append(tablePrefix, sep, recordMarker) + end = append(tablePrefix, sep, recordMarker+1) return Span{ Start: start, End: end, diff --git a/pkg/regionspan/span_test.go b/pkg/regionspan/span_test.go index 1266acc89f9..2d2489a59ea 100644 --- a/pkg/regionspan/span_test.go +++ b/pkg/regionspan/span_test.go @@ -104,17 +104,9 @@ func (s *spanSuite) TestIntersect(c *check.C) { } func (s *spanSuite) TestGetTableSpan(c *check.C) { - defer testleak.AfterTest(c)() - span := GetTableSpan(123, false) - c.Assert(span.Start, check.Less, span.End) - prefix := []byte(tablecodec.GenTablePrefix(123)) - c.Assert(span.Start, check.Greater, prefix) - prefix[len(prefix)-1]++ - c.Assert(span.End, check.Less, prefix) - - span = GetTableSpan(123, true) + span := GetTableSpan(123) c.Assert(span.Start, check.Less, span.End) - prefix = []byte(tablecodec.GenTableRecordPrefix(123)) + prefix := []byte(tablecodec.GenTableRecordPrefix(123)) c.Assert(span.Start, check.GreaterEqual, prefix) prefix[len(prefix)-1]++ c.Assert(span.End, check.LessEqual, prefix) From f93410310ac37b189ea6b958cf2740d1fd934c03 Mon Sep 17 00:00:00 2001 From: hi-rustin Date: Wed, 14 Jul 2021 10:14:30 +0800 Subject: [PATCH 10/21] refactor(span): fix test --- pkg/regionspan/span_test.go | 1 + 1 file changed, 1 insertion(+) diff --git a/pkg/regionspan/span_test.go b/pkg/regionspan/span_test.go index 2d2489a59ea..46940795816 100644 --- a/pkg/regionspan/span_test.go +++ b/pkg/regionspan/span_test.go @@ -104,6 +104,7 @@ func (s *spanSuite) TestIntersect(c *check.C) { } func (s *spanSuite) TestGetTableSpan(c *check.C) { + defer testleak.AfterTest(c)() span := GetTableSpan(123) c.Assert(span.Start, check.Less, span.End) prefix := []byte(tablecodec.GenTableRecordPrefix(123)) From fbe4a1d0698c62ebda21e79d178ed1e550c3d5e4 Mon Sep 17 00:00:00 2001 From: hi-rustin Date: Wed, 14 Jul 2021 23:54:43 +0800 Subject: [PATCH 11/21] mounter: udpate fix --- cdc/entry/mounter.go | 12 ++++++-- cdc/model/mounter.go | 13 +++++++++ cdc/processor/pipeline/sink.go | 52 +++++++++++++++++++++++++++++++++- 3 files changed, 74 insertions(+), 3 deletions(-) diff --git a/cdc/entry/mounter.go b/cdc/entry/mounter.go index ae7059c9649..0884c20c39e 100644 --- a/cdc/entry/mounter.go +++ b/cdc/entry/mounter.go @@ -339,7 +339,7 @@ func (m *mounterImpl) mountRowKVEntry(tableInfo *model.TableInfo, row *rowKVEntr // Normally old value is output when only enableOldValue is on, // but for the Delete event, when the old value feature is off, // the HandleKey column needs to be included as well. So we need to do the following filtering. - if (row.PreRowExist && m.enableOldValue) || (row.PreRowExist && row.Delete) { + if row.PreRowExist { // FIXME(leoppro): using pre table info to mounter pre column datum // the pre column and current column in one event may using different table info preCols, err = datum2Column(tableInfo, row.PreRow, m.enableOldValue) @@ -373,11 +373,19 @@ func (m *mounterImpl) mountRowKVEntry(tableInfo *model.TableInfo, row *rowKVEntr if row.RecordID.IsInt() { intRowID = row.RecordID.IntValue() } + + var tableInfoVersion uint64 + if row.Delete { + tableInfoVersion = 0 + } else { + tableInfoVersion = tableInfo.TableInfoVersion + } + return &model.RowChangedEvent{ StartTs: row.StartTs, CommitTs: row.CRTs, RowID: intRowID, - TableInfoVersion: tableInfo.TableInfoVersion, + TableInfoVersion: tableInfoVersion, Table: &model.TableName{ Schema: schemaName, Table: tableName, diff --git a/cdc/model/mounter.go b/cdc/model/mounter.go index 8339ca6b11e..54ae5ed4f97 100644 --- a/cdc/model/mounter.go +++ b/cdc/model/mounter.go @@ -83,3 +83,16 @@ func (e *PolymorphicEvent) WaitPrepare(ctx context.Context) error { } return nil } + +// Clone returns a deep-clone of the struct. +func (e *PolymorphicEvent) Clone() *PolymorphicEvent { + clone := *e + + row := *e.Row + clone.Row = &row + + rowKV := *e.RawKV + clone.RawKV = &rowKV + + return &clone +} diff --git a/cdc/processor/pipeline/sink.go b/cdc/processor/pipeline/sink.go index fa492118e20..c2189a2d097 100644 --- a/cdc/processor/pipeline/sink.go +++ b/cdc/processor/pipeline/sink.go @@ -142,7 +142,57 @@ func (n *sinkNode) flushSink(ctx pipeline.NodeContext, resolvedTs model.Ts) (err } func (n *sinkNode) emitEvent(ctx pipeline.NodeContext, event *model.PolymorphicEvent) error { - n.eventBuffer = append(n.eventBuffer, event) + colLen := len(event.Row.Columns) + preColLen := len(event.Row.PreColumns) + + // This indicates that it is an update event, + // and after turning on old value internally by default, + // we need to handle the update event to be compatible with the old format. + if colLen != 0 && preColLen != 0 && colLen == preColLen { + config := ctx.ChangefeedVars().Info.Config + if config.EnableOldValue { + // TODO: Need to test if the behavior is correct. + n.eventBuffer = append(n.eventBuffer, event) + } else { + handleKeyCount := 0 + equivalentHandleKeyCount := 0 + for i := range event.Row.Columns { + if event.Row.Columns[i].Flag.IsHandleKey() && event.Row.PreColumns[i].Flag.IsHandleKey() { + handleKeyCount++ + colValueString := model.ColumnValueString(event.Row.Columns[i].Value) + preColValueString := model.ColumnValueString(event.Row.PreColumns[i].Value) + if colValueString == preColValueString { + equivalentHandleKeyCount++ + } + } + } + // If the handle key columns are not updated, PreColumns is directly ignored. + if handleKeyCount == equivalentHandleKeyCount { + event.Row.PreColumns = nil + n.eventBuffer = append(n.eventBuffer, event) + } else { + // If there is an update to handle key columns, + // we need to split the event into two events to be compatible with the old format. + deleteEvent := event.Clone() + deleteEvent.Row.Columns = nil + for i := range deleteEvent.Row.PreColumns { + // Only the handle key column is retained in the delete event. + if !deleteEvent.Row.PreColumns[i].Flag.IsHandleKey() { + deleteEvent.Row.PreColumns[i] = nil + } + } + deleteEvent.Row.TableInfoVersion = 0 + n.eventBuffer = append(n.eventBuffer, deleteEvent) + + replaceEvent := event.Clone() + replaceEvent.Row.PreColumns = nil + n.eventBuffer = append(n.eventBuffer, replaceEvent) + } + } + } else { + n.eventBuffer = append(n.eventBuffer, event) + } + if len(n.eventBuffer) >= defaultSyncResolvedBatch { if err := n.flushRow2Sink(ctx); err != nil { return errors.Trace(err) From 5726a3a29850eb6a761439ce485666f6bd298f2a Mon Sep 17 00:00:00 2001 From: hi-rustin Date: Thu, 15 Jul 2021 00:12:10 +0800 Subject: [PATCH 12/21] mounter: udpate fix --- cdc/entry/mounter.go | 3 +- cdc/processor/pipeline/sink.go | 65 ++++++++++++++++------------------ 2 files changed, 32 insertions(+), 36 deletions(-) diff --git a/cdc/entry/mounter.go b/cdc/entry/mounter.go index 0884c20c39e..328a5c83fb7 100644 --- a/cdc/entry/mounter.go +++ b/cdc/entry/mounter.go @@ -375,7 +375,8 @@ func (m *mounterImpl) mountRowKVEntry(tableInfo *model.TableInfo, row *rowKVEntr } var tableInfoVersion uint64 - if row.Delete { + // + if row.Delete && !m.enableOldValue { tableInfoVersion = 0 } else { tableInfoVersion = tableInfo.TableInfoVersion diff --git a/cdc/processor/pipeline/sink.go b/cdc/processor/pipeline/sink.go index c2189a2d097..8e44747b97a 100644 --- a/cdc/processor/pipeline/sink.go +++ b/cdc/processor/pipeline/sink.go @@ -144,50 +144,45 @@ func (n *sinkNode) flushSink(ctx pipeline.NodeContext, resolvedTs model.Ts) (err func (n *sinkNode) emitEvent(ctx pipeline.NodeContext, event *model.PolymorphicEvent) error { colLen := len(event.Row.Columns) preColLen := len(event.Row.PreColumns) + config := ctx.ChangefeedVars().Info.Config // This indicates that it is an update event, // and after turning on old value internally by default, // we need to handle the update event to be compatible with the old format. - if colLen != 0 && preColLen != 0 && colLen == preColLen { - config := ctx.ChangefeedVars().Info.Config - if config.EnableOldValue { - // TODO: Need to test if the behavior is correct. + if !config.EnableOldValue && colLen != 0 && preColLen != 0 && colLen == preColLen { + handleKeyCount := 0 + equivalentHandleKeyCount := 0 + for i := range event.Row.Columns { + if event.Row.Columns[i].Flag.IsHandleKey() && event.Row.PreColumns[i].Flag.IsHandleKey() { + handleKeyCount++ + colValueString := model.ColumnValueString(event.Row.Columns[i].Value) + preColValueString := model.ColumnValueString(event.Row.PreColumns[i].Value) + if colValueString == preColValueString { + equivalentHandleKeyCount++ + } + } + } + // If the handle key columns are not updated, PreColumns is directly ignored. + if handleKeyCount == equivalentHandleKeyCount { + event.Row.PreColumns = nil n.eventBuffer = append(n.eventBuffer, event) } else { - handleKeyCount := 0 - equivalentHandleKeyCount := 0 - for i := range event.Row.Columns { - if event.Row.Columns[i].Flag.IsHandleKey() && event.Row.PreColumns[i].Flag.IsHandleKey() { - handleKeyCount++ - colValueString := model.ColumnValueString(event.Row.Columns[i].Value) - preColValueString := model.ColumnValueString(event.Row.PreColumns[i].Value) - if colValueString == preColValueString { - equivalentHandleKeyCount++ - } + // If there is an update to handle key columns, + // we need to split the event into two events to be compatible with the old format. + deleteEvent := event.Clone() + deleteEvent.Row.Columns = nil + for i := range deleteEvent.Row.PreColumns { + // Only the handle key column is retained in the delete event. + if !deleteEvent.Row.PreColumns[i].Flag.IsHandleKey() { + deleteEvent.Row.PreColumns[i] = nil } } - // If the handle key columns are not updated, PreColumns is directly ignored. - if handleKeyCount == equivalentHandleKeyCount { - event.Row.PreColumns = nil - n.eventBuffer = append(n.eventBuffer, event) - } else { - // If there is an update to handle key columns, - // we need to split the event into two events to be compatible with the old format. - deleteEvent := event.Clone() - deleteEvent.Row.Columns = nil - for i := range deleteEvent.Row.PreColumns { - // Only the handle key column is retained in the delete event. - if !deleteEvent.Row.PreColumns[i].Flag.IsHandleKey() { - deleteEvent.Row.PreColumns[i] = nil - } - } - deleteEvent.Row.TableInfoVersion = 0 - n.eventBuffer = append(n.eventBuffer, deleteEvent) + deleteEvent.Row.TableInfoVersion = 0 + n.eventBuffer = append(n.eventBuffer, deleteEvent) - replaceEvent := event.Clone() - replaceEvent.Row.PreColumns = nil - n.eventBuffer = append(n.eventBuffer, replaceEvent) - } + replaceEvent := event.Clone() + replaceEvent.Row.PreColumns = nil + n.eventBuffer = append(n.eventBuffer, replaceEvent) } } else { n.eventBuffer = append(n.eventBuffer, event) From f181b08b2d069cf5e206fccb7af131100222600d Mon Sep 17 00:00:00 2001 From: hi-rustin Date: Thu, 15 Jul 2021 00:20:43 +0800 Subject: [PATCH 13/21] mounter: better comments --- cdc/entry/mounter.go | 2 +- cdc/processor/pipeline/sink.go | 4 +++- 2 files changed, 4 insertions(+), 2 deletions(-) diff --git a/cdc/entry/mounter.go b/cdc/entry/mounter.go index 328a5c83fb7..c65bf6e3fad 100644 --- a/cdc/entry/mounter.go +++ b/cdc/entry/mounter.go @@ -375,7 +375,7 @@ func (m *mounterImpl) mountRowKVEntry(tableInfo *model.TableInfo, row *rowKVEntr } var tableInfoVersion uint64 - // + // Align with the old format if old value disabled. if row.Delete && !m.enableOldValue { tableInfoVersion = 0 } else { diff --git a/cdc/processor/pipeline/sink.go b/cdc/processor/pipeline/sink.go index 8e44747b97a..4b177ef28a4 100644 --- a/cdc/processor/pipeline/sink.go +++ b/cdc/processor/pipeline/sink.go @@ -172,15 +172,17 @@ func (n *sinkNode) emitEvent(ctx pipeline.NodeContext, event *model.PolymorphicE deleteEvent := event.Clone() deleteEvent.Row.Columns = nil for i := range deleteEvent.Row.PreColumns { - // Only the handle key column is retained in the delete event. + // NOTICE: Only the handle key column is retained in the delete event. if !deleteEvent.Row.PreColumns[i].Flag.IsHandleKey() { deleteEvent.Row.PreColumns[i] = nil } } + // Align with the old format if old value disabled. deleteEvent.Row.TableInfoVersion = 0 n.eventBuffer = append(n.eventBuffer, deleteEvent) replaceEvent := event.Clone() + // NOTICE: clean up pre cols for replace event. replaceEvent.Row.PreColumns = nil n.eventBuffer = append(n.eventBuffer, replaceEvent) } From e9129888351929f98d817799c8df06ac0d6034c0 Mon Sep 17 00:00:00 2001 From: hi-rustin Date: Thu, 15 Jul 2021 00:29:19 +0800 Subject: [PATCH 14/21] mounter: better comment --- cdc/processor/pipeline/sink.go | 7 ++++--- 1 file changed, 4 insertions(+), 3 deletions(-) diff --git a/cdc/processor/pipeline/sink.go b/cdc/processor/pipeline/sink.go index 4b177ef28a4..77d7b42e06c 100644 --- a/cdc/processor/pipeline/sink.go +++ b/cdc/processor/pipeline/sink.go @@ -147,8 +147,8 @@ func (n *sinkNode) emitEvent(ctx pipeline.NodeContext, event *model.PolymorphicE config := ctx.ChangefeedVars().Info.Config // This indicates that it is an update event, - // and after turning on old value internally by default, - // we need to handle the update event to be compatible with the old format. + // and after enable old value internally by default(but disable in the configuration). + // We need to handle the update event to be compatible with the old format. if !config.EnableOldValue && colLen != 0 && preColLen != 0 && colLen == preColLen { handleKeyCount := 0 equivalentHandleKeyCount := 0 @@ -162,6 +162,7 @@ func (n *sinkNode) emitEvent(ctx pipeline.NodeContext, event *model.PolymorphicE } } } + // If the handle key columns are not updated, PreColumns is directly ignored. if handleKeyCount == equivalentHandleKeyCount { event.Row.PreColumns = nil @@ -172,7 +173,7 @@ func (n *sinkNode) emitEvent(ctx pipeline.NodeContext, event *model.PolymorphicE deleteEvent := event.Clone() deleteEvent.Row.Columns = nil for i := range deleteEvent.Row.PreColumns { - // NOTICE: Only the handle key column is retained in the delete event. + // NOTICE: Only the handle key pre column is retained in the delete event. if !deleteEvent.Row.PreColumns[i].Flag.IsHandleKey() { deleteEvent.Row.PreColumns[i] = nil } From 1fec157e224d28cfb27174cf3f3e99115ebfa3aa Mon Sep 17 00:00:00 2001 From: hi-rustin Date: Thu, 15 Jul 2021 00:56:28 +0800 Subject: [PATCH 15/21] sink_test: make tests happy --- cdc/processor/pipeline/sink_test.go | 35 +++++++++++++++++++++-------- 1 file changed, 26 insertions(+), 9 deletions(-) diff --git a/cdc/processor/pipeline/sink_test.go b/cdc/processor/pipeline/sink_test.go index 910ac91a3c0..ed5abc92897 100644 --- a/cdc/processor/pipeline/sink_test.go +++ b/cdc/processor/pipeline/sink_test.go @@ -15,7 +15,10 @@ package pipeline import ( "context" + "github.com/pingcap/ticdc/pkg/config" + "github.com/tikv/client-go/v2/oracle" "testing" + "time" "github.com/pingcap/check" "github.com/pingcap/ticdc/cdc/model" @@ -106,6 +109,13 @@ var _ = check.Suite(&outputSuite{}) func (s *outputSuite) TestStatus(c *check.C) { defer testleak.AfterTest(c)() ctx := cdcContext.NewContext(context.Background(), &cdcContext.GlobalVars{}) + ctx = cdcContext.WithChangefeedVars(ctx, &cdcContext.ChangefeedVars{ + ID: "changefeed-id-test-status", + Info: &model.ChangeFeedInfo{ + StartTs: oracle.GoTimeToTS(time.Now()), + Config: config.GetDefaultReplicaConfig(), + }, + }) // test stop at targetTs node := newSinkNode(&mockSink{}, 0, 10, &mockFlowController{}) @@ -116,19 +126,19 @@ func (s *outputSuite) TestStatus(c *check.C) { c.Assert(node.Status(), check.Equals, TableStatusInitializing) c.Assert(node.Receive(pipeline.MockNodeContext4Test(ctx, - pipeline.PolymorphicEventMessage(&model.PolymorphicEvent{CRTs: 1, RawKV: &model.RawKVEntry{OpType: model.OpTypePut}}), nil)), check.IsNil) + pipeline.PolymorphicEventMessage(&model.PolymorphicEvent{CRTs: 1, RawKV: &model.RawKVEntry{OpType: model.OpTypePut}, Row: &model.RowChangedEvent{}}), nil)), check.IsNil) c.Assert(node.Status(), check.Equals, TableStatusInitializing) c.Assert(node.Receive(pipeline.MockNodeContext4Test(ctx, - pipeline.PolymorphicEventMessage(&model.PolymorphicEvent{CRTs: 2, RawKV: &model.RawKVEntry{OpType: model.OpTypePut}}), nil)), check.IsNil) + pipeline.PolymorphicEventMessage(&model.PolymorphicEvent{CRTs: 2, RawKV: &model.RawKVEntry{OpType: model.OpTypePut}, Row: &model.RowChangedEvent{}}), nil)), check.IsNil) c.Assert(node.Status(), check.Equals, TableStatusInitializing) c.Assert(node.Receive(pipeline.MockNodeContext4Test(ctx, - pipeline.PolymorphicEventMessage(&model.PolymorphicEvent{CRTs: 2, RawKV: &model.RawKVEntry{OpType: model.OpTypeResolved}}), nil)), check.IsNil) + pipeline.PolymorphicEventMessage(&model.PolymorphicEvent{CRTs: 2, RawKV: &model.RawKVEntry{OpType: model.OpTypeResolved}, Row: &model.RowChangedEvent{}}), nil)), check.IsNil) c.Assert(node.Status(), check.Equals, TableStatusRunning) err := node.Receive(pipeline.MockNodeContext4Test(ctx, - pipeline.PolymorphicEventMessage(&model.PolymorphicEvent{CRTs: 15, RawKV: &model.RawKVEntry{OpType: model.OpTypeResolved}}), nil)) + pipeline.PolymorphicEventMessage(&model.PolymorphicEvent{CRTs: 15, RawKV: &model.RawKVEntry{OpType: model.OpTypeResolved}, Row: &model.RowChangedEvent{}}), nil)) c.Assert(cerrors.ErrTableProcessorStoppedSafely.Equal(err), check.IsTrue) c.Assert(node.Status(), check.Equals, TableStatusStopped) c.Assert(node.CheckpointTs(), check.Equals, uint64(10)) @@ -142,7 +152,7 @@ func (s *outputSuite) TestStatus(c *check.C) { c.Assert(node.Status(), check.Equals, TableStatusInitializing) c.Assert(node.Receive(pipeline.MockNodeContext4Test(ctx, - pipeline.PolymorphicEventMessage(&model.PolymorphicEvent{CRTs: 2, RawKV: &model.RawKVEntry{OpType: model.OpTypeResolved}}), nil)), check.IsNil) + pipeline.PolymorphicEventMessage(&model.PolymorphicEvent{CRTs: 2, RawKV: &model.RawKVEntry{OpType: model.OpTypeResolved}, Row: &model.RowChangedEvent{}}), nil)), check.IsNil) c.Assert(node.Status(), check.Equals, TableStatusRunning) c.Assert(node.Receive(pipeline.MockNodeContext4Test(ctx, @@ -150,7 +160,7 @@ func (s *outputSuite) TestStatus(c *check.C) { c.Assert(node.Status(), check.Equals, TableStatusRunning) err = node.Receive(pipeline.MockNodeContext4Test(ctx, - pipeline.PolymorphicEventMessage(&model.PolymorphicEvent{CRTs: 7, RawKV: &model.RawKVEntry{OpType: model.OpTypeResolved}}), nil)) + pipeline.PolymorphicEventMessage(&model.PolymorphicEvent{CRTs: 7, RawKV: &model.RawKVEntry{OpType: model.OpTypeResolved}, Row: &model.RowChangedEvent{}}), nil)) c.Assert(cerrors.ErrTableProcessorStoppedSafely.Equal(err), check.IsTrue) c.Assert(node.Status(), check.Equals, TableStatusStopped) c.Assert(node.CheckpointTs(), check.Equals, uint64(6)) @@ -164,7 +174,7 @@ func (s *outputSuite) TestStatus(c *check.C) { c.Assert(node.Status(), check.Equals, TableStatusInitializing) c.Assert(node.Receive(pipeline.MockNodeContext4Test(ctx, - pipeline.PolymorphicEventMessage(&model.PolymorphicEvent{CRTs: 7, RawKV: &model.RawKVEntry{OpType: model.OpTypeResolved}}), nil)), check.IsNil) + pipeline.PolymorphicEventMessage(&model.PolymorphicEvent{CRTs: 7, RawKV: &model.RawKVEntry{OpType: model.OpTypeResolved}, Row: &model.RowChangedEvent{}}), nil)), check.IsNil) c.Assert(node.Status(), check.Equals, TableStatusRunning) c.Assert(node.Receive(pipeline.MockNodeContext4Test(ctx, @@ -172,7 +182,7 @@ func (s *outputSuite) TestStatus(c *check.C) { c.Assert(node.Status(), check.Equals, TableStatusRunning) err = node.Receive(pipeline.MockNodeContext4Test(ctx, - pipeline.PolymorphicEventMessage(&model.PolymorphicEvent{CRTs: 7, RawKV: &model.RawKVEntry{OpType: model.OpTypeResolved}}), nil)) + pipeline.PolymorphicEventMessage(&model.PolymorphicEvent{CRTs: 7, RawKV: &model.RawKVEntry{OpType: model.OpTypeResolved}, Row: &model.RowChangedEvent{}}), nil)) c.Assert(cerrors.ErrTableProcessorStoppedSafely.Equal(err), check.IsTrue) c.Assert(node.Status(), check.Equals, TableStatusStopped) c.Assert(node.CheckpointTs(), check.Equals, uint64(7)) @@ -181,6 +191,13 @@ func (s *outputSuite) TestStatus(c *check.C) { func (s *outputSuite) TestManyTs(c *check.C) { defer testleak.AfterTest(c)() ctx := cdcContext.NewContext(context.Background(), &cdcContext.GlobalVars{}) + ctx = cdcContext.WithChangefeedVars(ctx, &cdcContext.ChangefeedVars{ + ID: "changefeed-id-test-many-ts", + Info: &model.ChangeFeedInfo{ + StartTs: oracle.GoTimeToTS(time.Now()), + Config: config.GetDefaultReplicaConfig(), + }, + }) sink := &mockSink{} node := newSinkNode(sink, 0, 10, &mockFlowController{}) c.Assert(node.Init(pipeline.MockNodeContext4Test(ctx, nil, nil)), check.IsNil) @@ -195,7 +212,7 @@ func (s *outputSuite) TestManyTs(c *check.C) { c.Assert(node.Status(), check.Equals, TableStatusInitializing) c.Assert(node.Receive(pipeline.MockNodeContext4Test(ctx, - pipeline.PolymorphicEventMessage(&model.PolymorphicEvent{CRTs: 2, RawKV: &model.RawKVEntry{OpType: model.OpTypeResolved}}), nil)), check.IsNil) + pipeline.PolymorphicEventMessage(&model.PolymorphicEvent{CRTs: 2, RawKV: &model.RawKVEntry{OpType: model.OpTypeResolved}, Row: &model.RowChangedEvent{}}), nil)), check.IsNil) c.Assert(node.Status(), check.Equals, TableStatusRunning) sink.Check(c, nil) From 6ad1510fee531380bcaeea04cb8a80e9b23ed27b Mon Sep 17 00:00:00 2001 From: hi-rustin Date: Thu, 15 Jul 2021 09:17:31 +0800 Subject: [PATCH 16/21] sink_test: make tests happy --- cdc/processor/pipeline/sink_test.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/cdc/processor/pipeline/sink_test.go b/cdc/processor/pipeline/sink_test.go index ed5abc92897..5d7abdecb8f 100644 --- a/cdc/processor/pipeline/sink_test.go +++ b/cdc/processor/pipeline/sink_test.go @@ -15,17 +15,17 @@ package pipeline import ( "context" - "github.com/pingcap/ticdc/pkg/config" - "github.com/tikv/client-go/v2/oracle" "testing" "time" "github.com/pingcap/check" "github.com/pingcap/ticdc/cdc/model" + "github.com/pingcap/ticdc/pkg/config" cdcContext "github.com/pingcap/ticdc/pkg/context" cerrors "github.com/pingcap/ticdc/pkg/errors" "github.com/pingcap/ticdc/pkg/pipeline" "github.com/pingcap/ticdc/pkg/util/testleak" + "github.com/tikv/client-go/v2/oracle" ) func TestSuite(t *testing.T) { From 7cf3df86fdb683a0ecbad02a57b8d1081a5eb414 Mon Sep 17 00:00:00 2001 From: hi-rustin Date: Thu, 15 Jul 2021 10:09:52 +0800 Subject: [PATCH 17/21] sink: better code --- cdc/model/mounter.go | 13 ------------- cdc/processor/pipeline/sink.go | 20 ++++++++++++++++---- 2 files changed, 16 insertions(+), 17 deletions(-) diff --git a/cdc/model/mounter.go b/cdc/model/mounter.go index 54ae5ed4f97..8339ca6b11e 100644 --- a/cdc/model/mounter.go +++ b/cdc/model/mounter.go @@ -83,16 +83,3 @@ func (e *PolymorphicEvent) WaitPrepare(ctx context.Context) error { } return nil } - -// Clone returns a deep-clone of the struct. -func (e *PolymorphicEvent) Clone() *PolymorphicEvent { - clone := *e - - row := *e.Row - clone.Row = &row - - rowKV := *e.RawKV - clone.RawKV = &rowKV - - return &clone -} diff --git a/cdc/processor/pipeline/sink.go b/cdc/processor/pipeline/sink.go index 77d7b42e06c..7f541206c26 100644 --- a/cdc/processor/pipeline/sink.go +++ b/cdc/processor/pipeline/sink.go @@ -170,7 +170,14 @@ func (n *sinkNode) emitEvent(ctx pipeline.NodeContext, event *model.PolymorphicE } else { // If there is an update to handle key columns, // we need to split the event into two events to be compatible with the old format. - deleteEvent := event.Clone() + // NOTICE: Here we don't need a full deep copy because our two events need Columns and PreColumns respectively, + // so it won't have an impact and no more full deep copy wastes memory. + deleteEvent := *event + deleteEventRow := *event.Row + deleteEventRowKV := *event.RawKV + deleteEvent.Row = &deleteEventRow + deleteEvent.RawKV = &deleteEventRowKV + deleteEvent.Row.Columns = nil for i := range deleteEvent.Row.PreColumns { // NOTICE: Only the handle key pre column is retained in the delete event. @@ -180,12 +187,17 @@ func (n *sinkNode) emitEvent(ctx pipeline.NodeContext, event *model.PolymorphicE } // Align with the old format if old value disabled. deleteEvent.Row.TableInfoVersion = 0 - n.eventBuffer = append(n.eventBuffer, deleteEvent) + n.eventBuffer = append(n.eventBuffer, &deleteEvent) + + replaceEvent := *event + replaceEventRow := *event.Row + replaceEventRowKV := *event.RawKV + replaceEvent.Row = &replaceEventRow + replaceEvent.RawKV = &replaceEventRowKV - replaceEvent := event.Clone() // NOTICE: clean up pre cols for replace event. replaceEvent.Row.PreColumns = nil - n.eventBuffer = append(n.eventBuffer, replaceEvent) + n.eventBuffer = append(n.eventBuffer, &replaceEvent) } } else { n.eventBuffer = append(n.eventBuffer, event) From 1f0be3101afc8ffd71c02bc6434ef52f1439ffa1 Mon Sep 17 00:00:00 2001 From: hi-rustin Date: Thu, 15 Jul 2021 10:18:32 +0800 Subject: [PATCH 18/21] sink: better code --- cdc/processor/pipeline/sink.go | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/cdc/processor/pipeline/sink.go b/cdc/processor/pipeline/sink.go index 7f541206c26..73b701607a0 100644 --- a/cdc/processor/pipeline/sink.go +++ b/cdc/processor/pipeline/sink.go @@ -142,6 +142,10 @@ func (n *sinkNode) flushSink(ctx pipeline.NodeContext, resolvedTs model.Ts) (err } func (n *sinkNode) emitEvent(ctx pipeline.NodeContext, event *model.PolymorphicEvent) error { + if event == nil || event.Row == nil { + return nil + } + colLen := len(event.Row.Columns) preColLen := len(event.Row.PreColumns) config := ctx.ChangefeedVars().Info.Config From 047bc6ddc2b3836d14f933676e5a3929649b9309 Mon Sep 17 00:00:00 2001 From: hi-rustin Date: Thu, 15 Jul 2021 11:50:27 +0800 Subject: [PATCH 19/21] chore: add todo --- cdc/model/schema_storage.go | 2 ++ 1 file changed, 2 insertions(+) diff --git a/cdc/model/schema_storage.go b/cdc/model/schema_storage.go index 0bd8aaaecfd..2238dc3b331 100644 --- a/cdc/model/schema_storage.go +++ b/cdc/model/schema_storage.go @@ -143,6 +143,8 @@ func WrapTableInfo(schemaID int64, schemaName string, version uint64, info *mode return ti } +// TODO(hi-rustin): After we don't need to subscribe index update, +// findHandleIndex may be not necessary any more. func (ti *TableInfo) findHandleIndex() { if ti.HandleIndexID == HandleIndexPKIsHandle { // pk is handle From 7c92dbcda25162ec2303d23fe764f4092a157306 Mon Sep 17 00:00:00 2001 From: hi-rustin Date: Thu, 15 Jul 2021 13:44:43 +0800 Subject: [PATCH 20/21] test: add tests for sink --- cdc/processor/pipeline/sink.go | 16 +-- cdc/processor/pipeline/sink_test.go | 172 ++++++++++++++++++++++++++++ 2 files changed, 180 insertions(+), 8 deletions(-) diff --git a/cdc/processor/pipeline/sink.go b/cdc/processor/pipeline/sink.go index 73b701607a0..911b760e0fc 100644 --- a/cdc/processor/pipeline/sink.go +++ b/cdc/processor/pipeline/sink.go @@ -193,15 +193,15 @@ func (n *sinkNode) emitEvent(ctx pipeline.NodeContext, event *model.PolymorphicE deleteEvent.Row.TableInfoVersion = 0 n.eventBuffer = append(n.eventBuffer, &deleteEvent) - replaceEvent := *event - replaceEventRow := *event.Row - replaceEventRowKV := *event.RawKV - replaceEvent.Row = &replaceEventRow - replaceEvent.RawKV = &replaceEventRowKV + insertEvent := *event + insertEventRow := *event.Row + insertEventRowKV := *event.RawKV + insertEvent.Row = &insertEventRow + insertEvent.RawKV = &insertEventRowKV - // NOTICE: clean up pre cols for replace event. - replaceEvent.Row.PreColumns = nil - n.eventBuffer = append(n.eventBuffer, &replaceEvent) + // NOTICE: clean up pre cols for insert event. + insertEvent.Row.PreColumns = nil + n.eventBuffer = append(n.eventBuffer, &insertEvent) } } else { n.eventBuffer = append(n.eventBuffer, event) diff --git a/cdc/processor/pipeline/sink_test.go b/cdc/processor/pipeline/sink_test.go index 5d7abdecb8f..d358cf15993 100644 --- a/cdc/processor/pipeline/sink_test.go +++ b/cdc/processor/pipeline/sink_test.go @@ -244,3 +244,175 @@ func (s *outputSuite) TestManyTs(c *check.C) { c.Assert(node.ResolvedTs(), check.Equals, uint64(2)) c.Assert(node.CheckpointTs(), check.Equals, uint64(2)) } + +func (s *outputSuite) TestSplitUpdateEventWithEnableOldValue(c *check.C) { + defer testleak.AfterTest(c)() + ctx := cdcContext.NewContext(context.Background(), &cdcContext.GlobalVars{}) + ctx = cdcContext.WithChangefeedVars(ctx, &cdcContext.ChangefeedVars{ + ID: "changefeed-id-test-split-update-event", + Info: &model.ChangeFeedInfo{ + StartTs: oracle.GoTimeToTS(time.Now()), + Config: config.GetDefaultReplicaConfig(), + }, + }) + sink := &mockSink{} + node := newSinkNode(sink, 0, 10, &mockFlowController{}) + c.Assert(node.Init(pipeline.MockNodeContext4Test(ctx, nil, nil)), check.IsNil) + + // nil row. + c.Assert(node.Receive(pipeline.MockNodeContext4Test(ctx, + pipeline.PolymorphicEventMessage(&model.PolymorphicEvent{CRTs: 1, RawKV: &model.RawKVEntry{OpType: model.OpTypePut}}), nil)), check.IsNil) + c.Assert(node.eventBuffer, check.HasLen, 0) + + columns := []*model.Column{ + { + Name: "col1", + Flag: model.BinaryFlag, + Value: "col1-value-updated", + }, + { + Name: "col2", + Flag: model.HandleKeyFlag, + Value: "col2-value", + }, + } + preColumns := []*model.Column{ + { + Name: "col1", + Flag: model.BinaryFlag, + Value: "col1-value", + }, + { + Name: "col2", + Flag: model.HandleKeyFlag, + Value: "col2-value", + }, + } + c.Assert(node.Receive(pipeline.MockNodeContext4Test( + ctx, + pipeline.PolymorphicEventMessage(&model.PolymorphicEvent{ + CRTs: 1, + RawKV: &model.RawKVEntry{OpType: model.OpTypePut}, + Row: &model.RowChangedEvent{CommitTs: 1, Columns: columns, PreColumns: preColumns}, + }), nil)), + check.IsNil, + ) + c.Assert(node.eventBuffer, check.HasLen, 1) + c.Assert(node.eventBuffer[0].Row.Columns, check.HasLen, 2) + c.Assert(node.eventBuffer[0].Row.PreColumns, check.HasLen, 2) +} + +func (s *outputSuite) TestSplitUpdateEventWithDisableOldValue(c *check.C) { + defer testleak.AfterTest(c)() + ctx := cdcContext.NewContext(context.Background(), &cdcContext.GlobalVars{}) + cfg := config.GetDefaultReplicaConfig() + cfg.EnableOldValue = false + ctx = cdcContext.WithChangefeedVars(ctx, &cdcContext.ChangefeedVars{ + ID: "changefeed-id-test-split-update-event", + Info: &model.ChangeFeedInfo{ + StartTs: oracle.GoTimeToTS(time.Now()), + Config: cfg, + }, + }) + sink := &mockSink{} + node := newSinkNode(sink, 0, 10, &mockFlowController{}) + c.Assert(node.Init(pipeline.MockNodeContext4Test(ctx, nil, nil)), check.IsNil) + + // nil row. + c.Assert(node.Receive(pipeline.MockNodeContext4Test(ctx, + pipeline.PolymorphicEventMessage(&model.PolymorphicEvent{CRTs: 1, RawKV: &model.RawKVEntry{OpType: model.OpTypePut}}), nil)), check.IsNil) + c.Assert(node.eventBuffer, check.HasLen, 0) + + // No update to the handle key column. + columns := []*model.Column{ + { + Name: "col1", + Flag: model.BinaryFlag, + Value: "col1-value-updated", + }, + { + Name: "col2", + Flag: model.HandleKeyFlag, + Value: "col2-value", + }, + } + preColumns := []*model.Column{ + { + Name: "col1", + Flag: model.BinaryFlag, + Value: "col1-value", + }, + { + Name: "col2", + Flag: model.HandleKeyFlag, + Value: "col2-value", + }, + } + + c.Assert(node.Receive(pipeline.MockNodeContext4Test( + ctx, + pipeline.PolymorphicEventMessage(&model.PolymorphicEvent{ + CRTs: 1, + RawKV: &model.RawKVEntry{OpType: model.OpTypePut}, + Row: &model.RowChangedEvent{CommitTs: 1, Columns: columns, PreColumns: preColumns}, + }), nil)), + check.IsNil, + ) + c.Assert(node.eventBuffer, check.HasLen, 1) + c.Assert(node.eventBuffer[0].Row.Columns, check.HasLen, 2) + c.Assert(node.eventBuffer[0].Row.PreColumns, check.HasLen, 0) + + // Cleanup. + node.eventBuffer = []*model.PolymorphicEvent{} + // Update to the handle key column. + columns = []*model.Column{ + { + Name: "col1", + Flag: model.BinaryFlag, + Value: "col1-value-updated", + }, + { + Name: "col2", + Flag: model.HandleKeyFlag, + Value: "col2-value-updated", + }, + } + preColumns = []*model.Column{ + { + Name: "col1", + Flag: model.BinaryFlag, + Value: "col1-value", + }, + { + Name: "col2", + Flag: model.HandleKeyFlag, + Value: "col2-value", + }, + } + + c.Assert(node.Receive(pipeline.MockNodeContext4Test( + ctx, + pipeline.PolymorphicEventMessage(&model.PolymorphicEvent{ + CRTs: 1, + RawKV: &model.RawKVEntry{OpType: model.OpTypePut}, + Row: &model.RowChangedEvent{CommitTs: 1, Columns: columns, PreColumns: preColumns}, + }), nil)), + check.IsNil, + ) + // Split an update event into a delete and an insert event. + c.Assert(node.eventBuffer, check.HasLen, 2) + + deleteEventIndex := 0 + c.Assert(node.eventBuffer[deleteEventIndex].Row.Columns, check.HasLen, 0) + c.Assert(node.eventBuffer[deleteEventIndex].Row.PreColumns, check.HasLen, 2) + nonHandleKeyColIndex := 0 + handleKeyColIndex := 1 + // NOTICE: When old value disabled, we only keep the handle key pre cols. + c.Assert(node.eventBuffer[deleteEventIndex].Row.PreColumns[nonHandleKeyColIndex], check.IsNil) + c.Assert(node.eventBuffer[deleteEventIndex].Row.PreColumns[handleKeyColIndex].Name, check.Equals, "col2") + c.Assert(node.eventBuffer[deleteEventIndex].Row.PreColumns[handleKeyColIndex].Flag.IsHandleKey(), check.IsTrue) + + insertEventIndex := 1 + c.Assert(node.eventBuffer[insertEventIndex].Row.Columns, check.HasLen, 2) + c.Assert(node.eventBuffer[insertEventIndex].Row.PreColumns, check.HasLen, 0) +} From 460438bd96d035f70073bbe0550d42e991faafb0 Mon Sep 17 00:00:00 2001 From: hi-rustin Date: Thu, 15 Jul 2021 14:14:42 +0800 Subject: [PATCH 21/21] test: rename --- cdc/processor/pipeline/sink_test.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/cdc/processor/pipeline/sink_test.go b/cdc/processor/pipeline/sink_test.go index d358cf15993..35387c499f2 100644 --- a/cdc/processor/pipeline/sink_test.go +++ b/cdc/processor/pipeline/sink_test.go @@ -245,7 +245,7 @@ func (s *outputSuite) TestManyTs(c *check.C) { c.Assert(node.CheckpointTs(), check.Equals, uint64(2)) } -func (s *outputSuite) TestSplitUpdateEventWithEnableOldValue(c *check.C) { +func (s *outputSuite) TestSplitUpdateEventWhenEnableOldValue(c *check.C) { defer testleak.AfterTest(c)() ctx := cdcContext.NewContext(context.Background(), &cdcContext.GlobalVars{}) ctx = cdcContext.WithChangefeedVars(ctx, &cdcContext.ChangefeedVars{ @@ -302,7 +302,7 @@ func (s *outputSuite) TestSplitUpdateEventWithEnableOldValue(c *check.C) { c.Assert(node.eventBuffer[0].Row.PreColumns, check.HasLen, 2) } -func (s *outputSuite) TestSplitUpdateEventWithDisableOldValue(c *check.C) { +func (s *outputSuite) TestSplitUpdateEventWhenDisableOldValue(c *check.C) { defer testleak.AfterTest(c)() ctx := cdcContext.NewContext(context.Background(), &cdcContext.GlobalVars{}) cfg := config.GetDefaultReplicaConfig()