diff --git a/cdc/entry/mounter.go b/cdc/entry/mounter.go index 4d06c5d0064..3171e34385d 100644 --- a/cdc/entry/mounter.go +++ b/cdc/entry/mounter.go @@ -337,12 +337,18 @@ func parseJob(v []byte, startTs, CRTs uint64) (*timodel.Job, error) { func datum2Column( tableInfo *model.TableInfo, datums map[int64]types.Datum, fillWithDefaultValue bool, -) ([]*model.Column, []types.Datum, []*timodel.ColumnInfo, error) { +) ([]*model.Column, []types.Datum, []*timodel.ColumnInfo, []rowcodec.ColInfo, error) { cols := make([]*model.Column, len(tableInfo.RowColumnsOffset)) rawCols := make([]types.Datum, len(tableInfo.RowColumnsOffset)) + + // columnInfos and rowColumnInfos hold different column metadata, + // they should have the same length and order. columnInfos := make([]*timodel.ColumnInfo, len(tableInfo.RowColumnsOffset)) + rowColumnInfos := make([]rowcodec.ColInfo, len(tableInfo.RowColumnsOffset)) + + _, _, extendColumnInfos := tableInfo.GetRowColInfos() - for _, colInfo := range tableInfo.Columns { + for idx, colInfo := range tableInfo.Columns { if !model.IsColCDCVisible(colInfo) { log.Debug("skip the column which is not visible", zap.String("table", tableInfo.Name.O), zap.String("column", colInfo.Name.O)) @@ -370,7 +376,7 @@ func datum2Column( colDatums, colValue, size, warn, err = getDefaultOrZeroValue(colInfo) } if err != nil { - return nil, nil, nil, errors.Trace(err) + return nil, nil, nil, nil, errors.Trace(err) } if warn != "" { log.Warn(warn, zap.String("table", tableInfo.TableName.String()), @@ -391,8 +397,9 @@ func datum2Column( ApproximateBytes: size + sizeOfEmptyColumn, } columnInfos[offset] = colInfo + rowColumnInfos[offset] = extendColumnInfos[idx] } - return cols, rawCols, columnInfos, nil + return cols, rawCols, columnInfos, rowColumnInfos, nil } // return error if cannot get the expected checksum from the decoder @@ -478,10 +485,11 @@ func (m *mounter) verifyChecksum( func (m *mounter) mountRowKVEntry(tableInfo *model.TableInfo, row *rowKVEntry, dataSize int64) (*model.RowChangedEvent, model.RowChangedDatums, error) { var ( - rawRow model.RowChangedDatums - columnInfos []*timodel.ColumnInfo - matched bool - err error + rawRow model.RowChangedDatums + columnInfos []*timodel.ColumnInfo + extendColumnInfos []rowcodec.ColInfo + matched bool + err error checksum *integrity.Checksum @@ -503,7 +511,7 @@ func (m *mounter) mountRowKVEntry(tableInfo *model.TableInfo, row *rowKVEntry, d if row.PreRowExist { // FIXME(leoppro): using pre table info to mounter pre column datum // the pre column and current column in one event may using different table info - preCols, preRawCols, columnInfos, err = datum2Column(tableInfo, row.PreRow, m.enableOldValue) + preCols, preRawCols, columnInfos, extendColumnInfos, err = datum2Column(tableInfo, row.PreRow, m.enableOldValue) if err != nil { return nil, rawRow, errors.Trace(err) } @@ -543,7 +551,7 @@ func (m *mounter) mountRowKVEntry(tableInfo *model.TableInfo, row *rowKVEntry, d current uint32 ) if row.RowExist { - cols, rawCols, columnInfos, err = datum2Column(tableInfo, row.Row, true) + cols, rawCols, columnInfos, extendColumnInfos, err = datum2Column(tableInfo, row.Row, true) if err != nil { return nil, rawRow, errors.Trace(err) } @@ -572,7 +580,6 @@ func (m *mounter) mountRowKVEntry(tableInfo *model.TableInfo, row *rowKVEntry, d intRowID = row.RecordID.IntValue() } - _, _, colInfos := tableInfo.GetRowColInfos() rawRow.PreRowDatums = preRawCols rawRow.RowDatums = rawCols @@ -597,7 +604,7 @@ func (m *mounter) mountRowKVEntry(tableInfo *model.TableInfo, row *rowKVEntry, d TableID: row.PhysicalTableID, IsPartition: tableInfo.GetPartitionInfo() != nil, }, - ColInfos: colInfos, + ColInfos: extendColumnInfos, TableInfo: tableInfo, Columns: cols, PreColumns: preCols, diff --git a/cdc/entry/mounter_test.go b/cdc/entry/mounter_test.go index b81b30e8bda..5bbc813852d 100644 --- a/cdc/entry/mounter_test.go +++ b/cdc/entry/mounter_test.go @@ -1282,7 +1282,7 @@ func TestBuildTableInfo(t *testing.T) { originTI, err := ddl.BuildTableInfoFromAST(stmt.(*ast.CreateTableStmt)) require.NoError(t, err) cdcTableInfo := model.WrapTableInfo(0, "test", 0, originTI) - cols, _, _, err := datum2Column(cdcTableInfo, map[int64]types.Datum{}, true) + cols, _, _, _, err := datum2Column(cdcTableInfo, map[int64]types.Datum{}, true) require.NoError(t, err) recoveredTI := model.BuildTiDBTableInfo(cols, cdcTableInfo.IndexColumnsOffset) handle := sqlmodel.GetWhereHandle(recoveredTI, recoveredTI) diff --git a/cdc/model/schema_storage.go b/cdc/model/schema_storage.go index dae22eb05c3..281d2504dbf 100644 --- a/cdc/model/schema_storage.go +++ b/cdc/model/schema_storage.go @@ -66,8 +66,10 @@ type TableInfo struct { HandleIndexID int64 IndexColumnsOffset [][]int - rowColInfos []rowcodec.ColInfo - rowColFieldTps map[int64]*types.FieldType + // rowColInfos extend the model.ColumnInfo with some extra information + // it's the same length and order with the model.TableInfo.Columns + rowColInfos []rowcodec.ColInfo + rowColFieldTps map[int64]*types.FieldType } // WrapTableInfo creates a TableInfo from a timodel.TableInfo diff --git a/pkg/sink/codec/avro/decoder.go b/pkg/sink/codec/avro/decoder.go index 0b53320d974..cd4d5f93bf2 100644 --- a/pkg/sink/codec/avro/decoder.go +++ b/pkg/sink/codec/avro/decoder.go @@ -145,7 +145,13 @@ func (d *decoder) NextRowChangedEvent() (*model.RowChangedEvent, error) { var holder map[string]interface{} switch ty := field["type"].(type) { case []interface{}: - holder = ty[1].(map[string]interface{})["connect.parameters"].(map[string]interface{}) + if m, ok := ty[0].(map[string]interface{}); ok { + holder = m["connect.parameters"].(map[string]interface{}) + } else if m, ok := ty[1].(map[string]interface{}); ok { + holder = m["connect.parameters"].(map[string]interface{}) + } else { + log.Panic("type info is anything else", zap.Any("typeInfo", field["type"])) + } case map[string]interface{}: holder = ty["connect.parameters"].(map[string]interface{}) default: @@ -278,6 +284,7 @@ func (d *decoder) NextResolvedEvent() (uint64, error) { return 0, errors.New("value should not be empty") } ts := binary.BigEndian.Uint64(d.value[1:]) + d.value = nil return ts, nil } @@ -297,6 +304,7 @@ func (d *decoder) NextDDLEvent() (*model.DDLEvent, error) { if err != nil { return nil, errors.WrapError(errors.ErrDecodeFailed, err) } + d.value = nil return result, nil } @@ -392,9 +400,6 @@ func (d *decoder) verifyChecksum(columns []*model.Column, expected uint64) error return errors.New("checksum mismatch") } - log.Debug("checksum passed", - zap.Uint64("expected", expected), zap.Uint32("actual", checksum)) - return nil } diff --git a/pkg/sink/codec/avro/schema_registry.go b/pkg/sink/codec/avro/schema_registry.go index 48ff6f3b347..f21d3bce1c2 100644 --- a/pkg/sink/codec/avro/schema_registry.go +++ b/pkg/sink/codec/avro/schema_registry.go @@ -304,7 +304,7 @@ func (m *SchemaManager) Lookup( log.Error("Creating Avro codec failed", zap.Error(err)) return nil, cerror.WrapError(cerror.ErrAvroSchemaAPIError, err) } - cacheEntry.schemaID = jsonResp.SchemaID + cacheEntry.schemaID = schemaID m.cacheRWLock.Lock() m.cache[key] = cacheEntry diff --git a/scripts/check-diff-line-width.sh b/scripts/check-diff-line-width.sh index b1d2ecb0eed..af65760cc44 100755 --- a/scripts/check-diff-line-width.sh +++ b/scripts/check-diff-line-width.sh @@ -20,8 +20,8 @@ set -e # the pattern `\(#[0-9]+\)$`. It's usually a master branch commit. BASE_HASH=$(git --no-pager log -E --grep='\(#[0-9]+\)$' -n 1 --format=format:%H) # Please contact TiFlow maintainers before changing following settings. -WARN_THRESHOLD=80 -ERROR_THRESHOLD=100 +WARN_THRESHOLD=100 +ERROR_THRESHOLD=140 git --no-pager diff $BASE_HASH -U0 -- cdc pkg cmd \ -- ':(exclude)*_gen.go' \