Skip to content

Commit

Permalink
Merge branch 'master' into cdc-redo-refine-s3
Browse files Browse the repository at this point in the history
  • Loading branch information
ti-chi-bot authored Dec 13, 2021
2 parents d90dee1 + 5665818 commit 7020d71
Show file tree
Hide file tree
Showing 2 changed files with 355 additions and 44 deletions.
79 changes: 41 additions & 38 deletions cdc/entry/mounter.go
Original file line number Diff line number Diff line change
Expand Up @@ -313,25 +313,24 @@ func datum2Column(tableInfo *model.TableInfo, datums map[int64]types.Datum, fill
colName := colInfo.Name.O
colDatums, exist := datums[colInfo.ID]
var colValue interface{}
if !exist && !fillWithDefaultValue {
continue
}
var err error
var warn string
var size int
if exist {
var err error
var warn string
var size int
colValue, size, warn, err = formatColVal(colDatums, colInfo.Tp)
if err != nil {
return nil, errors.Trace(err)
}
if warn != "" {
log.Warn(warn, zap.String("table", tableInfo.TableName.String()), zap.String("column", colInfo.Name.String()))
}
colSize += size
} else if fillWithDefaultValue {
var size int
colValue, size = getDefaultOrZeroValue(colInfo)
colSize += size
} else {
continue
colValue, size, warn, err = getDefaultOrZeroValue(colInfo)
}
if err != nil {
return nil, errors.Trace(err)
}
if warn != "" {
log.Warn(warn, zap.String("table", tableInfo.TableName.String()), zap.String("column", colInfo.Name.String()))
}
colSize += size
cols[tableInfo.RowColumnsOffset[colInfo.ID]] = &model.Column{
Name: colName,
Type: colInfo.Tp,
Expand Down Expand Up @@ -437,6 +436,7 @@ func sizeOfBytes(b []byte) int {
return len(b) + sizeOfEmptyBytes
}

// formatColVal return interface{} need to meet the same requirement as getDefaultOrZeroValue
func formatColVal(datum types.Datum, tp byte) (
value interface{}, size int, warn string, err error,
) {
Expand Down Expand Up @@ -490,36 +490,39 @@ func formatColVal(datum types.Datum, tp byte) (
const sizeOfV = unsafe.Sizeof(v)
return v, int(sizeOfV), warn, nil
default:
// FIXME: GetValue() may return some types that go sql not support, which will cause sink DML fail
// Make specified convert upper if you need
// Go sql support type ref to: https://github.com/golang/go/blob/go1.17.4/src/database/sql/driver/types.go#L236
return datum.GetValue(), sizeOfDatum(datum), "", nil
}
}

func getDefaultOrZeroValue(col *timodel.ColumnInfo) (interface{}, int) {
// see https://github.com/pingcap/tidb/issues/9304
// must use null if TiDB not write the column value when default value is null
// and the value is null
// getDefaultOrZeroValue return interface{} need to meet to require type in
// https://github.com/golang/go/blob/go1.17.4/src/database/sql/driver/types.go#L236
// Supported type is: nil, basic type(Int, Int8,..., Float32, Float64, String), Slice(uint8), other types not support
func getDefaultOrZeroValue(col *timodel.ColumnInfo) (interface{}, int, string, error) {
var d types.Datum
if !mysql.HasNotNullFlag(col.Flag) {
d := types.NewDatum(nil)
const size = unsafe.Sizeof(d.GetValue())
return d.GetValue(), int(size)
}

if col.GetDefaultValue() != nil {
d := types.NewDatum(col.GetDefaultValue())
return d.GetValue(), sizeOfDatum(d)
}
switch col.Tp {
case mysql.TypeEnum:
// For enum type, if no default value and not null is set,
// the default value is the first element of the enum list
d := types.NewDatum(col.FieldType.Elems[0])
return d.GetValue(), sizeOfDatum(d)
case mysql.TypeString, mysql.TypeVarString, mysql.TypeVarchar:
return emptyBytes, sizeOfEmptyBytes
// see https://github.com/pingcap/tidb/issues/9304
// must use null if TiDB not write the column value when default value is null
// and the value is null
d = types.NewDatum(nil)
} else if col.GetDefaultValue() != nil {
d = types.NewDatum(col.GetDefaultValue())
} else {
switch col.Tp {
case mysql.TypeEnum:
// For enum type, if no default value and not null is set,
// the default value is the first element of the enum list
d = types.NewDatum(col.FieldType.Elems[0])
case mysql.TypeString, mysql.TypeVarString, mysql.TypeVarchar:
return emptyBytes, sizeOfEmptyBytes, "", nil
default:
d = table.GetZeroValue(col)
}
}

d := table.GetZeroValue(col)
return d.GetValue(), sizeOfDatum(d)
return formatColVal(d, col.Tp)
}

// DecodeTableID decodes the raw key to a table ID
Expand Down
Loading

0 comments on commit 7020d71

Please sign in to comment.