Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

test(ticdc): add more unit test to cover checksum functionality #8990

Merged
merged 12 commits into from
May 23, 2023
247 changes: 196 additions & 51 deletions cdc/entry/mounter_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ import (
"github.com/pingcap/tidb/util/mock"
"github.com/pingcap/tiflow/cdc/model"
"github.com/pingcap/tiflow/pkg/config"
cerror "github.com/pingcap/tiflow/pkg/errors"
"github.com/pingcap/tiflow/pkg/filter"
"github.com/pingcap/tiflow/pkg/integrity"
"github.com/pingcap/tiflow/pkg/spanz"
Expand Down Expand Up @@ -461,6 +462,23 @@ func walkTableSpanInStore(t *testing.T, store tidbkv.Storage, tableID int64, f f
}
}

func getLastKeyValueInStore(t *testing.T, store tidbkv.Storage, tableID int64) (key, value []byte) {
txn, err := store.Begin()
require.NoError(t, err)
defer txn.Rollback() //nolint:errcheck
startKey, endKey := spanz.GetTableRange(tableID)
kvIter, err := txn.Iter(startKey, endKey)
require.NoError(t, err)
defer kvIter.Close()
for kvIter.Valid() {
key = kvIter.Key()
value = kvIter.Value()
err = kvIter.Next()
require.NoError(t, err)
}
return key, value
}

// We use OriginDefaultValue instead of DefaultValue in the ut, pls ref to
// https://github.com/pingcap/tiflow/issues/4048
// FIXME: OriginDefaultValue seems always to be string, and test more corner case
Expand Down Expand Up @@ -990,9 +1008,141 @@ func TestGetDefaultZeroValue(t *testing.T) {
}
}

func TestDecodeRowEnableChecksum(t *testing.T) {
helper := NewSchemaTestHelper(t)
defer helper.Close()

tk := helper.Tk()

tk.MustExec("set global tidb_enable_row_level_checksum = 1")
helper.Tk().MustExec("use test")

replicaConfig := config.GetDefaultReplicaConfig()
replicaConfig.Integrity.IntegrityCheckLevel = integrity.CheckLevelCorrectness
filter, err := filter.NewFilter(replicaConfig, "")
require.NoError(t, err)

ver, err := helper.Storage().CurrentVersion(oracle.GlobalTxnScope)
require.NoError(t, err)

changefeed := model.DefaultChangeFeedID("changefeed-test-decode-row")
schemaStorage, err := NewSchemaStorage(helper.GetCurrentMeta(),
ver.Ver, false, changefeed, util.RoleTester, filter)
require.NoError(t, err)
require.NotNil(t, schemaStorage)

createTableDDL := "create table t (id int primary key, a int)"
job := helper.DDL2Job(createTableDDL)
err = schemaStorage.HandleDDLJob(job)
require.NoError(t, err)

ts := schemaStorage.GetLastSnapshot().CurrentTs()
schemaStorage.AdvanceResolvedTs(ver.Ver)

mounter := NewMounter(schemaStorage, changefeed, time.Local,
filter, true, replicaConfig.Integrity).(*mounter)

ctx := context.Background()

tableInfo, ok := schemaStorage.GetLastSnapshot().TableByName("test", "t")
require.True(t, ok)

// row without checksum
tk.Session().GetSessionVars().EnableRowLevelChecksum = false
tk.MustExec("insert into t values (1, 10)")

key, value := getLastKeyValueInStore(t, helper.Storage(), tableInfo.ID)
rawKV := &model.RawKVEntry{
OpType: model.OpTypePut,
Key: key,
Value: value,
StartTs: ts - 1,
CRTs: ts + 1,
}

row, err := mounter.unmarshalAndMountRowChanged(ctx, rawKV)
require.NoError(t, err)
require.NotNil(t, row)
// the upstream tidb does not enable checksum, so the checksum is nil
require.Nil(t, row.Checksum)

// row with one checksum
tk.Session().GetSessionVars().EnableRowLevelChecksum = true
tk.MustExec("insert into t values (2, 20)")

key, value = getLastKeyValueInStore(t, helper.Storage(), tableInfo.ID)
rawKV = &model.RawKVEntry{
OpType: model.OpTypePut,
Key: key,
Value: value,
StartTs: ts - 1,
CRTs: ts + 1,
}
row, err = mounter.unmarshalAndMountRowChanged(ctx, rawKV)
require.NoError(t, err)
require.NotNil(t, row)
require.NotNil(t, row.Checksum)

expected, ok := mounter.decoder.GetChecksum()
require.True(t, ok)
require.Equal(t, expected, row.Checksum.Current)
require.False(t, row.Checksum.Corrupted)

// row with 2 checksum
tk.MustExec("insert into t values (3, 30)")
job = helper.DDL2Job("alter table t change column a a varchar(10)")
err = schemaStorage.HandleDDLJob(job)
require.NoError(t, err)

key, value = getLastKeyValueInStore(t, helper.Storage(), tableInfo.ID)
rawKV = &model.RawKVEntry{
OpType: model.OpTypePut,
Key: key,
Value: value,
StartTs: ts - 1,
CRTs: ts + 1,
}
row, err = mounter.unmarshalAndMountRowChanged(ctx, rawKV)
require.NoError(t, err)
require.NotNil(t, row)
require.NotNil(t, row.Checksum)

first, ok := mounter.decoder.GetChecksum()
require.True(t, ok)

extra, ok := mounter.decoder.GetExtraChecksum()
require.True(t, ok)

if row.Checksum.Current != first {
require.Equal(t, extra, row.Checksum.Current)
} else {
require.Equal(t, first, row.Checksum.Current)
}
require.False(t, row.Checksum.Corrupted)

// hack the table info to make the checksum corrupted
tableInfo.Columns[0].ID = 3

// corrupt-handle-level default to warn, so no error, but the checksum is corrupted
row, err = mounter.unmarshalAndMountRowChanged(ctx, rawKV)
require.NoError(t, err)
require.NotNil(t, row.Checksum)
require.True(t, row.Checksum.Corrupted)

mounter.integrity.CorruptionHandleLevel = integrity.CorruptionHandleLevelError
_, err = mounter.unmarshalAndMountRowChanged(ctx, rawKV)
require.Error(t, err)
require.ErrorIs(t, err, cerror.ErrCorruptedDataMutation)

job = helper.DDL2Job("drop table t")
err = schemaStorage.HandleDDLJob(job)
require.NoError(t, err)
}

func TestDecodeRow(t *testing.T) {
helper := NewSchemaTestHelper(t)
defer helper.Close()

helper.Tk().MustExec("set @@tidb_enable_clustered_index=1;")
helper.Tk().MustExec("use test;")

Expand All @@ -1003,72 +1153,67 @@ func TestDecodeRow(t *testing.T) {

cfg := config.GetDefaultReplicaConfig()

cfgWithChecksumEnabled := config.GetDefaultReplicaConfig()
cfgWithChecksumEnabled.Integrity.IntegrityCheckLevel = integrity.CheckLevelCorrectness

for _, c := range []*config.ReplicaConfig{cfg, cfgWithChecksumEnabled} {
filter, err := filter.NewFilter(c, "")
require.NoError(t, err)

schemaStorage, err := NewSchemaStorage(helper.GetCurrentMeta(),
ver.Ver, false, changefeed, util.RoleTester, filter)
require.NoError(t, err)
filter, err := filter.NewFilter(cfg, "")
require.NoError(t, err)

// apply ddl to schemaStorage
ddl := "create table test.student(id int primary key, name char(50), age int, gender char(10))"
job := helper.DDL2Job(ddl)
err = schemaStorage.HandleDDLJob(job)
require.NoError(t, err)
schemaStorage, err := NewSchemaStorage(helper.GetCurrentMeta(),
ver.Ver, false, changefeed, util.RoleTester, filter)
require.NoError(t, err)

ts := schemaStorage.GetLastSnapshot().CurrentTs()
// apply ddl to schemaStorage
ddl := "create table test.student(id int primary key, name char(50), age int, gender char(10))"
job := helper.DDL2Job(ddl)
err = schemaStorage.HandleDDLJob(job)
require.NoError(t, err)

schemaStorage.AdvanceResolvedTs(ver.Ver)
ts := schemaStorage.GetLastSnapshot().CurrentTs()

mounter := NewMounter(
schemaStorage, changefeed, time.Local, filter, true, cfg.Integrity).(*mounter)
schemaStorage.AdvanceResolvedTs(ver.Ver)

helper.Tk().MustExec(`insert into student values(1, "dongmen", 20, "male")`)
helper.Tk().MustExec(`update student set age = 27 where id = 1`)
mounter := NewMounter(
schemaStorage, changefeed, time.Local, filter, true, cfg.Integrity).(*mounter)

ctx := context.Background()
decodeAndCheckRowInTable := func(tableID int64, f func(key []byte, value []byte) *model.RawKVEntry) {
walkTableSpanInStore(t, helper.Storage(), tableID, func(key []byte, value []byte) {
rawKV := f(key, value)
helper.Tk().MustExec(`insert into student values(1, "dongmen", 20, "male")`)
helper.Tk().MustExec(`update student set age = 27 where id = 1`)

row, err := mounter.unmarshalAndMountRowChanged(ctx, rawKV)
require.NoError(t, err)
require.NotNil(t, row)
ctx := context.Background()
decodeAndCheckRowInTable := func(tableID int64, f func(key []byte, value []byte) *model.RawKVEntry) {
walkTableSpanInStore(t, helper.Storage(), tableID, func(key []byte, value []byte) {
rawKV := f(key, value)

if row.Columns != nil {
require.NotNil(t, mounter.decoder)
}
row, err := mounter.unmarshalAndMountRowChanged(ctx, rawKV)
require.NoError(t, err)
require.NotNil(t, row)

if row.PreColumns != nil {
require.NotNil(t, mounter.preDecoder)
}
})
}
if row.Columns != nil {
require.NotNil(t, mounter.decoder)
}

toRawKV := func(key []byte, value []byte) *model.RawKVEntry {
return &model.RawKVEntry{
OpType: model.OpTypePut,
Key: key,
Value: value,
StartTs: ts - 1,
CRTs: ts + 1,
if row.PreColumns != nil {
require.NotNil(t, mounter.preDecoder)
}
})
}

toRawKV := func(key []byte, value []byte) *model.RawKVEntry {
return &model.RawKVEntry{
OpType: model.OpTypePut,
Key: key,
Value: value,
StartTs: ts - 1,
CRTs: ts + 1,
}
}

tableInfo, ok := schemaStorage.GetLastSnapshot().TableByName("test", "student")
require.True(t, ok)
tableInfo, ok := schemaStorage.GetLastSnapshot().TableByName("test", "student")
require.True(t, ok)

decodeAndCheckRowInTable(tableInfo.ID, toRawKV)
decodeAndCheckRowInTable(tableInfo.ID, toRawKV)
decodeAndCheckRowInTable(tableInfo.ID, toRawKV)
decodeAndCheckRowInTable(tableInfo.ID, toRawKV)

job = helper.DDL2Job("drop table student")
err = schemaStorage.HandleDDLJob(job)
require.NoError(t, err)
}
job = helper.DDL2Job("drop table student")
err = schemaStorage.HandleDDLJob(job)
require.NoError(t, err)
}

// TestDecodeEventIgnoreRow tests a PolymorphicEvent.Row is nil
Expand Down
Loading