diff --git a/cdc/entry/mounter.go b/cdc/entry/mounter.go index dae83cc4320..0a9cb75cd72 100644 --- a/cdc/entry/mounter.go +++ b/cdc/entry/mounter.go @@ -341,7 +341,7 @@ func parseJob(v []byte, startTs, CRTs uint64) (*timodel.Job, error) { } func datum2Column( - tableInfo *model.TableInfo, datums map[int64]types.Datum, + tableInfo *model.TableInfo, datums map[int64]types.Datum, tz *time.Location, ) ([]*model.ColumnData, []types.Datum, []*timodel.ColumnInfo, error) { cols := make([]*model.ColumnData, len(tableInfo.RowColumnsOffset)) rawCols := make([]types.Datum, len(tableInfo.RowColumnsOffset)) @@ -368,7 +368,7 @@ func datum2Column( if exist { colValue, size, warn, err = formatColVal(colDatum, colInfo) } else { - colDatum, colValue, size, warn, err = getDefaultOrZeroValue(colInfo) + colDatum, colValue, size, warn, err = getDefaultOrZeroValue(colInfo, tz) } if err != nil { return nil, nil, nil, errors.Trace(err) @@ -504,7 +504,7 @@ func (m *mounter) mountRowKVEntry(tableInfo *model.TableInfo, row *rowKVEntry, d if row.PreRowExist { // FIXME(leoppro): using pre table info to mounter pre column datum // the pre column and current column in one event may using different table info - preCols, preRawCols, columnInfos, err = datum2Column(tableInfo, row.PreRow) + preCols, preRawCols, columnInfos, err = datum2Column(tableInfo, row.PreRow, m.tz) if err != nil { return nil, rawRow, errors.Trace(err) } @@ -536,7 +536,7 @@ func (m *mounter) mountRowKVEntry(tableInfo *model.TableInfo, row *rowKVEntry, d currentChecksum uint32 ) if row.RowExist { - cols, rawCols, columnInfos, err = datum2Column(tableInfo, row.Row) + cols, rawCols, columnInfos, err = datum2Column(tableInfo, row.Row, m.tz) if err != nil { return nil, rawRow, errors.Trace(err) } @@ -698,7 +698,9 @@ func formatColVal(datum types.Datum, col *timodel.ColumnInfo) ( // https://github.com/golang/go/blob/go1.17.4/src/database/sql/driver/types.go#L236 // Supported type is: nil, basic type(Int, Int8,..., Float32, Float64, String), Slice(uint8), other types not support // TODO: Check default expr support -func getDefaultOrZeroValue(col *timodel.ColumnInfo) (types.Datum, any, int, string, error) { +func getDefaultOrZeroValue( + col *timodel.ColumnInfo, tz *time.Location, +) (types.Datum, any, int, string, error) { var ( d types.Datum err error @@ -715,6 +717,15 @@ func getDefaultOrZeroValue(col *timodel.ColumnInfo) (types.Datum, any, int, stri if err != nil { return d, d.GetValue(), sizeOfDatum(d), "", errors.Trace(err) } + switch col.GetType() { + case mysql.TypeTimestamp: + t := d.GetMysqlTime() + err = t.ConvertTimeZone(time.UTC, tz) + if err != nil { + return d, d.GetValue(), sizeOfDatum(d), "", errors.Trace(err) + } + d.SetMysqlTime(t) + } } else if !mysql.HasNotNullFlag(col.GetFlag()) { // NOTICE: NotNullCheck need do after OriginDefaultValue check, as when TiDB meet "amend + add column default xxx", // ref: https://github.com/pingcap/ticdc/issues/3929 diff --git a/cdc/entry/mounter_test.go b/cdc/entry/mounter_test.go index e6b6368eaea..464830f0340 100644 --- a/cdc/entry/mounter_test.go +++ b/cdc/entry/mounter_test.go @@ -873,8 +873,10 @@ func TestGetDefaultZeroValue(t *testing.T) { }, } + tz, err := util.GetTimezone(config.GetGlobalServerConfig().TZ) + require.NoError(t, err) for _, tc := range testCases { - _, val, _, _, _ := getDefaultOrZeroValue(&tc.ColInfo) + _, val, _, _, _ := getDefaultOrZeroValue(&tc.ColInfo, tz) require.Equal(t, tc.Res, val, tc.Name) } @@ -882,9 +884,9 @@ func TestGetDefaultZeroValue(t *testing.T) { OriginDefaultValue: "-3.14", // no float FieldType: *ftTypeNewDecimalNotNull, } - _, val, _, _, _ := getDefaultOrZeroValue(&colInfo) + _, val, _, _, _ := getDefaultOrZeroValue(&colInfo, tz) decimal := new(types.MyDecimal) - err := decimal.FromString([]byte("-3.14")) + err = decimal.FromString([]byte("-3.14")) require.NoError(t, err) require.Equal(t, decimal.String(), val, "mysql.TypeNewDecimal + notnull + default") @@ -892,10 +894,10 @@ func TestGetDefaultZeroValue(t *testing.T) { OriginDefaultValue: "2020-11-19 12:12:12", FieldType: *ftTypeTimestampNotNull, } - _, val, _, _, _ = getDefaultOrZeroValue(&colInfo) + _, val, _, _, _ = getDefaultOrZeroValue(&colInfo, tz) expected, err := types.ParseTimeFromFloatString( types.DefaultStmtNoWarningContext, - "2020-11-19 12:12:12", colInfo.FieldType.GetType(), colInfo.FieldType.GetDecimal()) + "2020-11-19 20:12:12", colInfo.FieldType.GetType(), colInfo.FieldType.GetDecimal()) require.NoError(t, err) require.Equal(t, expected.String(), val, "mysql.TypeTimestamp + notnull + default") @@ -903,10 +905,10 @@ func TestGetDefaultZeroValue(t *testing.T) { OriginDefaultValue: "2020-11-19 12:12:12", FieldType: *ftTypeTimestampNull, } - _, val, _, _, _ = getDefaultOrZeroValue(&colInfo) + _, val, _, _, _ = getDefaultOrZeroValue(&colInfo, tz) expected, err = types.ParseTimeFromFloatString( types.DefaultStmtNoWarningContext, - "2020-11-19 12:12:12", colInfo.FieldType.GetType(), colInfo.FieldType.GetDecimal()) + "2020-11-19 20:12:12", colInfo.FieldType.GetType(), colInfo.FieldType.GetDecimal()) require.NoError(t, err) require.Equal(t, expected.String(), val, "mysql.TypeTimestamp + null + default") @@ -914,7 +916,7 @@ func TestGetDefaultZeroValue(t *testing.T) { OriginDefaultValue: "e1", FieldType: *ftTypeEnumNotNull, } - _, val, _, _, _ = getDefaultOrZeroValue(&colInfo) + _, val, _, _, _ = getDefaultOrZeroValue(&colInfo, tz) expectedEnum, err := types.ParseEnumName(colInfo.FieldType.GetElems(), "e1", colInfo.FieldType.GetCollate()) require.NoError(t, err) require.Equal(t, expectedEnum.Value, val, "mysql.TypeEnum + notnull + default") @@ -923,7 +925,7 @@ func TestGetDefaultZeroValue(t *testing.T) { OriginDefaultValue: "1,e", FieldType: *ftTypeSetNotNull, } - _, val, _, _, _ = getDefaultOrZeroValue(&colInfo) + _, val, _, _, _ = getDefaultOrZeroValue(&colInfo, tz) expectedSet, err := types.ParseSetName(colInfo.FieldType.GetElems(), "1,e", colInfo.FieldType.GetCollate()) require.NoError(t, err) require.Equal(t, expectedSet.Value, val, "mysql.TypeSet + notnull + default") @@ -1102,6 +1104,37 @@ func TestE2ERowLevelChecksum(t *testing.T) { require.NoError(t, err) } +func TestTimezoneDefaultValue(t *testing.T) { + helper := NewSchemaTestHelper(t) + defer helper.Close() + + _ = helper.DDL2Event(`create table test.t(a int primary key)`) + insertEvent := helper.DML2Event(`insert into test.t values (1)`, "test", "t") + require.NotNil(t, insertEvent) + + tableInfo, ok := helper.schemaStorage.GetLastSnapshot().TableByName("test", "t") + require.True(t, ok) + + key, oldValue := helper.getLastKeyValue(tableInfo.ID) + + _ = helper.DDL2Event(`alter table test.t add column b timestamp default '2023-02-09 13:00:00'`) + ts := helper.schemaStorage.GetLastSnapshot().CurrentTs() + rawKV := &model.RawKVEntry{ + OpType: model.OpTypePut, + Key: key, + OldValue: oldValue, + StartTs: ts - 1, + CRTs: ts + 1, + } + polymorphicEvent := model.NewPolymorphicEvent(rawKV) + err := helper.mounter.DecodeEvent(context.Background(), polymorphicEvent) + require.NoError(t, err) + + event := polymorphicEvent.Row + require.NotNil(t, event) + require.Equal(t, "2023-02-09 13:00:00", event.PreColumns[1].Value.(string)) +} + func TestVerifyChecksumTime(t *testing.T) { replicaConfig := config.GetDefaultReplicaConfig() replicaConfig.Integrity.IntegrityCheckLevel = integrity.CheckLevelCorrectness @@ -1565,6 +1598,8 @@ func TestBuildTableInfo(t *testing.T) { ") ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_bin", }, } + tz, err := util.GetTimezone(config.GetGlobalServerConfig().TZ) + require.NoError(t, err) p := parser.New() for i, c := range cases { stmt, err := p.ParseOneStmt(c.origin, "", "") @@ -1572,7 +1607,7 @@ func TestBuildTableInfo(t *testing.T) { originTI, err := ddl.BuildTableInfoFromAST(stmt.(*ast.CreateTableStmt)) require.NoError(t, err) cdcTableInfo := model.WrapTableInfo(0, "test", 0, originTI) - colDatas, _, _, err := datum2Column(cdcTableInfo, map[int64]types.Datum{}) + colDatas, _, _, err := datum2Column(cdcTableInfo, map[int64]types.Datum{}, tz) require.NoError(t, err) e := model.RowChangedEvent{ TableInfo: cdcTableInfo,