Skip to content

Commit

Permalink
Merge branch 'master' into rustin-patch-sink-row
Browse files Browse the repository at this point in the history
  • Loading branch information
Rustin170506 authored Oct 27, 2021
2 parents 0676423 + bcf4281 commit 814c762
Show file tree
Hide file tree
Showing 2 changed files with 244 additions and 0 deletions.
99 changes: 99 additions & 0 deletions cdc/redo/convert.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,99 @@
// Copyright 2021 PingCAP, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// See the License for the specific language governing permissions and
// limitations under the License.

package redo

import (
"bytes"

"github.com/pingcap/ticdc/cdc/model"
pmodel "github.com/pingcap/tidb/parser/model"
)

// RowToRedo converts row changed event to redo log row
func RowToRedo(row *model.RowChangedEvent) *model.RedoRowChangedEvent {
redoLog := &model.RedoRowChangedEvent{
Row: row,
Columns: make([]*model.RedoColumn, 0, len(row.Columns)),
PreColumns: make([]*model.RedoColumn, 0, len(row.PreColumns)),
}
for _, column := range row.Columns {
var redoColumn *model.RedoColumn
if column != nil {
// workaround msgp issue(Decode replaces empty slices with nil https://github.com/tinylib/msgp/issues/247)
// if []byte("") send with RowChangedEvent after UnmarshalMsg,
// the value will become nil, which is unexpected.
switch v := column.Value.(type) {
case []byte:
if bytes.Equal(v, []byte("")) {
column.Value = ""
}
}
redoColumn = &model.RedoColumn{Column: column, Flag: uint64(column.Flag)}
}
redoLog.Columns = append(redoLog.Columns, redoColumn)
}
for _, column := range row.PreColumns {
var redoColumn *model.RedoColumn
if column != nil {
switch v := column.Value.(type) {
case []byte:
if bytes.Equal(v, []byte("")) {
column.Value = ""
}
}
redoColumn = &model.RedoColumn{Column: column, Flag: uint64(column.Flag)}
}
redoLog.PreColumns = append(redoLog.PreColumns, redoColumn)
}
return redoLog
}

// LogToRow converts redo log row to row changed event
func LogToRow(redoLog *model.RedoRowChangedEvent) *model.RowChangedEvent {
row := redoLog.Row
row.Columns = make([]*model.Column, 0, len(redoLog.Columns))
row.PreColumns = make([]*model.Column, 0, len(redoLog.PreColumns))
for _, column := range redoLog.PreColumns {
if column == nil {
row.PreColumns = append(row.PreColumns, nil)
continue
}
column.Column.Flag = model.ColumnFlagType(column.Flag)
row.PreColumns = append(row.PreColumns, column.Column)
}
for _, column := range redoLog.Columns {
if column == nil {
row.Columns = append(row.Columns, nil)
continue
}
column.Column.Flag = model.ColumnFlagType(column.Flag)
row.Columns = append(row.Columns, column.Column)
}
return row
}

// DDLToRedo converts ddl event to redo log ddl
func DDLToRedo(ddl *model.DDLEvent) *model.RedoDDLEvent {
redoDDL := &model.RedoDDLEvent{
DDL: ddl,
Type: byte(ddl.Type),
}
return redoDDL
}

// LogToDDL converts redo log ddl to ddl event
func LogToDDL(redoDDL *model.RedoDDLEvent) *model.DDLEvent {
redoDDL.DDL.Type = pmodel.ActionType(redoDDL.Type)
return redoDDL.DDL
}
145 changes: 145 additions & 0 deletions cdc/redo/convert_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,145 @@
// Copyright 2021 PingCAP, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// See the License for the specific language governing permissions and
// limitations under the License.

package redo

import (
"testing"

"github.com/pingcap/ticdc/cdc/model"
timodel "github.com/pingcap/tidb/parser/model"
"github.com/pingcap/tidb/parser/mysql"
"github.com/stretchr/testify/require"
)

func TestRowRedoConvert(t *testing.T) {
t.Parallel()
row := &model.RowChangedEvent{
StartTs: 100,
CommitTs: 120,
Table: &model.TableName{Schema: "test", Table: "table1", TableID: 57},
PreColumns: []*model.Column{{
Name: "a1",
Type: mysql.TypeLong,
Flag: model.BinaryFlag | model.MultipleKeyFlag | model.HandleKeyFlag,
Value: int64(1),
}, {
Name: "a2",
Type: mysql.TypeVarchar,
Value: "char",
}, {
Name: "a3",
Type: mysql.TypeLong,
Flag: model.BinaryFlag | model.MultipleKeyFlag | model.HandleKeyFlag,
Value: int64(1),
}, nil},
Columns: []*model.Column{{
Name: "a1",
Type: mysql.TypeLong,
Flag: model.BinaryFlag | model.MultipleKeyFlag | model.HandleKeyFlag,
Value: int64(2),
}, {
Name: "a2",
Type: mysql.TypeVarchar,
Value: "char-updated",
}, {
Name: "a3",
Type: mysql.TypeLong,
Flag: model.BinaryFlag | model.MultipleKeyFlag | model.HandleKeyFlag,
Value: int64(2),
}, nil},
IndexColumns: [][]int{{1, 3}},
}
rowRedo := RowToRedo(row)
require.Equal(t, 4, len(rowRedo.PreColumns))
require.Equal(t, 4, len(rowRedo.Columns))

redoLog := &model.RedoLog{
RedoRow: rowRedo,
Type: model.RedoLogTypeRow,
}
data, err := redoLog.MarshalMsg(nil)
require.Nil(t, err)
redoLog2 := &model.RedoLog{}
_, err = redoLog2.UnmarshalMsg(data)
require.Nil(t, err)
require.Equal(t, row, LogToRow(redoLog2.RedoRow))
}

func TestRowRedoConvertWithEmptySlice(t *testing.T) {
t.Parallel()
row := &model.RowChangedEvent{
StartTs: 100,
CommitTs: 120,
Table: &model.TableName{Schema: "test", Table: "table1", TableID: 57},
PreColumns: []*model.Column{{
Name: "a1",
Type: mysql.TypeLong,
Flag: model.BinaryFlag | model.MultipleKeyFlag | model.HandleKeyFlag,
Value: int64(1),
}, {
Name: "a2",
Type: mysql.TypeVarchar,
Value: []byte(""), // empty slice should be marshal and unmarshal safely
}},
Columns: []*model.Column{{
Name: "a1",
Type: mysql.TypeLong,
Flag: model.BinaryFlag | model.MultipleKeyFlag | model.HandleKeyFlag,
Value: int64(2),
}, {
Name: "a2",
Type: mysql.TypeVarchar,
Value: []byte(""),
}},
IndexColumns: [][]int{{1}},
}
rowRedo := RowToRedo(row)
redoLog := &model.RedoLog{
RedoRow: rowRedo,
Type: model.RedoLogTypeRow,
}
data, err := redoLog.MarshalMsg(nil)
require.Nil(t, err)

redoLog2 := &model.RedoLog{}
_, err = redoLog2.UnmarshalMsg(data)
require.Nil(t, err)
require.Equal(t, row, LogToRow(redoLog2.RedoRow))
}

func TestDDLRedoConvert(t *testing.T) {
t.Parallel()
ddl := &model.DDLEvent{
StartTs: 1020,
CommitTs: 1030,
TableInfo: &model.SimpleTableInfo{
Schema: "test",
Table: "t2",
},
Type: timodel.ActionAddColumn,
Query: "ALTER TABLE test.t1 ADD COLUMN a int",
}
redoDDL := DDLToRedo(ddl)

redoLog := &model.RedoLog{
RedoDDL: redoDDL,
Type: model.RedoLogTypeDDL,
}
data, err := redoLog.MarshalMsg(nil)
require.Nil(t, err)
redoLog2 := &model.RedoLog{}
_, err = redoLog2.UnmarshalMsg(data)
require.Nil(t, err)
require.Equal(t, ddl, LogToDDL(redoLog2.RedoDDL))
}

0 comments on commit 814c762

Please sign in to comment.