From 0a60f0789681620f5c53f89548d0a5bb0664dbe6 Mon Sep 17 00:00:00 2001 From: Ti Chi Robot Date: Tue, 30 May 2023 18:30:41 +0800 Subject: [PATCH] sink(ticdc): add table default value definition for storage sink (#9040) (#9070) close pingcap/tiflow#9066 --- cdc/entry/mounter.go | 5 +-- cdc/entry/mounter_test.go | 2 +- pkg/sink/cloudstorage/table_definition.go | 16 +++++---- .../cloudstorage/table_definition_test.go | 34 ++++++++++++++++--- 4 files changed, 43 insertions(+), 14 deletions(-) diff --git a/cdc/entry/mounter.go b/cdc/entry/mounter.go index a5d2c4f4732..a01700b279b 100644 --- a/cdc/entry/mounter.go +++ b/cdc/entry/mounter.go @@ -309,7 +309,7 @@ func datum2Column( if warn != "" { log.Warn(warn, zap.String("table", tableInfo.TableName.String()), zap.String("column", colInfo.Name.String())) } - defaultValue := getDDLDefaultDefinition(colInfo) + defaultValue := GetDDLDefaultDefinition(colInfo) offset := tableInfo.RowColumnsOffset[colInfo.ID] rawCols[offset] = colDatums cols[offset] = &model.Column{ @@ -536,7 +536,8 @@ func getDefaultOrZeroValue(col *timodel.ColumnInfo) (types.Datum, any, int, stri return d, v, size, warn, err } -func getDDLDefaultDefinition(col *timodel.ColumnInfo) interface{} { +// GetDDLDefaultDefinition returns the default definition of a column. +func GetDDLDefaultDefinition(col *timodel.ColumnInfo) interface{} { defaultValue := col.GetDefaultValue() if defaultValue == nil { defaultValue = col.GetOriginDefaultValue() diff --git a/cdc/entry/mounter_test.go b/cdc/entry/mounter_test.go index 281cd866539..4767eeb681d 100644 --- a/cdc/entry/mounter_test.go +++ b/cdc/entry/mounter_test.go @@ -980,7 +980,7 @@ func TestGetDefaultZeroValue(t *testing.T) { for _, tc := range testCases { _, val, _, _, _ := getDefaultOrZeroValue(&tc.ColInfo) require.Equal(t, tc.Res, val, tc.Name) - val = getDDLDefaultDefinition(&tc.ColInfo) + val = GetDDLDefaultDefinition(&tc.ColInfo) require.Equal(t, tc.Default, val, tc.Name) } } diff --git a/pkg/sink/cloudstorage/table_definition.go b/pkg/sink/cloudstorage/table_definition.go index a6314ce33f4..42954bf77cd 100644 --- a/pkg/sink/cloudstorage/table_definition.go +++ b/pkg/sink/cloudstorage/table_definition.go @@ -23,6 +23,7 @@ import ( timodel "github.com/pingcap/tidb/parser/model" "github.com/pingcap/tidb/parser/mysql" "github.com/pingcap/tidb/parser/types" + "github.com/pingcap/tiflow/cdc/entry" "github.com/pingcap/tiflow/cdc/model" "github.com/pingcap/tiflow/pkg/errors" "github.com/pingcap/tiflow/pkg/hash" @@ -37,12 +38,13 @@ const ( // TableCol denotes the column info for a table definition. type TableCol struct { - Name string `json:"ColumnName" ` - Tp string `json:"ColumnType"` - Precision string `json:"ColumnPrecision,omitempty"` - Scale string `json:"ColumnScale,omitempty"` - Nullable string `json:"ColumnNullable,omitempty"` - IsPK string `json:"ColumnIsPk,omitempty"` + Name string `json:"ColumnName" ` + Tp string `json:"ColumnType"` + Default interface{} `json:"ColumnDefault,omitempty"` + Precision string `json:"ColumnPrecision,omitempty"` + Scale string `json:"ColumnScale,omitempty"` + Nullable string `json:"ColumnNullable,omitempty"` + IsPK string `json:"ColumnIsPk,omitempty"` } // FromTiColumnInfo converts from TiDB ColumnInfo to TableCol. @@ -71,6 +73,7 @@ func (t *TableCol) FromTiColumnInfo(col *timodel.ColumnInfo) { if mysql.HasNotNullFlag(col.GetFlag()) { t.Nullable = "false" } + t.Default = entry.GetDDLDefaultDefinition(col) switch col.GetType() { case mysql.TypeTimestamp, mysql.TypeDatetime, mysql.TypeDuration: @@ -110,6 +113,7 @@ func (t *TableCol) ToTiColumnInfo() (*timodel.ColumnInfo, error) { if t.Nullable == "false" { col.AddFlag(mysql.NotNullFlag) } + col.DefaultValue = t.Default if strings.Contains(t.Tp, "BLOB") || strings.Contains(t.Tp, "BINARY") { col.SetCharset(charset.CharsetBin) } else { diff --git a/pkg/sink/cloudstorage/table_definition_test.go b/pkg/sink/cloudstorage/table_definition_test.go index df8a9dd5daa..e3205f766b9 100644 --- a/pkg/sink/cloudstorage/table_definition_test.go +++ b/pkg/sink/cloudstorage/table_definition_test.go @@ -30,22 +30,38 @@ func generateTableDef() (TableDefinition, *model.TableInfo) { var columns []*timodel.ColumnInfo ft := types.NewFieldType(mysql.TypeLong) ft.SetFlag(mysql.PriKeyFlag | mysql.NotNullFlag) - col := &timodel.ColumnInfo{Name: timodel.NewCIStr("Id"), FieldType: *ft} + col := &timodel.ColumnInfo{ + Name: timodel.NewCIStr("Id"), + FieldType: *ft, + DefaultValue: 10, + } columns = append(columns, col) ft = types.NewFieldType(mysql.TypeVarchar) ft.SetFlag(mysql.NotNullFlag) ft.SetFlen(128) - col = &timodel.ColumnInfo{Name: timodel.NewCIStr("LastName"), FieldType: *ft} + col = &timodel.ColumnInfo{ + Name: timodel.NewCIStr("LastName"), + FieldType: *ft, + DefaultValue: "Default LastName", + } columns = append(columns, col) ft = types.NewFieldType(mysql.TypeVarchar) ft.SetFlen(64) - col = &timodel.ColumnInfo{Name: timodel.NewCIStr("FirstName"), FieldType: *ft} + col = &timodel.ColumnInfo{ + Name: timodel.NewCIStr("FirstName"), + FieldType: *ft, + DefaultValue: "Default FirstName", + } columns = append(columns, col) ft = types.NewFieldType(mysql.TypeDatetime) - col = &timodel.ColumnInfo{Name: timodel.NewCIStr("Birthday"), FieldType: *ft} + col = &timodel.ColumnInfo{ + Name: timodel.NewCIStr("Birthday"), + FieldType: *ft, + DefaultValue: 12345678, + } columns = append(columns, col) tableInfo := &model.TableInfo{ @@ -380,22 +396,26 @@ func TestTableDefinition(t *testing.T) { "ColumnName": "Id", "ColumnType": "INT", "ColumnPrecision": "11", + "ColumnDefault":10, "ColumnNullable": "false", "ColumnIsPk": "true" }, { "ColumnName": "LastName", "ColumnType": "VARCHAR", + "ColumnDefault":"Default LastName", "ColumnPrecision": "128", "ColumnNullable": "false" }, { "ColumnName": "FirstName", + "ColumnDefault":"Default FirstName", "ColumnType": "VARCHAR", "ColumnPrecision": "64" }, { "ColumnName": "Birthday", + "ColumnDefault":1.2345678e+07, "ColumnType": "DATETIME" } ], @@ -424,22 +444,26 @@ func TestTableDefinition(t *testing.T) { "ColumnName": "Id", "ColumnType": "INT", "ColumnPrecision": "11", + "ColumnDefault":10, "ColumnNullable": "false", "ColumnIsPk": "true" }, { "ColumnName": "LastName", "ColumnType": "VARCHAR", + "ColumnDefault":"Default LastName", "ColumnPrecision": "128", "ColumnNullable": "false" }, { "ColumnName": "FirstName", + "ColumnDefault":"Default FirstName", "ColumnType": "VARCHAR", "ColumnPrecision": "64" }, { "ColumnName": "Birthday", + "ColumnDefault":1.2345678e+07, "ColumnType": "DATETIME" } ], @@ -471,7 +495,7 @@ func TestTableDefinitionGenFilePath(t *testing.T) { def, _ := generateTableDef() tablePath, err := def.GenerateSchemaFilePath() require.NoError(t, err) - require.Equal(t, "schema1/table1/meta/schema_100_0785427252.json", tablePath) + require.Equal(t, "schema1/table1/meta/schema_100_3752767265.json", tablePath) } func TestTableDefinitionSum32(t *testing.T) {