Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

mounter(ticdc): calculate row level checksum for timestmap by using UTC time zone (#10564) #10646

Merged
Merged
Changes from all commits
Commits
Show all changes
33 commits
Select commit Hold shift + click to select a range
3e52aee
This is an automated cherry-pick of #10564
3AceShowHand Feb 22, 2024
67ff73c
remove useless files.
3AceShowHand Apr 15, 2024
07843c3
fix go mod.
3AceShowHand Apr 15, 2024
bba8547
update tidb dependencies.
3AceShowHand Apr 15, 2024
3e8857d
fix dm lightning config.
3AceShowHand Apr 15, 2024
0f7b8bd
fix go mod.
3AceShowHand Apr 15, 2024
34954e2
Merge branch 'release-7.5' into cherry-pick-10564-to-release-7.5
3AceShowHand Apr 16, 2024
0cafd5c
fix some conflicts.
3AceShowHand Apr 16, 2024
4ff1185
also fix the pebble sorter.
3AceShowHand Apr 16, 2024
352a2aa
fix dm build.
3AceShowHand Apr 16, 2024
3c6ec6b
fix build engine.
3AceShowHand Apr 16, 2024
9f4c5ee
fix dm tests.
3AceShowHand Apr 16, 2024
ae8b720
fix make check
3AceShowHand Apr 16, 2024
3c92ee3
fix mounter import tidb packages.
3AceShowHand Apr 17, 2024
5fa873e
fix all tests.
3AceShowHand Apr 17, 2024
b76da7c
fix test.
3AceShowHand Apr 18, 2024
6b46f22
fix test.
3AceShowHand Apr 18, 2024
b2ecad5
add timezone.
3AceShowHand Apr 19, 2024
aa63241
fix timezone issue.
3AceShowHand Apr 19, 2024
b5146c6
fix ut
3AceShowHand Apr 19, 2024
7d3191e
fix the case.
3AceShowHand Apr 22, 2024
6876f1e
fix the case.
3AceShowHand Apr 22, 2024
4e50976
cdc: reduce seek CPU usage on PebbleDB sorter (#10939)
hicqu Apr 22, 2024
af3316c
remove case.
3AceShowHand Apr 23, 2024
cb5e9af
call pebble the correct way.
3AceShowHand Apr 23, 2024
393ca43
gc(ticdc): fix data race in gc manager (#10846)
sdojjy Mar 26, 2024
84ab6a4
fix tests.
3AceShowHand Apr 24, 2024
27b6421
fix unit test
3AceShowHand Apr 24, 2024
8168a51
fix dm integration test.
3AceShowHand Apr 24, 2024
34e4cf3
try to debug tracker test.
3AceShowHand Apr 25, 2024
bd04ce9
Merge branch 'release-7.5' into cherry-pick-10564-to-release-7.5
3AceShowHand Apr 28, 2024
5bb6e24
fix go mod
3AceShowHand Apr 28, 2024
234b58d
remove version.
3AceShowHand Apr 28, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
87 changes: 48 additions & 39 deletions cdc/entry/mounter.go
Original file line number Diff line number Diff line change
Expand Up @@ -85,9 +85,6 @@ type mounter struct {
// they should not be nil after decode at least one event in the row format v2.
decoder *rowcodec.DatumMapDecoder
preDecoder *rowcodec.DatumMapDecoder

// encoder is used to calculate the checksum.
encoder *rowcodec.Encoder
}

// NewMounter creates a mounter
Expand All @@ -107,8 +104,6 @@ func NewMounter(schemaStorage SchemaStorage,
WithLabelValues(changefeedID.Namespace, changefeedID.ID),
tz: tz,
integrity: integrity,

encoder: &rowcodec.Encoder{},
}
}

Expand Down Expand Up @@ -398,14 +393,40 @@ func datum2Column(
return cols, rawCols, columnInfos, rowColumnInfos, nil
}

// return error if cannot get the expected checksum from the decoder
func (m *mounter) calculateChecksum(
columnInfos []*timodel.ColumnInfo, rawColumns []types.Datum,
) (uint32, error) {
columns := make([]rowcodec.ColData, 0, len(rawColumns))
for idx, col := range columnInfos {
column := rowcodec.ColData{
ColumnInfo: col,
Datum: &rawColumns[idx],
}
columns = append(columns, column)
}
sort.Slice(columns, func(i, j int) bool {
return columns[i].ID < columns[j].ID
})

calculator := rowcodec.RowData{
Cols: columns,
Data: make([]byte, 0),
}

checksum, err := calculator.Checksum(m.tz)
if err != nil {
return 0, errors.Trace(err)
}
return checksum, nil
}

// return error when calculate the checksum.
// return false if the checksum is not matched
// return true if the checksum is matched and the checksum is the matched one.
func (m *mounter) verifyChecksum(
columnInfos []*timodel.ColumnInfo, rawColumns []types.Datum, isPreRow bool,
) (uint32, int, bool, error) {
) (uint32, bool, error) {
if !m.integrity.Enabled() {
return 0, 0, true, nil
return 0, true, nil
}

var decoder *rowcodec.DatumMapDecoder
Expand All @@ -415,68 +436,50 @@ func (m *mounter) verifyChecksum(
decoder = m.decoder
}
if decoder == nil {
return 0, 0, false, errors.New("cannot found the decoder to get the checksum")
return 0, false, errors.New("cannot found the decoder to get the checksum")
}

version := decoder.ChecksumVersion()
// if the checksum cannot be found, which means the upstream TiDB checksum is not enabled,
// so return matched as true to skip check the event.
first, ok := decoder.GetChecksum()
if !ok {
return 0, version, true, nil
}

columns := make([]rowcodec.ColData, 0, len(rawColumns))
for idx, col := range columnInfos {
columns = append(columns, rowcodec.ColData{
ColumnInfo: col,
Datum: &rawColumns[idx],
})
}
sort.Slice(columns, func(i, j int) bool {
return columns[i].ID < columns[j].ID
})
calculator := rowcodec.RowData{
Cols: columns,
Data: make([]byte, 0),
return 0, true, nil
}

checksum, err := calculator.Checksum(m.tz)
checksum, err := m.calculateChecksum(columnInfos, rawColumns)
if err != nil {
log.Error("failed to calculate the checksum", zap.Error(err))
return 0, version, false, errors.Trace(err)
log.Error("failed to calculate the checksum", zap.Uint32("first", first), zap.Error(err))
return 0, false, errors.Trace(err)
}

// the first checksum matched, it hits in the most case.
if checksum == first {
log.Debug("checksum matched",
zap.Uint32("checksum", checksum), zap.Uint32("first", first))
return checksum, version, true, nil
return checksum, true, nil
}

extra, ok := decoder.GetExtraChecksum()
if !ok {
log.Error("cannot found the extra checksum, the first checksum mismatched",
zap.Uint32("checksum", checksum),
zap.Uint32("first", first),
zap.Uint32("extra", extra))
return checksum, version,
false, errors.New("cannot found the extra checksum from the event")
zap.Uint32("first", first))
return checksum, false, nil
}

if checksum == extra {
log.Debug("extra checksum matched, this may happen the upstream TiDB is during the DDL"+
"execution phase",
zap.Uint32("checksum", checksum),
zap.Uint32("extra", extra))
return checksum, version, true, nil
return checksum, true, nil
}

log.Error("checksum mismatch",
zap.Uint32("checksum", checksum),
zap.Uint32("first", first),
zap.Uint32("extra", extra))
return checksum, version, false, nil
return checksum, false, nil
}

func (m *mounter) mountRowKVEntry(tableInfo *model.TableInfo, row *rowKVEntry, dataSize int64) (*model.RowChangedEvent, model.RowChangedDatums, error) {
Expand All @@ -493,6 +496,12 @@ func (m *mounter) mountRowKVEntry(tableInfo *model.TableInfo, row *rowKVEntry, d
corrupted bool
)

if m.decoder != nil {
checksumVersion = m.decoder.ChecksumVersion()
} else if m.preDecoder != nil {
checksumVersion = m.preDecoder.ChecksumVersion()
}

// Decode previous columns.
var (
preCols []*model.Column
Expand All @@ -507,7 +516,7 @@ func (m *mounter) mountRowKVEntry(tableInfo *model.TableInfo, row *rowKVEntry, d
return nil, rawRow, errors.Trace(err)
}

preChecksum, checksumVersion, matched, err = m.verifyChecksum(columnInfos, preRawCols, true)
preChecksum, matched, err = m.verifyChecksum(columnInfos, preRawCols, true)
if err != nil {
return nil, rawRow, errors.Trace(err)
}
Expand Down Expand Up @@ -536,7 +545,7 @@ func (m *mounter) mountRowKVEntry(tableInfo *model.TableInfo, row *rowKVEntry, d
return nil, rawRow, errors.Trace(err)
}

current, checksumVersion, matched, err = m.verifyChecksum(columnInfos, rawCols, false)
current, matched, err = m.verifyChecksum(columnInfos, rawCols, false)
if err != nil {
return nil, rawRow, errors.Trace(err)
}
Expand Down
Loading