diff --git a/cdc/entry/mounter.go b/cdc/entry/mounter.go index 020969333ff..b7315c6af3f 100644 --- a/cdc/entry/mounter.go +++ b/cdc/entry/mounter.go @@ -336,29 +336,33 @@ func parseJob(v []byte, startTs, CRTs uint64) (*timodel.Job, error) { func datum2Column( tableInfo *model.TableInfo, datums map[int64]types.Datum, fillWithDefaultValue bool, -) ([]*model.Column, []types.Datum, []int64, error) { +) ([]*model.Column, []types.Datum, []*timodel.ColumnInfo, error) { cols := make([]*model.Column, len(tableInfo.RowColumnsOffset)) rawCols := make([]types.Datum, len(tableInfo.RowColumnsOffset)) - columnIDs := make([]int64, len(tableInfo.RowColumnsOffset)) + columnInfos := make([]*timodel.ColumnInfo, len(tableInfo.RowColumnsOffset)) for _, colInfo := range tableInfo.Columns { - colSize := 0 if !model.IsColCDCVisible(colInfo) { log.Debug("skip the column which is not visible", zap.String("table", tableInfo.Name.O), zap.String("column", colInfo.Name.O)) continue } + colName := colInfo.Name.O - colDatums, exist := datums[colInfo.ID] - var colValue interface{} + colID := colInfo.ID + colDatums, exist := datums[colID] if !exist && !fillWithDefaultValue { log.Debug("column value is not found", zap.String("table", tableInfo.Name.O), zap.String("column", colName)) continue } - var err error - var warn string - var size int + + var ( + colValue interface{} + size int + warn string + err error + ) if exist { colValue, size, warn, err = formatColVal(colDatums, colInfo) } else if fillWithDefaultValue { @@ -368,31 +372,33 @@ func datum2Column( return nil, nil, nil, errors.Trace(err) } if warn != "" { - log.Warn(warn, zap.String("table", tableInfo.TableName.String()), zap.String("column", colInfo.Name.String())) + log.Warn(warn, zap.String("table", tableInfo.TableName.String()), + zap.String("column", colInfo.Name.String())) } + defaultValue := getDDLDefaultDefinition(colInfo) - colSize += size - rawCols[tableInfo.RowColumnsOffset[colInfo.ID]] = colDatums - cols[tableInfo.RowColumnsOffset[colInfo.ID]] = &model.Column{ + offset := tableInfo.RowColumnsOffset[colID] + rawCols[offset] = colDatums + cols[offset] = &model.Column{ Name: colName, Type: colInfo.GetType(), Charset: colInfo.GetCharset(), Value: colValue, Default: defaultValue, - Flag: tableInfo.ColumnsFlag[colInfo.ID], + Flag: tableInfo.ColumnsFlag[colID], // ApproximateBytes = column data size + column struct size - ApproximateBytes: colSize + sizeOfEmptyColumn, + ApproximateBytes: size + sizeOfEmptyColumn, } - columnIDs[tableInfo.RowColumnsOffset[colInfo.ID]] = colInfo.ID + columnInfos[offset] = colInfo } - return cols, rawCols, columnIDs, nil + return cols, rawCols, columnInfos, nil } // return error if cannot get the expected checksum from the decoder // return false if the checksum is not matched // return true if the checksum is matched and the checksum is the matched one. func (m *mounter) verifyChecksum( - columnIDs []int64, rawColumns []types.Datum, isPreRow bool, + columnInfos []*timodel.ColumnInfo, rawColumns []types.Datum, isPreRow bool, ) (uint32, bool, error) { if !m.integrity.Enabled() { return 0, true, nil @@ -420,7 +426,18 @@ func (m *mounter) verifyChecksum( return 0, true, nil } - checksum, err := m.encoder.Checksum(m.sctx, columnIDs, rawColumns) + columns := make([]rowcodec.ColData, 0, len(rawColumns)) + for idx, col := range columnInfos { + columns = append(columns, rowcodec.ColData{ + ColumnInfo: col, + Datum: &rawColumns[idx], + }) + } + calculator := rowcodec.RowData{ + Cols: columns, + Data: make([]byte, 0), + } + checksum, err := calculator.Checksum() if err != nil { log.Error("failed to calculate the checksum", zap.Error(err)) return 0, false, errors.Trace(err) @@ -457,12 +474,13 @@ func (m *mounter) verifyChecksum( func (m *mounter) mountRowKVEntry(tableInfo *model.TableInfo, row *rowKVEntry, dataSize int64) (*model.RowChangedEvent, model.RowChangedDatums, error) { var ( - rawRow model.RowChangedDatums - columnIDs []int64 - matched bool - err error + rawRow model.RowChangedDatums + columnInfos []*timodel.ColumnInfo + matched bool + err error - corrupted bool + corrupted bool + checksumVersion int ) // Decode previous columns. @@ -479,15 +497,16 @@ func (m *mounter) mountRowKVEntry(tableInfo *model.TableInfo, row *rowKVEntry, d if row.PreRowExist { // FIXME(leoppro): using pre table info to mounter pre column datum // the pre column and current column in one event may using different table info - preCols, preRawCols, columnIDs, err = datum2Column(tableInfo, row.PreRow, m.enableOldValue) + preCols, preRawCols, columnInfos, err = datum2Column(tableInfo, row.PreRow, m.enableOldValue) if err != nil { return nil, rawRow, errors.Trace(err) } - preChecksum, matched, err = m.verifyChecksum(columnIDs, preRawCols, true) + preChecksum, matched, err = m.verifyChecksum(columnInfos, preRawCols, true) if err != nil { return nil, rawRow, errors.Trace(err) } + checksumVersion = m.encoder.ChecksumVersion() if !matched { log.Error("previous columns checksum mismatch", @@ -519,19 +538,19 @@ func (m *mounter) mountRowKVEntry(tableInfo *model.TableInfo, row *rowKVEntry, d checksum uint32 ) if row.RowExist { - cols, rawCols, columnIDs, err = datum2Column(tableInfo, row.Row, true) + cols, rawCols, columnInfos, err = datum2Column(tableInfo, row.Row, true) if err != nil { return nil, rawRow, errors.Trace(err) } - checksum, matched, err = m.verifyChecksum(columnIDs, rawCols, false) + checksum, matched, err = m.verifyChecksum(columnInfos, rawCols, false) if err != nil { return nil, rawRow, errors.Trace(err) } if !matched { log.Error("columns checksum mismatch", zap.Uint32("checksum", preChecksum), - zap.Int64s("columnIDs", columnIDs), + zap.Any("tableInfo", tableInfo), zap.Any("rawCols", rawCols)) if m.integrity.ErrorHandle() { return nil, rawRow, cerror.ErrCorruptedDataMutation. @@ -539,6 +558,7 @@ func (m *mounter) mountRowKVEntry(tableInfo *model.TableInfo, row *rowKVEntry, d } corrupted = true } + checksumVersion = m.encoder.ChecksumVersion() } schemaName := tableInfo.TableName.Schema @@ -566,9 +586,10 @@ func (m *mounter) mountRowKVEntry(tableInfo *model.TableInfo, row *rowKVEntry, d Columns: cols, PreColumns: preCols, - Checksum: checksum, - PreChecksum: preChecksum, - Corrupted: corrupted, + Checksum: checksum, + PreChecksum: preChecksum, + Corrupted: corrupted, + ChecksumVersion: checksumVersion, IndexColumns: tableInfo.IndexColumnsOffset, ApproximateDataSize: dataSize, diff --git a/cdc/processor/processor_test.go b/cdc/processor/processor_test.go index ef7b02c3797..be2ec3025b8 100644 --- a/cdc/processor/processor_test.go +++ b/cdc/processor/processor_test.go @@ -22,6 +22,7 @@ import ( "testing" "github.com/pingcap/errors" + "github.com/pingcap/failpoint" "github.com/pingcap/tiflow/cdc/contextutil" "github.com/pingcap/tiflow/cdc/entry" "github.com/pingcap/tiflow/cdc/entry/schema" @@ -610,3 +611,34 @@ func TestProcessorLiveness(t *testing.T) { require.Nil(t, p.Close()) tester.MustApplyPatches() } + +func TestProcessorDostNotStuckInInit(t *testing.T) { + _ = failpoint. + Enable("github.com/pingcap/tiflow/cdc/processor/sinkmanager/SinkManagerRunError", + "1*return(true)") + defer func() { + _ = failpoint. + Disable("github.com/pingcap/tiflow/cdc/processor/sinkmanager/SinkManagerRunError") + }() + + ctx := cdcContext.NewBackendContext4Test(true) + liveness := model.LivenessCaptureAlive + p, tester := initProcessor4Test(ctx, t, &liveness) + + // First tick for creating position. + err := p.Tick(ctx) + require.Nil(t, err) + tester.MustApplyPatches() + + // Second tick for init. + err = p.Tick(ctx) + require.Nil(t, err) + + // Third tick for handle error. + err = p.Tick(ctx) + require.NotNil(t, err) + require.Contains(t, err.Error(), "SinkManagerRunError") + + require.Nil(t, p.Close()) + tester.MustApplyPatches() +} diff --git a/cdc/processor/sinkmanager/manager.go b/cdc/processor/sinkmanager/manager.go index 69941ea2765..8b6bfa4fd2a 100644 --- a/cdc/processor/sinkmanager/manager.go +++ b/cdc/processor/sinkmanager/manager.go @@ -19,6 +19,7 @@ import ( "time" "github.com/pingcap/errors" + "github.com/pingcap/failpoint" "github.com/pingcap/log" "github.com/pingcap/tiflow/cdc/entry" "github.com/pingcap/tiflow/cdc/model" @@ -182,7 +183,14 @@ func (m *SinkManager) Run(ctx context.Context) (err error) { m.changefeedInfo.SinkURI, m.changefeedInfo.Config, managerErrors) + failpoint.Inject("SinkManagerRunError", func() { + log.Info("failpoint SinkManagerRunError injected", + zap.String("changefeed", m.changefeedID.ID)) + err = errors.New("SinkManagerRunError") + }) + if err != nil { + close(m.ready) return errors.Trace(err) } diff --git a/go.mod b/go.mod index a41ed8bdbaf..0c9b60b4647 100644 --- a/go.mod +++ b/go.mod @@ -61,9 +61,9 @@ require ( github.com/pingcap/failpoint v0.0.0-20220801062533-2eaa32854a6c github.com/pingcap/kvproto v0.0.0-20230407040905-68d0eebd564a github.com/pingcap/log v1.1.1-0.20230317032135-a0d097d16e22 - github.com/pingcap/tidb v1.1.0-beta.0.20230411230700-8b7b31005148 + github.com/pingcap/tidb v1.1.0-beta.0.20230418111328-47e7432054a1 github.com/pingcap/tidb-tools v6.5.1-0.20230208065359-62b90e1e24a7+incompatible - github.com/pingcap/tidb/parser v0.0.0-20230411032700-9949a54f29d8 + github.com/pingcap/tidb/parser v0.0.0-20230417161919-627110332165 github.com/prometheus/client_golang v1.14.0 github.com/prometheus/client_model v0.3.0 github.com/r3labs/diff v1.1.0 @@ -79,7 +79,7 @@ require ( github.com/swaggo/gin-swagger v1.2.0 github.com/swaggo/swag v1.8.3 github.com/syndtr/goleveldb v1.0.1-0.20210305035536-64b5b1c73954 - github.com/tikv/client-go/v2 v2.0.7-0.20230406064257-1ec0ff5bf443 + github.com/tikv/client-go/v2 v2.0.8-0.20230417065328-92db9f7b151f github.com/tikv/pd v1.1.0-beta.0.20230203015356-248b3f0be132 github.com/tikv/pd/client v0.0.0-20230329114254-1948c247c2b1 github.com/tinylib/msgp v1.1.6 diff --git a/go.sum b/go.sum index f05b20e94f1..b13a048f87c 100644 --- a/go.sum +++ b/go.sum @@ -908,15 +908,15 @@ github.com/pingcap/sysutil v0.0.0-20220114020952-ea68d2dbf5b4/go.mod h1:sDCsM39c github.com/pingcap/sysutil v1.0.1-0.20230407040306-fb007c5aff21 h1:QV6jqlfOkh8hqvEAgwBZa+4bSgO0EeKC7s5c6Luam2I= github.com/pingcap/sysutil v1.0.1-0.20230407040306-fb007c5aff21/go.mod h1:QYnjfA95ZaMefyl1NO8oPtKeb8pYUdnDVhQgf+qdpjM= github.com/pingcap/tidb v1.1.0-beta.0.20220511160835-98c31070d958/go.mod h1:luW4sIZoLHY3bCWuKqyqk2QgMvF+/M7nWOXf/me0+fY= -github.com/pingcap/tidb v1.1.0-beta.0.20230411230700-8b7b31005148 h1:Ba5QFDwEgdVyG8qsll5p0aIzxYPu3KZjiGpjzxlZM00= -github.com/pingcap/tidb v1.1.0-beta.0.20230411230700-8b7b31005148/go.mod h1:coCCXjP3wKEvEHAFAvyYDftSMEt+2abglH8K7R41u/8= +github.com/pingcap/tidb v1.1.0-beta.0.20230418111328-47e7432054a1 h1:7yuiJQ2iRU4Qc+MPUrRx7lWhw/cekzu4A8triPWghiI= +github.com/pingcap/tidb v1.1.0-beta.0.20230418111328-47e7432054a1/go.mod h1:coCCXjP3wKEvEHAFAvyYDftSMEt+2abglH8K7R41u/8= github.com/pingcap/tidb-tools v6.5.1-0.20230208065359-62b90e1e24a7+incompatible h1:OT1Mrhe5UQInwiO+vGjbtd5Ej4r1ECjmeN4oaTdPlbE= github.com/pingcap/tidb-tools v6.5.1-0.20230208065359-62b90e1e24a7+incompatible/go.mod h1:XGdcy9+yqlDSEMTpOXnwf3hiTeqrV6MN/u1se9N8yIM= github.com/pingcap/tidb/parser v0.0.0-20211011031125-9b13dc409c5e/go.mod h1:e1MGCA9Sg3T8jid8PKAEq5eYVuMMCq4n8gJ+Kqp4Plg= github.com/pingcap/tidb/parser v0.0.0-20220511160835-98c31070d958/go.mod h1:ElJiub4lRy6UZDb+0JHDkGEdr6aOli+ykhyej7VCLoI= github.com/pingcap/tidb/parser v0.0.0-20221126021158-6b02a5d8ba7d/go.mod h1:ElJiub4lRy6UZDb+0JHDkGEdr6aOli+ykhyej7VCLoI= -github.com/pingcap/tidb/parser v0.0.0-20230411032700-9949a54f29d8 h1:Eddfkf6qtugOvVREIk+nek3bQl+MzpAPOAzrni0kth4= -github.com/pingcap/tidb/parser v0.0.0-20230411032700-9949a54f29d8/go.mod h1:R0xUtp5gJK/Xtb+PIvR3Wh/Ayvmorwk0nzT4p3HLZJk= +github.com/pingcap/tidb/parser v0.0.0-20230417161919-627110332165 h1:Rtym1QmDOvMaW0jHpOJLpiv9nh/5OhkFicds1oc5Mp8= +github.com/pingcap/tidb/parser v0.0.0-20230417161919-627110332165/go.mod h1:R0xUtp5gJK/Xtb+PIvR3Wh/Ayvmorwk0nzT4p3HLZJk= github.com/pingcap/tipb v0.0.0-20220215045658-d12dec7a7609/go.mod h1:A7mrd7WHBl1o63LE2bIBGEJMTNWXqhgmYiOvMLxozfs= github.com/pingcap/tipb v0.0.0-20230310043643-5362260ee6f7 h1:CeeMOq1aHPAhXrw4eYXtQRyWOFlbfqK1+3f9Iop4IfU= github.com/pingcap/tipb v0.0.0-20230310043643-5362260ee6f7/go.mod h1:A7mrd7WHBl1o63LE2bIBGEJMTNWXqhgmYiOvMLxozfs= @@ -1086,8 +1086,8 @@ github.com/tiancaiamao/appdash v0.0.0-20181126055449-889f96f722a2/go.mod h1:2PfK github.com/tiancaiamao/gp v0.0.0-20221230034425-4025bc8a4d4a h1:J/YdBZ46WKpXsxsW93SG+q0F8KI+yFrcIDT4c/RNoc4= github.com/tiancaiamao/gp v0.0.0-20221230034425-4025bc8a4d4a/go.mod h1:h4xBhSNtOeEosLJ4P7JyKXX7Cabg7AVkWCK5gV2vOrM= github.com/tikv/client-go/v2 v2.0.1-0.20220510032238-ff5e35ac2869/go.mod h1:0scaG+seu7L56apm+Gjz9vckyO7ABIzM6T7n00mrIXs= -github.com/tikv/client-go/v2 v2.0.7-0.20230406064257-1ec0ff5bf443 h1:lqlizij6n/v4jx1Ph2rLF0E/gRJUg7kz3VmO6P5Y1e0= -github.com/tikv/client-go/v2 v2.0.7-0.20230406064257-1ec0ff5bf443/go.mod h1:9JNUWtHN8cx8eynHZ9xzdPi5YY6aiN1ILQyhfPUBcMo= +github.com/tikv/client-go/v2 v2.0.8-0.20230417065328-92db9f7b151f h1:pfDrSVAnfkk2EkrOc0iOmtA4n8F6TL9oEAK8R/enC50= +github.com/tikv/client-go/v2 v2.0.8-0.20230417065328-92db9f7b151f/go.mod h1:Dkqcv2dYoCOiNMiRgnEhpTa04dUaF9E3rbcz4rXxf3U= github.com/tikv/pd v1.1.0-beta.0.20230203015356-248b3f0be132 h1:vCVu7LxFou5WuaY6jHDMHKVeJTtwr5o2i1xWgGAdDo4= github.com/tikv/pd v1.1.0-beta.0.20230203015356-248b3f0be132/go.mod h1:jb9oq6rN4U0U3FZdvqWlpi9rZzFJxiOlvZ3aj5BTpg8= github.com/tikv/pd/client v0.0.0-20220307081149-841fa61e9710/go.mod h1:AtvppPwkiyUgQlR1W9qSqfTB+OsOIu19jDCOxOsPkmU= diff --git a/pkg/sink/codec/avro/avro.go b/pkg/sink/codec/avro/avro.go index 7e636d689cd..f20d60b6583 100644 --- a/pkg/sink/codec/avro/avro.go +++ b/pkg/sink/codec/avro/avro.go @@ -19,6 +19,7 @@ import ( "encoding/binary" "encoding/json" "math/big" + "sort" "strconv" "strings" @@ -52,6 +53,24 @@ type BatchEncoder struct { bigintUnsignedHandlingMode string } +type avroEncodeInput struct { + columns []*model.Column + colInfos []rowcodec.ColInfo +} + +func (r *avroEncodeInput) Less(i, j int) bool { + return r.colInfos[i].ID < r.colInfos[j].ID +} + +func (r *avroEncodeInput) Len() int { + return len(r.columns) +} + +func (r *avroEncodeInput) Swap(i, j int) { + r.colInfos[i], r.colInfos[j] = r.colInfos[j], r.colInfos[i] + r.columns[i], r.columns[j] = r.columns[j], r.columns[i] +} + type avroEncodeResult struct { data []byte registryID int @@ -145,6 +164,8 @@ func (a *BatchEncoder) avroEncode( isKey bool, ) (*avroEncodeResult, error) { var ( + input *avroEncodeInput + cols []*model.Column colInfos []rowcodec.ColInfo enableTiDBExtension bool @@ -154,12 +175,19 @@ func (a *BatchEncoder) avroEncode( ) if isKey { cols, colInfos = e.HandleKeyColInfos() + input = &avroEncodeInput{ + columns: cols, + colInfos: colInfos, + } enableTiDBExtension = false enableRowLevelChecksum = false schemaManager = a.keySchemaManager } else { - cols = e.Columns - colInfos = e.ColInfos + input = &avroEncodeInput{ + columns: e.Columns, + colInfos: e.ColInfos, + } + enableTiDBExtension = a.enableTiDBExtension enableRowLevelChecksum = a.enableRowChecksum schemaManager = a.valueSchemaManager @@ -173,7 +201,7 @@ func (a *BatchEncoder) avroEncode( } } - if len(cols) == 0 { + if len(input.columns) == 0 { return nil, nil } @@ -183,8 +211,7 @@ func (a *BatchEncoder) avroEncode( schema, err := rowToAvroSchema( namespace, e.Table.Table, - cols, - colInfos, + input, enableTiDBExtension, enableRowLevelChecksum, a.decimalHandlingMode, @@ -208,8 +235,7 @@ func (a *BatchEncoder) avroEncode( } native, err := rowToAvroData( - cols, - colInfos, + input, e.CommitTs, operation, enableTiDBExtension, @@ -369,13 +395,16 @@ type avroLogicalTypeSchema struct { func rowToAvroSchema( namespace string, name string, - columnInfo []*model.Column, - colInfos []rowcodec.ColInfo, + input *avroEncodeInput, enableTiDBExtension bool, enableRowLevelChecksum bool, decimalHandlingMode string, bigintUnsignedHandlingMode string, ) (string, error) { + if enableRowLevelChecksum { + sort.Sort(input) + } + top := avroSchemaTop{ Tp: "record", Name: sanitizeName(name), @@ -383,10 +412,10 @@ func rowToAvroSchema( Fields: nil, } - for i, col := range columnInfo { + for i, col := range input.columns { avroType, err := columnToAvroSchema( col, - colInfos[i].Ft, + input.colInfos[i].Ft, decimalHandlingMode, bigintUnsignedHandlingMode, ) @@ -400,7 +429,7 @@ func rowToAvroSchema( copy.Value = copy.Default defaultValue, _, err := columnToAvroData( ©, - colInfos[i].Ft, + input.colInfos[i].Ft, decimalHandlingMode, bigintUnsignedHandlingMode, ) @@ -440,32 +469,38 @@ func rowToAvroSchema( if enableTiDBExtension { top.Fields = append(top.Fields, map[string]interface{}{ - "name": tidbOp, - "type": "string", + "name": tidbOp, + "type": "string", + "default": "", }, map[string]interface{}{ - "name": tidbCommitTs, - "type": "long", + "name": tidbCommitTs, + "type": "long", + "default": 0, }, map[string]interface{}{ - "name": tidbPhysicalTime, - "type": "long", + "name": tidbPhysicalTime, + "type": "long", + "default": 0, }, ) if enableRowLevelChecksum { top.Fields = append(top.Fields, map[string]interface{}{ - "name": tidbRowLevelChecksum, - "type": "string", + "name": tidbRowLevelChecksum, + "type": "string", + "default": "", }, map[string]interface{}{ - "name": tidbCorrupted, - "type": "boolean", + "name": tidbCorrupted, + "type": "boolean", + "default": false, }, map[string]interface{}{ - "name": tidbChecksumVersion, - "type": "int", + "name": tidbChecksumVersion, + "type": "int", + "default": 0, }) } @@ -475,27 +510,29 @@ func rowToAvroSchema( if err != nil { return "", cerror.WrapError(cerror.ErrAvroMarshalFailed, err) } - log.Debug("rowToAvroSchema", zap.ByteString("schema", str)) + log.Info("rowToAvroSchema", + zap.ByteString("schema", str), + zap.Bool("enableTiDBExtension", enableTiDBExtension), + zap.Bool("enableRowLevelChecksum", enableRowLevelChecksum)) return string(str), nil } func rowToAvroData( - cols []*model.Column, - colInfos []rowcodec.ColInfo, + input *avroEncodeInput, commitTs uint64, operation string, enableTiDBExtension bool, decimalHandlingMode string, bigintUnsignedHandlingMode string, ) (map[string]interface{}, error) { - ret := make(map[string]interface{}, len(cols)) - for i, col := range cols { + ret := make(map[string]interface{}, len(input.columns)) + for i, col := range input.columns { if col == nil { continue } data, str, err := columnToAvroData( col, - colInfos[i].Ft, + input.colInfos[i].Ft, decimalHandlingMode, bigintUnsignedHandlingMode, ) diff --git a/pkg/sink/codec/avro/avro_test.go b/pkg/sink/codec/avro/avro_test.go index 475f67f790b..3296fb818fd 100644 --- a/pkg/sink/codec/avro/avro_test.go +++ b/pkg/sink/codec/avro/avro_test.go @@ -18,7 +18,10 @@ import ( "context" "encoding/json" "math/big" + "math/rand" + "sort" "testing" + "time" "github.com/linkedin/goavro/v2" "github.com/pingcap/tidb/parser/mysql" @@ -629,6 +632,60 @@ func indentJSON(j string) string { return buf.String() } +func TestRowToAvroSchemaEnableChecksum(t *testing.T) { + t.Parallel() + + table := model.TableName{ + Schema: "testdb", + Table: "rowtoavroschema", + } + namespace := getAvroNamespace(model.DefaultNamespace, &table) + cols := make([]*model.Column, 0) + colInfos := make([]rowcodec.ColInfo, 0) + + for _, v := range avroTestColumns { + cols = append(cols, &v.col) + colInfos = append(colInfos, v.colInfo) + + colNew := v.col + colNew.Name = colNew.Name + "nullable" + colNew.Value = nil + colNew.Flag.SetIsNullable() + + colInfoNew := v.colInfo + colInfoNew.ID += int64(len(avroTestColumns)) + + cols = append(cols, &colNew) + colInfos = append(colInfos, colInfoNew) + } + + input := &avroEncodeInput{ + cols, + colInfos, + } + + rand.New(rand.NewSource(time.Now().Unix())).Shuffle(len(input.columns), func(i, j int) { + input.columns[i], input.columns[j] = input.columns[j], input.columns[i] + input.colInfos[i], input.colInfos[j] = input.colInfos[j], input.colInfos[i] + }) + + schema, err := rowToAvroSchema( + namespace, + table.Table, + input, + true, + true, + "string", + "string", + ) + require.NoError(t, err) + require.Equal(t, expectedSchemaWithExtensionEnableChecksum, indentJSON(schema)) + _, err = goavro.NewCodec(schema) + require.NoError(t, err) + + require.True(t, sort.IsSorted(input)) +} + func TestRowToAvroSchema(t *testing.T) { t.Parallel() @@ -654,8 +711,10 @@ func TestRowToAvroSchema(t *testing.T) { schema, err := rowToAvroSchema( namespace, table.Table, - cols, - colInfos, + &avroEncodeInput{ + cols, + colInfos, + }, false, false, "precise", @@ -669,8 +728,10 @@ func TestRowToAvroSchema(t *testing.T) { schema, err = rowToAvroSchema( namespace, table.Table, - cols, - colInfos, + &avroEncodeInput{ + cols, + colInfos, + }, true, false, "precise", @@ -699,7 +760,7 @@ func TestRowToAvroData(t *testing.T) { colInfos = append(colInfos, v.colInfo) } - data, err := rowToAvroData(cols, colInfos, 417318403368288260, "c", false, "precise", "long") + data, err := rowToAvroData(&avroEncodeInput{cols, colInfos}, 417318403368288260, "c", false, "precise", "long") require.NoError(t, err) _, exists := data["_tidb_commit_ts"] require.False(t, exists) @@ -708,7 +769,7 @@ func TestRowToAvroData(t *testing.T) { _, exists = data["_tidb_commit_physical_time"] require.False(t, exists) - data, err = rowToAvroData(cols, colInfos, 417318403368288260, "c", true, "precise", "long") + data, err = rowToAvroData(&avroEncodeInput{cols, colInfos}, 417318403368288260, "c", true, "precise", "long") require.NoError(t, err) v, exists := data["_tidb_commit_ts"] require.True(t, exists) @@ -784,8 +845,10 @@ func TestAvroEncode(t *testing.T) { keySchema, err := rowToAvroSchema( namespace, event.Table.Table, - keyCols, - keyColInfos, + &avroEncodeInput{ + keyCols, + keyColInfos, + }, false, false, "precise", @@ -809,8 +872,9 @@ func TestAvroEncode(t *testing.T) { valueSchema, err := rowToAvroSchema( namespace, event.Table.Table, - cols, - colInfos, + &avroEncodeInput{ + cols, colInfos, + }, true, false, "precise", diff --git a/pkg/sink/codec/avro/avro_test_data.go b/pkg/sink/codec/avro/avro_test_data.go index ca0fa5b2e6b..b84a2b14bb4 100644 --- a/pkg/sink/codec/avro/avro_test_data.go +++ b/pkg/sink/codec/avro/avro_test_data.go @@ -1635,16 +1635,855 @@ var expectedSchemaWithExtension = `{ ] }, { + "default": "", "name": "_tidb_op", "type": "string" }, { + "default": 0, "name": "_tidb_commit_ts", "type": "long" }, { + "default": 0, "name": "_tidb_commit_physical_time", "type": "long" } ] }` + +var expectedSchemaWithExtensionEnableChecksum = `{ + "type": "record", + "name": "rowtoavroschema", + "namespace": "default.testdb", + "fields": [ + { + "name": "tiny", + "type": { + "type": "int", + "connect.parameters": { + "tidb_type": "INT" + } + } + }, + { + "name": "short", + "type": { + "type": "int", + "connect.parameters": { + "tidb_type": "INT" + } + } + }, + { + "name": "int24", + "type": { + "type": "int", + "connect.parameters": { + "tidb_type": "INT" + } + } + }, + { + "name": "long", + "type": { + "type": "int", + "connect.parameters": { + "tidb_type": "INT" + } + } + }, + { + "name": "longlong", + "type": { + "type": "long", + "connect.parameters": { + "tidb_type": "BIGINT" + } + } + }, + { + "name": "tinyunsigned", + "type": { + "type": "int", + "connect.parameters": { + "tidb_type": "INT UNSIGNED" + } + } + }, + { + "name": "shortunsigned", + "type": { + "type": "int", + "connect.parameters": { + "tidb_type": "INT UNSIGNED" + } + } + }, + { + "name": "int24unsigned", + "type": { + "type": "int", + "connect.parameters": { + "tidb_type": "INT UNSIGNED" + } + } + }, + { + "name": "longunsigned", + "type": { + "type": "long", + "connect.parameters": { + "tidb_type": "INT UNSIGNED" + } + } + }, + { + "name": "longlongunsigned", + "type": { + "type": "string", + "connect.parameters": { + "tidb_type": "BIGINT UNSIGNED" + } + } + }, + { + "name": "float", + "type": { + "type": "float", + "connect.parameters": { + "tidb_type": "FLOAT" + } + } + }, + { + "name": "double", + "type": { + "type": "double", + "connect.parameters": { + "tidb_type": "DOUBLE" + } + } + }, + { + "name": "bit", + "type": { + "type": "bytes", + "connect.parameters": { + "length": "1", + "tidb_type": "BIT" + } + } + }, + { + "name": "decimal", + "type": { + "type": "string", + "connect.parameters": { + "tidb_type": "DECIMAL" + } + } + }, + { + "name": "tinytext", + "type": { + "type": "string", + "connect.parameters": { + "tidb_type": "TEXT" + } + } + }, + { + "name": "mediumtext", + "type": { + "type": "string", + "connect.parameters": { + "tidb_type": "TEXT" + } + } + }, + { + "name": "text", + "type": { + "type": "string", + "connect.parameters": { + "tidb_type": "TEXT" + } + } + }, + { + "name": "longtext", + "type": { + "type": "string", + "connect.parameters": { + "tidb_type": "TEXT" + } + } + }, + { + "name": "varchar", + "type": { + "type": "string", + "connect.parameters": { + "tidb_type": "TEXT" + } + } + }, + { + "name": "varstring", + "type": { + "type": "string", + "connect.parameters": { + "tidb_type": "TEXT" + } + } + }, + { + "name": "string", + "type": { + "type": "string", + "connect.parameters": { + "tidb_type": "TEXT" + } + } + }, + { + "name": "tinyblob", + "type": { + "type": "bytes", + "connect.parameters": { + "tidb_type": "BLOB" + } + } + }, + { + "name": "mediumblob", + "type": { + "type": "bytes", + "connect.parameters": { + "tidb_type": "BLOB" + } + } + }, + { + "name": "blob", + "type": { + "type": "bytes", + "connect.parameters": { + "tidb_type": "BLOB" + } + } + }, + { + "name": "longblob", + "type": { + "type": "bytes", + "connect.parameters": { + "tidb_type": "BLOB" + } + } + }, + { + "name": "varbinary", + "type": { + "type": "bytes", + "connect.parameters": { + "tidb_type": "BLOB" + } + } + }, + { + "name": "varbinary1", + "type": { + "type": "bytes", + "connect.parameters": { + "tidb_type": "BLOB" + } + } + }, + { + "name": "binary", + "type": { + "type": "bytes", + "connect.parameters": { + "tidb_type": "BLOB" + } + } + }, + { + "name": "enum", + "type": { + "type": "string", + "connect.parameters": { + "allowed": "a\\,,b", + "tidb_type": "ENUM" + } + } + }, + { + "name": "set", + "type": { + "type": "string", + "connect.parameters": { + "allowed": "a\\,,b", + "tidb_type": "SET" + } + } + }, + { + "name": "json", + "type": { + "type": "string", + "connect.parameters": { + "tidb_type": "JSON" + } + } + }, + { + "name": "date", + "type": { + "type": "string", + "connect.parameters": { + "tidb_type": "DATE" + } + } + }, + { + "name": "datetime", + "type": { + "type": "string", + "connect.parameters": { + "tidb_type": "DATETIME" + } + } + }, + { + "name": "timestamp", + "type": { + "type": "string", + "connect.parameters": { + "tidb_type": "TIMESTAMP" + } + } + }, + { + "name": "time", + "type": { + "type": "string", + "connect.parameters": { + "tidb_type": "TIME" + } + } + }, + { + "name": "year", + "type": { + "type": "int", + "connect.parameters": { + "tidb_type": "YEAR" + } + } + }, + { + "default": null, + "name": "tinynullable", + "type": [ + "null", + { + "type": "int", + "connect.parameters": { + "tidb_type": "INT" + } + } + ] + }, + { + "default": null, + "name": "shortnullable", + "type": [ + "null", + { + "type": "int", + "connect.parameters": { + "tidb_type": "INT" + } + } + ] + }, + { + "default": null, + "name": "int24nullable", + "type": [ + "null", + { + "type": "int", + "connect.parameters": { + "tidb_type": "INT" + } + } + ] + }, + { + "default": null, + "name": "longnullable", + "type": [ + "null", + { + "type": "int", + "connect.parameters": { + "tidb_type": "INT" + } + } + ] + }, + { + "default": null, + "name": "longlongnullable", + "type": [ + "null", + { + "type": "long", + "connect.parameters": { + "tidb_type": "BIGINT" + } + } + ] + }, + { + "default": null, + "name": "tinyunsignednullable", + "type": [ + "null", + { + "type": "int", + "connect.parameters": { + "tidb_type": "INT UNSIGNED" + } + } + ] + }, + { + "default": null, + "name": "shortunsignednullable", + "type": [ + "null", + { + "type": "int", + "connect.parameters": { + "tidb_type": "INT UNSIGNED" + } + } + ] + }, + { + "default": null, + "name": "int24unsignednullable", + "type": [ + "null", + { + "type": "int", + "connect.parameters": { + "tidb_type": "INT UNSIGNED" + } + } + ] + }, + { + "default": null, + "name": "longunsignednullable", + "type": [ + "null", + { + "type": "long", + "connect.parameters": { + "tidb_type": "INT UNSIGNED" + } + } + ] + }, + { + "default": null, + "name": "longlongunsignednullable", + "type": [ + "null", + { + "type": "string", + "connect.parameters": { + "tidb_type": "BIGINT UNSIGNED" + } + } + ] + }, + { + "default": null, + "name": "floatnullable", + "type": [ + "null", + { + "type": "float", + "connect.parameters": { + "tidb_type": "FLOAT" + } + } + ] + }, + { + "default": null, + "name": "doublenullable", + "type": [ + "null", + { + "type": "double", + "connect.parameters": { + "tidb_type": "DOUBLE" + } + } + ] + }, + { + "default": null, + "name": "bitnullable", + "type": [ + "null", + { + "type": "bytes", + "connect.parameters": { + "length": "1", + "tidb_type": "BIT" + } + } + ] + }, + { + "default": null, + "name": "decimalnullable", + "type": [ + "null", + { + "type": "string", + "connect.parameters": { + "tidb_type": "DECIMAL" + } + } + ] + }, + { + "default": null, + "name": "tinytextnullable", + "type": [ + "null", + { + "type": "string", + "connect.parameters": { + "tidb_type": "TEXT" + } + } + ] + }, + { + "default": null, + "name": "mediumtextnullable", + "type": [ + "null", + { + "type": "string", + "connect.parameters": { + "tidb_type": "TEXT" + } + } + ] + }, + { + "default": null, + "name": "textnullable", + "type": [ + "null", + { + "type": "string", + "connect.parameters": { + "tidb_type": "TEXT" + } + } + ] + }, + { + "default": null, + "name": "longtextnullable", + "type": [ + "null", + { + "type": "string", + "connect.parameters": { + "tidb_type": "TEXT" + } + } + ] + }, + { + "default": null, + "name": "varcharnullable", + "type": [ + "null", + { + "type": "string", + "connect.parameters": { + "tidb_type": "TEXT" + } + } + ] + }, + { + "default": null, + "name": "varstringnullable", + "type": [ + "null", + { + "type": "string", + "connect.parameters": { + "tidb_type": "TEXT" + } + } + ] + }, + { + "default": null, + "name": "stringnullable", + "type": [ + "null", + { + "type": "string", + "connect.parameters": { + "tidb_type": "TEXT" + } + } + ] + }, + { + "default": null, + "name": "tinyblobnullable", + "type": [ + "null", + { + "type": "bytes", + "connect.parameters": { + "tidb_type": "BLOB" + } + } + ] + }, + { + "default": null, + "name": "mediumblobnullable", + "type": [ + "null", + { + "type": "bytes", + "connect.parameters": { + "tidb_type": "BLOB" + } + } + ] + }, + { + "default": null, + "name": "blobnullable", + "type": [ + "null", + { + "type": "bytes", + "connect.parameters": { + "tidb_type": "BLOB" + } + } + ] + }, + { + "default": null, + "name": "longblobnullable", + "type": [ + "null", + { + "type": "bytes", + "connect.parameters": { + "tidb_type": "BLOB" + } + } + ] + }, + { + "default": null, + "name": "varbinarynullable", + "type": [ + "null", + { + "type": "bytes", + "connect.parameters": { + "tidb_type": "BLOB" + } + } + ] + }, + { + "default": null, + "name": "varbinary1nullable", + "type": [ + "null", + { + "type": "bytes", + "connect.parameters": { + "tidb_type": "BLOB" + } + } + ] + }, + { + "default": null, + "name": "binarynullable", + "type": [ + "null", + { + "type": "bytes", + "connect.parameters": { + "tidb_type": "BLOB" + } + } + ] + }, + { + "default": null, + "name": "enumnullable", + "type": [ + "null", + { + "type": "string", + "connect.parameters": { + "allowed": "a\\,,b", + "tidb_type": "ENUM" + } + } + ] + }, + { + "default": null, + "name": "setnullable", + "type": [ + "null", + { + "type": "string", + "connect.parameters": { + "allowed": "a\\,,b", + "tidb_type": "SET" + } + } + ] + }, + { + "default": null, + "name": "jsonnullable", + "type": [ + "null", + { + "type": "string", + "connect.parameters": { + "tidb_type": "JSON" + } + } + ] + }, + { + "default": null, + "name": "datenullable", + "type": [ + "null", + { + "type": "string", + "connect.parameters": { + "tidb_type": "DATE" + } + } + ] + }, + { + "default": null, + "name": "datetimenullable", + "type": [ + "null", + { + "type": "string", + "connect.parameters": { + "tidb_type": "DATETIME" + } + } + ] + }, + { + "default": null, + "name": "timestampnullable", + "type": [ + "null", + { + "type": "string", + "connect.parameters": { + "tidb_type": "TIMESTAMP" + } + } + ] + }, + { + "default": null, + "name": "timenullable", + "type": [ + "null", + { + "type": "string", + "connect.parameters": { + "tidb_type": "TIME" + } + } + ] + }, + { + "default": null, + "name": "yearnullable", + "type": [ + "null", + { + "type": "int", + "connect.parameters": { + "tidb_type": "YEAR" + } + } + ] + }, + { + "default": "", + "name": "_tidb_op", + "type": "string" + }, + { + "default": 0, + "name": "_tidb_commit_ts", + "type": "long" + }, + { + "default": 0, + "name": "_tidb_commit_physical_time", + "type": "long" + }, + { + "default": "", + "name": "_tidb_row_level_checksum", + "type": "string" + }, + { + "default": false, + "name": "_tidb_corrupted", + "type": "boolean" + }, + { + "default": 0, + "name": "_tidb_checksum_version", + "type": "int" + } + ] +}` diff --git a/pkg/sqlmodel/causality.go b/pkg/sqlmodel/causality.go index d943ff949c6..fef2e4cf38e 100644 --- a/pkg/sqlmodel/causality.go +++ b/pkg/sqlmodel/causality.go @@ -144,6 +144,11 @@ func (r *RowChange) getCausalityString(values []interface{}) []string { ret := make([]string, 0, len(pkAndUks)) for _, indexCols := range pkAndUks { + // TODO: should not support multi value index and generate the value + // TODO: also fix https://github.com/pingcap/tiflow/issues/3286#issuecomment-971264282 + if indexCols.MVIndex { + continue + } cols, vals := getColsAndValuesOfIdx(r.sourceTableInfo.Columns, indexCols, values) // handle prefix index truncVals := truncateIndexValues(r.tiSessionCtx, r.sourceTableInfo, indexCols, cols, vals) diff --git a/pkg/sqlmodel/row_change_test.go b/pkg/sqlmodel/row_change_test.go index 7ace5be2d2e..c685a211ef1 100644 --- a/pkg/sqlmodel/row_change_test.go +++ b/pkg/sqlmodel/row_change_test.go @@ -275,7 +275,7 @@ func (s *dpanicSuite) TestExpressionIndex() { sql := `CREATE TABLE tb1 ( id INT PRIMARY KEY, j JSON, - KEY j_index ((cast(json_extract(j,'$[*]') as signed array)), id) + UNIQUE KEY j_index ((cast(json_extract(j,'$[*]') as signed array)), id) )` ti := mockTableInfo(s.T(), sql) change := NewRowChange(source, nil, nil, []interface{}{1, `[1,2,3]`}, ti, nil, nil) @@ -283,6 +283,10 @@ func (s *dpanicSuite) TestExpressionIndex() { s.Equal("INSERT INTO `db`.`tb1` (`id`,`j`) VALUES (?,?)", sql) s.Equal([]interface{}{1, `[1,2,3]`}, args) require.Equal(s.T(), 2, change.ColumnCount()) + keys := change.CausalityKeys() + // TODO: need change it after future fix + require.Equal(s.T(), []string{"1.id.db.tb1"}, keys) + change2 := NewRowChange(source, nil, []interface{}{1, `[1,2,3]`}, []interface{}{1, `[1,2,3,4]`}, ti, nil, nil) sql, args = change2.GenSQL(DMLUpdate) s.Equal("UPDATE `db`.`tb1` SET `id` = ?, `j` = ? WHERE `id` = ? LIMIT 1", sql)