Skip to content

Commit

Permalink
Merge branch 'sink-error-split-1' of github.com:hicqu/ticdc into sink…
Browse files Browse the repository at this point in the history
…-error-split-1
  • Loading branch information
hicqu committed May 16, 2023
2 parents 19a70d6 + 032c0da commit 5ac5fe0
Show file tree
Hide file tree
Showing 6 changed files with 36 additions and 22 deletions.
31 changes: 19 additions & 12 deletions cdc/entry/mounter.go
Original file line number Diff line number Diff line change
Expand Up @@ -337,12 +337,18 @@ func parseJob(v []byte, startTs, CRTs uint64) (*timodel.Job, error) {

func datum2Column(
tableInfo *model.TableInfo, datums map[int64]types.Datum, fillWithDefaultValue bool,
) ([]*model.Column, []types.Datum, []*timodel.ColumnInfo, error) {
) ([]*model.Column, []types.Datum, []*timodel.ColumnInfo, []rowcodec.ColInfo, error) {
cols := make([]*model.Column, len(tableInfo.RowColumnsOffset))
rawCols := make([]types.Datum, len(tableInfo.RowColumnsOffset))

// columnInfos and rowColumnInfos hold different column metadata,
// they should have the same length and order.
columnInfos := make([]*timodel.ColumnInfo, len(tableInfo.RowColumnsOffset))
rowColumnInfos := make([]rowcodec.ColInfo, len(tableInfo.RowColumnsOffset))

_, _, extendColumnInfos := tableInfo.GetRowColInfos()

for _, colInfo := range tableInfo.Columns {
for idx, colInfo := range tableInfo.Columns {
if !model.IsColCDCVisible(colInfo) {
log.Debug("skip the column which is not visible",
zap.String("table", tableInfo.Name.O), zap.String("column", colInfo.Name.O))
Expand Down Expand Up @@ -370,7 +376,7 @@ func datum2Column(
colDatums, colValue, size, warn, err = getDefaultOrZeroValue(colInfo)
}
if err != nil {
return nil, nil, nil, errors.Trace(err)
return nil, nil, nil, nil, errors.Trace(err)
}
if warn != "" {
log.Warn(warn, zap.String("table", tableInfo.TableName.String()),
Expand All @@ -391,8 +397,9 @@ func datum2Column(
ApproximateBytes: size + sizeOfEmptyColumn,
}
columnInfos[offset] = colInfo
rowColumnInfos[offset] = extendColumnInfos[idx]
}
return cols, rawCols, columnInfos, nil
return cols, rawCols, columnInfos, rowColumnInfos, nil
}

// return error if cannot get the expected checksum from the decoder
Expand Down Expand Up @@ -478,10 +485,11 @@ func (m *mounter) verifyChecksum(

func (m *mounter) mountRowKVEntry(tableInfo *model.TableInfo, row *rowKVEntry, dataSize int64) (*model.RowChangedEvent, model.RowChangedDatums, error) {
var (
rawRow model.RowChangedDatums
columnInfos []*timodel.ColumnInfo
matched bool
err error
rawRow model.RowChangedDatums
columnInfos []*timodel.ColumnInfo
extendColumnInfos []rowcodec.ColInfo
matched bool
err error

checksum *integrity.Checksum

Expand All @@ -503,7 +511,7 @@ func (m *mounter) mountRowKVEntry(tableInfo *model.TableInfo, row *rowKVEntry, d
if row.PreRowExist {
// FIXME(leoppro): using pre table info to mounter pre column datum
// the pre column and current column in one event may using different table info
preCols, preRawCols, columnInfos, err = datum2Column(tableInfo, row.PreRow, m.enableOldValue)
preCols, preRawCols, columnInfos, extendColumnInfos, err = datum2Column(tableInfo, row.PreRow, m.enableOldValue)
if err != nil {
return nil, rawRow, errors.Trace(err)
}
Expand Down Expand Up @@ -543,7 +551,7 @@ func (m *mounter) mountRowKVEntry(tableInfo *model.TableInfo, row *rowKVEntry, d
current uint32
)
if row.RowExist {
cols, rawCols, columnInfos, err = datum2Column(tableInfo, row.Row, true)
cols, rawCols, columnInfos, extendColumnInfos, err = datum2Column(tableInfo, row.Row, true)
if err != nil {
return nil, rawRow, errors.Trace(err)
}
Expand Down Expand Up @@ -572,7 +580,6 @@ func (m *mounter) mountRowKVEntry(tableInfo *model.TableInfo, row *rowKVEntry, d
intRowID = row.RecordID.IntValue()
}

_, _, colInfos := tableInfo.GetRowColInfos()
rawRow.PreRowDatums = preRawCols
rawRow.RowDatums = rawCols

Expand All @@ -597,7 +604,7 @@ func (m *mounter) mountRowKVEntry(tableInfo *model.TableInfo, row *rowKVEntry, d
TableID: row.PhysicalTableID,
IsPartition: tableInfo.GetPartitionInfo() != nil,
},
ColInfos: colInfos,
ColInfos: extendColumnInfos,
TableInfo: tableInfo,
Columns: cols,
PreColumns: preCols,
Expand Down
2 changes: 1 addition & 1 deletion cdc/entry/mounter_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1282,7 +1282,7 @@ func TestBuildTableInfo(t *testing.T) {
originTI, err := ddl.BuildTableInfoFromAST(stmt.(*ast.CreateTableStmt))
require.NoError(t, err)
cdcTableInfo := model.WrapTableInfo(0, "test", 0, originTI)
cols, _, _, err := datum2Column(cdcTableInfo, map[int64]types.Datum{}, true)
cols, _, _, _, err := datum2Column(cdcTableInfo, map[int64]types.Datum{}, true)
require.NoError(t, err)
recoveredTI := model.BuildTiDBTableInfo(cols, cdcTableInfo.IndexColumnsOffset)
handle := sqlmodel.GetWhereHandle(recoveredTI, recoveredTI)
Expand Down
6 changes: 4 additions & 2 deletions cdc/model/schema_storage.go
Original file line number Diff line number Diff line change
Expand Up @@ -66,8 +66,10 @@ type TableInfo struct {
HandleIndexID int64

IndexColumnsOffset [][]int
rowColInfos []rowcodec.ColInfo
rowColFieldTps map[int64]*types.FieldType
// rowColInfos extend the model.ColumnInfo with some extra information
// it's the same length and order with the model.TableInfo.Columns
rowColInfos []rowcodec.ColInfo
rowColFieldTps map[int64]*types.FieldType
}

// WrapTableInfo creates a TableInfo from a timodel.TableInfo
Expand Down
13 changes: 9 additions & 4 deletions pkg/sink/codec/avro/decoder.go
Original file line number Diff line number Diff line change
Expand Up @@ -145,7 +145,13 @@ func (d *decoder) NextRowChangedEvent() (*model.RowChangedEvent, error) {
var holder map[string]interface{}
switch ty := field["type"].(type) {
case []interface{}:
holder = ty[1].(map[string]interface{})["connect.parameters"].(map[string]interface{})
if m, ok := ty[0].(map[string]interface{}); ok {
holder = m["connect.parameters"].(map[string]interface{})
} else if m, ok := ty[1].(map[string]interface{}); ok {
holder = m["connect.parameters"].(map[string]interface{})
} else {
log.Panic("type info is anything else", zap.Any("typeInfo", field["type"]))
}
case map[string]interface{}:
holder = ty["connect.parameters"].(map[string]interface{})
default:
Expand Down Expand Up @@ -278,6 +284,7 @@ func (d *decoder) NextResolvedEvent() (uint64, error) {
return 0, errors.New("value should not be empty")
}
ts := binary.BigEndian.Uint64(d.value[1:])
d.value = nil
return ts, nil
}

Expand All @@ -297,6 +304,7 @@ func (d *decoder) NextDDLEvent() (*model.DDLEvent, error) {
if err != nil {
return nil, errors.WrapError(errors.ErrDecodeFailed, err)
}
d.value = nil
return result, nil
}

Expand Down Expand Up @@ -392,9 +400,6 @@ func (d *decoder) verifyChecksum(columns []*model.Column, expected uint64) error
return errors.New("checksum mismatch")
}

log.Debug("checksum passed",
zap.Uint64("expected", expected), zap.Uint32("actual", checksum))

return nil
}

Expand Down
2 changes: 1 addition & 1 deletion pkg/sink/codec/avro/schema_registry.go
Original file line number Diff line number Diff line change
Expand Up @@ -304,7 +304,7 @@ func (m *SchemaManager) Lookup(
log.Error("Creating Avro codec failed", zap.Error(err))
return nil, cerror.WrapError(cerror.ErrAvroSchemaAPIError, err)
}
cacheEntry.schemaID = jsonResp.SchemaID
cacheEntry.schemaID = schemaID

m.cacheRWLock.Lock()
m.cache[key] = cacheEntry
Expand Down
4 changes: 2 additions & 2 deletions scripts/check-diff-line-width.sh
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,8 @@ set -e
# the pattern `\(#[0-9]+\)$`. It's usually a master branch commit.
BASE_HASH=$(git --no-pager log -E --grep='\(#[0-9]+\)$' -n 1 --format=format:%H)
# Please contact TiFlow maintainers before changing following settings.
WARN_THRESHOLD=80
ERROR_THRESHOLD=100
WARN_THRESHOLD=100
ERROR_THRESHOLD=140

git --no-pager diff $BASE_HASH -U0 -- cdc pkg cmd \
-- ':(exclude)*_gen.go' \
Expand Down

0 comments on commit 5ac5fe0

Please sign in to comment.