From 7eef020da87985670c4898cdc669be0114599679 Mon Sep 17 00:00:00 2001 From: dongmen <414110582@qq.com> Date: Tue, 17 Jan 2023 16:29:18 +0800 Subject: [PATCH 1/3] add mysql error black list --- cdc/sink/mysql/mysql.go | 3 + cdc/sink/mysql/mysql_test.go | 22 ++++++- pkg/errorutil/ignore.go | 43 ++++++++++++ proto/canal/CanalProtocol.pb.go | 30 +++------ proto/canal/EntryProtocol.pb.go | 113 +++++++++++++++----------------- 5 files changed, 130 insertions(+), 81 deletions(-) diff --git a/cdc/sink/mysql/mysql.go b/cdc/sink/mysql/mysql.go index 67413d9ec30..8f0f90cb553 100644 --- a/cdc/sink/mysql/mysql.go +++ b/cdc/sink/mysql/mysql.go @@ -328,6 +328,9 @@ func (s *mysqlSink) EmitDDLEvent(ctx context.Context, ddl *model.DDLEvent) error } s.statistics.AddDDLCount() err := s.execDDLWithMaxRetries(ctx, ddl) + if !errorutil.IsRetryableDDLError(err) { + return cerror.WrapChangefeedUnretryableErr(err) + } return errors.Trace(err) } diff --git a/cdc/sink/mysql/mysql_test.go b/cdc/sink/mysql/mysql_test.go index 0c8ba514595..36192db27b1 100644 --- a/cdc/sink/mysql/mysql_test.go +++ b/cdc/sink/mysql/mysql_test.go @@ -1646,6 +1646,14 @@ func TestNewMySQLSinkExecDDL(t *testing.T) { WillReturnError(&dmysql.MySQLError{ Number: uint16(infoschema.ErrColumnExists.Code()), }) + mock.ExpectExec("ALTER TABLE test.t1 ADD COLUMN a int"). + WillReturnError(&dmysql.MySQLError{ + Number: uint16(infoschema.ErrColumnExists.Code()), + }) + mock.ExpectExec("ALTER TABLE test.t1 ADD PARTITION (PARTITION `p20230120` VALUES LESS THAN '2023-01-21'"). + WillReturnError(&dmysql.MySQLError{ + Number: mysql.ErrPartitionMgmtOnNonpartitioned, + }) mock.ExpectRollback() mock.ExpectClose() return db, nil @@ -1692,7 +1700,16 @@ func TestNewMySQLSinkExecDDL(t *testing.T) { Type: timodel.ActionAddColumn, Query: "ALTER TABLE test.t1 ADD COLUMN a int", } - + ddl3 := &model.DDLEvent{ + StartTs: 1020, + CommitTs: 1030, + TableInfo: &model.SimpleTableInfo{ + Schema: "test", + Table: "t2", + }, + Type: timodel.ActionAddTablePartition, + Query: "ALTER TABLE test.t1 ADD PARTITION (PARTITION `p20230120` VALUES LESS THAN '2023-01-21'", + } err = sink.EmitDDLEvent(ctx, ddl1) require.Nil(t, err) err = sink.EmitDDLEvent(ctx, ddl2) @@ -1701,6 +1718,9 @@ func TestNewMySQLSinkExecDDL(t *testing.T) { err = sink.EmitDDLEvent(ctx, ddl1) require.Nil(t, err) + err = sink.EmitDDLEvent(ctx, ddl3) + require.Nil(t, cerror.IsChangefeedUnRetryableError(err)) + err = sink.Close(ctx) require.Nil(t, err) } diff --git a/pkg/errorutil/ignore.go b/pkg/errorutil/ignore.go index 8c247418a30..1d1e52c8947 100644 --- a/pkg/errorutil/ignore.go +++ b/pkg/errorutil/ignore.go @@ -19,6 +19,9 @@ import ( "github.com/pingcap/tidb/infoschema" "github.com/pingcap/tidb/parser/mysql" "github.com/pingcap/tidb/util/dbterror" + "github.com/pingcap/tidb/util/dbutil" + dmretry "github.com/pingcap/tiflow/dm/pkg/retry" + cerror "github.com/pingcap/tiflow/pkg/errors" v3rpc "go.etcd.io/etcd/api/v3/v3rpc/rpctypes" ) @@ -70,3 +73,43 @@ func IsRetryableEtcdError(err error) bool { return false } } + +// IsRetryableDMLError check if the error is a retryable dml error. +func IsRetryableDMLError(err error) bool { + if !cerror.IsRetryableError(err) { + return false + } + // Check if the error is connection errors that can retry safely. + if dmretry.IsConnectionError(err) { + return true + } + // Check if the error is a retriable TiDB error or MySQL error. + return dbutil.IsRetryableError(err) +} + +// IsRetryableDDLError check if the error is a retryable ddl error. +func IsRetryableDDLError(err error) bool { + if IsRetryableDMLError(err) { + return true + } + err = errors.Cause(err) + mysqlErr, ok := err.(*dmysql.MySQLError) + if !ok { + return false + } + // If the error is in the black list, return false. + switch mysqlErr.Number { + case mysql.ErrAccessDenied, + mysql.ErrDBaccessDenied, + mysql.ErrSyntax, + mysql.ErrParse, + mysql.ErrNoDB, + mysql.ErrNoSuchTable, + mysql.ErrNoSuchIndex, + mysql.ErrKeyColumnDoesNotExits, + mysql.ErrWrongColumnName, + mysql.ErrPartitionMgmtOnNonpartitioned: + return false + } + return true +} diff --git a/proto/canal/CanalProtocol.pb.go b/proto/canal/CanalProtocol.pb.go index ac10ea042f0..c7ca2d31422 100644 --- a/proto/canal/CanalProtocol.pb.go +++ b/proto/canal/CanalProtocol.pb.go @@ -118,23 +118,20 @@ func (PacketType) EnumDescriptor() ([]byte, []int) { } type Packet struct { - // [default = 17]; + //[default = 17]; // // Types that are valid to be assigned to MagicNumberPresent: - // // *Packet_MagicNumber MagicNumberPresent isPacket_MagicNumberPresent `protobuf_oneof:"magic_number_present"` - // [default = 1]; + //[default = 1]; // // Types that are valid to be assigned to VersionPresent: - // // *Packet_Version VersionPresent isPacket_VersionPresent `protobuf_oneof:"version_present"` Type PacketType `protobuf:"varint,3,opt,name=type,proto3,enum=com.alibaba.otter.canal.protocol.PacketType" json:"type,omitempty"` - // [default = NONE]; + //[default = NONE]; // // Types that are valid to be assigned to CompressionPresent: - // // *Packet_Compression CompressionPresent isPacket_CompressionPresent `protobuf_oneof:"compression_present"` Body []byte `protobuf:"bytes,5,opt,name=body,proto3" json:"body,omitempty"` @@ -319,10 +316,9 @@ func (m *HeartBeat) GetStartTimestamp() int64 { } type Handshake struct { - // [default = "utf8"]; + // [default = "utf8"]; // // Types that are valid to be assigned to CommunicationEncodingPresent: - // // *Handshake_CommunicationEncoding CommunicationEncodingPresent isHandshake_CommunicationEncodingPresent `protobuf_oneof:"communication_encoding_present"` Seeds []byte `protobuf:"bytes,2,opt,name=seeds,proto3" json:"seeds,omitempty"` @@ -416,13 +412,11 @@ type ClientAuth struct { // [default = 0] // // Types that are valid to be assigned to NetReadTimeoutPresent: - // // *ClientAuth_NetReadTimeout NetReadTimeoutPresent isClientAuth_NetReadTimeoutPresent `protobuf_oneof:"net_read_timeout_present"` // [default = 0]; // // Types that are valid to be assigned to NetWriteTimeoutPresent: - // // *ClientAuth_NetWriteTimeout NetWriteTimeoutPresent isClientAuth_NetWriteTimeoutPresent `protobuf_oneof:"net_write_timeout_present"` Destination string `protobuf:"bytes,5,opt,name=destination,proto3" json:"destination,omitempty"` @@ -563,10 +557,9 @@ func (*ClientAuth) XXX_OneofWrappers() []interface{} { } type Ack struct { - // [default = 0] + //[default = 0] // // Types that are valid to be assigned to ErrorCodePresent: - // // *Ack_ErrorCode ErrorCodePresent isAck_ErrorCodePresent `protobuf_oneof:"error_code_present"` ErrorMessage string `protobuf:"bytes,2,opt,name=error_message,json=errorMessage,proto3" json:"error_message,omitempty"` @@ -827,27 +820,24 @@ func (m *Unsub) GetFilter() string { return "" } -// PullRequest +// PullRequest type Get struct { Destination string `protobuf:"bytes,1,opt,name=destination,proto3" json:"destination,omitempty"` ClientId string `protobuf:"bytes,2,opt,name=client_id,json=clientId,proto3" json:"client_id,omitempty"` FetchSize int32 `protobuf:"varint,3,opt,name=fetch_size,json=fetchSize,proto3" json:"fetch_size,omitempty"` - // [default = -1] + //[default = -1] // // Types that are valid to be assigned to TimeoutPresent: - // // *Get_Timeout TimeoutPresent isGet_TimeoutPresent `protobuf_oneof:"timeout_present"` - // [default = 2] + //[default = 2] // // Types that are valid to be assigned to UnitPresent: - // // *Get_Unit UnitPresent isGet_UnitPresent `protobuf_oneof:"unit_present"` - // [default = false] + //[default = false] // // Types that are valid to be assigned to AutoAckPresent: - // // *Get_AutoAck AutoAckPresent isGet_AutoAckPresent `protobuf_oneof:"auto_ack_present"` } @@ -985,6 +975,7 @@ func (*Get) XXX_OneofWrappers() []interface{} { } } +// type Messages struct { BatchId int64 `protobuf:"varint,1,opt,name=batch_id,json=batchId,proto3" json:"batch_id,omitempty"` Messages [][]byte `protobuf:"bytes,2,rep,name=messages,proto3" json:"messages,omitempty"` @@ -1044,7 +1035,6 @@ type Dump struct { // [default = 0] // // Types that are valid to be assigned to TimestampPresent: - // // *Dump_Timestamp TimestampPresent isDump_TimestampPresent `protobuf_oneof:"timestamp_present"` } diff --git a/proto/canal/EntryProtocol.pb.go b/proto/canal/EntryProtocol.pb.go index 489ec5c661b..cf947639d74 100644 --- a/proto/canal/EntryProtocol.pb.go +++ b/proto/canal/EntryProtocol.pb.go @@ -22,7 +22,7 @@ var _ = math.Inf // proto package needs to be updated. const _ = proto.GoGoProtoPackageIsVersion3 // please upgrade the proto package -// *打散后的事件类型,主要用于标识事务的开始,变更数据,结束* +//*打散后的事件类型,主要用于标识事务的开始,变更数据,结束* type EntryType int32 const ( @@ -61,7 +61,7 @@ func (EntryType) EnumDescriptor() ([]byte, []int) { return fileDescriptor_237ce6ff565bd62b, []int{0} } -// * 事件类型 * +//* 事件类型 * type EventType int32 const ( @@ -132,7 +132,7 @@ func (EventType) EnumDescriptor() ([]byte, []int) { return fileDescriptor_237ce6ff565bd62b, []int{1} } -// *数据库类型* +//*数据库类型* type Type int32 const ( @@ -164,20 +164,19 @@ func (Type) EnumDescriptor() ([]byte, []int) { return fileDescriptor_237ce6ff565bd62b, []int{2} } -// *************************************************************** +//*************************************************************** // message model -// 如果要在Enum中新增类型,确保以前的类型的下标值不变. -// ************************************************************** +//如果要在Enum中新增类型,确保以前的类型的下标值不变. +//************************************************************** type Entry struct { - // *协议头部信息* + //*协议头部信息* Header *Header `protobuf:"bytes,1,opt,name=header,proto3" json:"header,omitempty"` - // /**打散后的事件类型**/ [default = ROWDATA] + ///**打散后的事件类型**/ [default = ROWDATA] // // Types that are valid to be assigned to EntryTypePresent: - // // *Entry_EntryType EntryTypePresent isEntry_EntryTypePresent `protobuf_oneof:"entryType_present"` - // *传输的二进制数组* + //*传输的二进制数组* StoreValue []byte `protobuf:"bytes,3,opt,name=storeValue,proto3" json:"storeValue,omitempty"` } @@ -261,45 +260,42 @@ func (*Entry) XXX_OneofWrappers() []interface{} { } } -// *message Header* +//*message Header* type Header struct { - // [default = 1] + //[default = 1] // // Types that are valid to be assigned to VersionPresent: - // // *Header_Version VersionPresent isHeader_VersionPresent `protobuf_oneof:"version_present"` - // *binlog/redolog 文件名* + //*binlog/redolog 文件名* LogfileName string `protobuf:"bytes,2,opt,name=logfileName,proto3" json:"logfileName,omitempty"` - // *binlog/redolog 文件的偏移位置* + //*binlog/redolog 文件的偏移位置* LogfileOffset int64 `protobuf:"varint,3,opt,name=logfileOffset,proto3" json:"logfileOffset,omitempty"` - // *服务端serverId* + //*服务端serverId* ServerId int64 `protobuf:"varint,4,opt,name=serverId,proto3" json:"serverId,omitempty"` - // * 变更数据的编码 * + //* 变更数据的编码 * ServerenCode string `protobuf:"bytes,5,opt,name=serverenCode,proto3" json:"serverenCode,omitempty"` - // *变更数据的执行时间 * + //*变更数据的执行时间 * ExecuteTime int64 `protobuf:"varint,6,opt,name=executeTime,proto3" json:"executeTime,omitempty"` - // [default = MYSQL] + //[default = MYSQL] // // Types that are valid to be assigned to SourceTypePresent: - // // *Header_SourceType SourceTypePresent isHeader_SourceTypePresent `protobuf_oneof:"sourceType_present"` - // * 变更数据的schemaname* + //* 变更数据的schemaname* SchemaName string `protobuf:"bytes,8,opt,name=schemaName,proto3" json:"schemaName,omitempty"` - // *变更数据的tablename* + //*变更数据的tablename* TableName string `protobuf:"bytes,9,opt,name=tableName,proto3" json:"tableName,omitempty"` - // *每个event的长度* + //*每个event的长度* EventLength int64 `protobuf:"varint,10,opt,name=eventLength,proto3" json:"eventLength,omitempty"` // [default = UPDATE] // // Types that are valid to be assigned to EventTypePresent: - // // *Header_EventType EventTypePresent isHeader_EventTypePresent `protobuf_oneof:"eventType_present"` - // *预留扩展* + //*预留扩展* Props []*Pair `protobuf:"bytes,12,rep,name=props,proto3" json:"props,omitempty"` - // *当前事务的gitd* + //*当前事务的gitd* Gtid string `protobuf:"bytes,13,opt,name=gtid,proto3" json:"gtid,omitempty"` } @@ -485,31 +481,30 @@ func (*Header) XXX_OneofWrappers() []interface{} { } } -// *每个字段的数据结构* +//*每个字段的数据结构* type Column struct { - // *字段下标* + //*字段下标* Index int32 `protobuf:"varint,1,opt,name=index,proto3" json:"index,omitempty"` - // *字段java中类型* + //*字段java中类型* SqlType int32 `protobuf:"varint,2,opt,name=sqlType,proto3" json:"sqlType,omitempty"` - // *字段名称(忽略大小写),在mysql中是没有的* + //*字段名称(忽略大小写),在mysql中是没有的* Name string `protobuf:"bytes,3,opt,name=name,proto3" json:"name,omitempty"` - // *是否是主键* + //*是否是主键* IsKey bool `protobuf:"varint,4,opt,name=isKey,proto3" json:"isKey,omitempty"` - // *如果EventType=UPDATE,用于标识这个字段值是否有修改* + //*如果EventType=UPDATE,用于标识这个字段值是否有修改* Updated bool `protobuf:"varint,5,opt,name=updated,proto3" json:"updated,omitempty"` - // [default = false] + //[default = false] // // Types that are valid to be assigned to IsNullPresent: - // // *Column_IsNull IsNullPresent isColumn_IsNullPresent `protobuf_oneof:"isNull_present"` - // *预留扩展* + //*预留扩展* Props []*Pair `protobuf:"bytes,7,rep,name=props,proto3" json:"props,omitempty"` - // * 字段值,timestamp,Datetime是一个时间格式的文本 * + //* 字段值,timestamp,Datetime是一个时间格式的文本 * Value string `protobuf:"bytes,8,opt,name=value,proto3" json:"value,omitempty"` - // * 对应数据对象原始长度 * + //* 对应数据对象原始长度 * Length int32 `protobuf:"varint,9,opt,name=length,proto3" json:"length,omitempty"` - // *字段mysql类型* + //*字段mysql类型* MysqlType string `protobuf:"bytes,10,opt,name=mysqlType,proto3" json:"mysqlType,omitempty"` } @@ -643,11 +638,11 @@ func (*Column) XXX_OneofWrappers() []interface{} { } type RowData struct { - // * 字段信息,增量数据(修改前,删除前) * + //* 字段信息,增量数据(修改前,删除前) * BeforeColumns []*Column `protobuf:"bytes,1,rep,name=beforeColumns,proto3" json:"beforeColumns,omitempty"` - // * 字段信息,增量数据(修改后,新增后) * + //* 字段信息,增量数据(修改后,新增后) * AfterColumns []*Column `protobuf:"bytes,2,rep,name=afterColumns,proto3" json:"afterColumns,omitempty"` - // *预留扩展* + //*预留扩展* Props []*Pair `protobuf:"bytes,3,rep,name=props,proto3" json:"props,omitempty"` } @@ -705,29 +700,27 @@ func (m *RowData) GetProps() []*Pair { return nil } -// *message row 每行变更数据的数据结构* +//*message row 每行变更数据的数据结构* type RowChange struct { - // *tableId,由数据库产生* + //*tableId,由数据库产生* TableId int64 `protobuf:"varint,1,opt,name=tableId,proto3" json:"tableId,omitempty"` - // [default = UPDATE] + //[default = UPDATE] // // Types that are valid to be assigned to EventTypePresent: - // // *RowChange_EventType EventTypePresent isRowChange_EventTypePresent `protobuf_oneof:"eventType_present"` // [default = false] // // Types that are valid to be assigned to IsDdlPresent: - // // *RowChange_IsDdl IsDdlPresent isRowChange_IsDdlPresent `protobuf_oneof:"isDdl_present"` - // * ddl/query的sql语句 * + //* ddl/query的sql语句 * Sql string `protobuf:"bytes,11,opt,name=sql,proto3" json:"sql,omitempty"` - // * 一次数据库变更可能存在多行 * + //* 一次数据库变更可能存在多行 * RowDatas []*RowData `protobuf:"bytes,12,rep,name=rowDatas,proto3" json:"rowDatas,omitempty"` - // *预留扩展* + //*预留扩展* Props []*Pair `protobuf:"bytes,13,rep,name=props,proto3" json:"props,omitempty"` - // * ddl/query的schemaName,会存在跨库ddl,需要保留执行ddl的当前schemaName * + //* ddl/query的schemaName,会存在跨库ddl,需要保留执行ddl的当前schemaName * DdlSchemaName string `protobuf:"bytes,14,opt,name=ddlSchemaName,proto3" json:"ddlSchemaName,omitempty"` } @@ -855,15 +848,15 @@ func (*RowChange) XXX_OneofWrappers() []interface{} { } } -// *开始事务的一些信息* +//*开始事务的一些信息* type TransactionBegin struct { - // *已废弃,请使用header里的executeTime* + //*已废弃,请使用header里的executeTime* ExecuteTime int64 `protobuf:"varint,1,opt,name=executeTime,proto3" json:"executeTime,omitempty"` - // *已废弃,Begin里不提供事务id* + //*已废弃,Begin里不提供事务id* TransactionId string `protobuf:"bytes,2,opt,name=transactionId,proto3" json:"transactionId,omitempty"` - // *预留扩展* + //*预留扩展* Props []*Pair `protobuf:"bytes,3,rep,name=props,proto3" json:"props,omitempty"` - // *执行的thread Id* + //*执行的thread Id* ThreadId int64 `protobuf:"varint,4,opt,name=threadId,proto3" json:"threadId,omitempty"` } @@ -928,13 +921,13 @@ func (m *TransactionBegin) GetThreadId() int64 { return 0 } -// *结束事务的一些信息* +//*结束事务的一些信息* type TransactionEnd struct { - // *已废弃,请使用header里的executeTime* + //*已废弃,请使用header里的executeTime* ExecuteTime int64 `protobuf:"varint,1,opt,name=executeTime,proto3" json:"executeTime,omitempty"` - // *事务号* + //*事务号* TransactionId string `protobuf:"bytes,2,opt,name=transactionId,proto3" json:"transactionId,omitempty"` - // *预留扩展* + //*预留扩展* Props []*Pair `protobuf:"bytes,3,rep,name=props,proto3" json:"props,omitempty"` } @@ -992,7 +985,7 @@ func (m *TransactionEnd) GetProps() []*Pair { return nil } -// *预留扩展* +//*预留扩展* type Pair struct { Key string `protobuf:"bytes,1,opt,name=key,proto3" json:"key,omitempty"` Value string `protobuf:"bytes,2,opt,name=value,proto3" json:"value,omitempty"` From e817b1db311f307a0adcfd3c04c96a3b47d2df59 Mon Sep 17 00:00:00 2001 From: dongmen <414110582@qq.com> Date: Tue, 17 Jan 2023 16:32:53 +0800 Subject: [PATCH 2/3] revert unnecessary changes --- proto/canal/CanalProtocol.pb.go | 30 ++++++--- proto/canal/EntryProtocol.pb.go | 113 +++++++++++++++++--------------- 2 files changed, 80 insertions(+), 63 deletions(-) diff --git a/proto/canal/CanalProtocol.pb.go b/proto/canal/CanalProtocol.pb.go index c7ca2d31422..ac10ea042f0 100644 --- a/proto/canal/CanalProtocol.pb.go +++ b/proto/canal/CanalProtocol.pb.go @@ -118,20 +118,23 @@ func (PacketType) EnumDescriptor() ([]byte, []int) { } type Packet struct { - //[default = 17]; + // [default = 17]; // // Types that are valid to be assigned to MagicNumberPresent: + // // *Packet_MagicNumber MagicNumberPresent isPacket_MagicNumberPresent `protobuf_oneof:"magic_number_present"` - //[default = 1]; + // [default = 1]; // // Types that are valid to be assigned to VersionPresent: + // // *Packet_Version VersionPresent isPacket_VersionPresent `protobuf_oneof:"version_present"` Type PacketType `protobuf:"varint,3,opt,name=type,proto3,enum=com.alibaba.otter.canal.protocol.PacketType" json:"type,omitempty"` - //[default = NONE]; + // [default = NONE]; // // Types that are valid to be assigned to CompressionPresent: + // // *Packet_Compression CompressionPresent isPacket_CompressionPresent `protobuf_oneof:"compression_present"` Body []byte `protobuf:"bytes,5,opt,name=body,proto3" json:"body,omitempty"` @@ -316,9 +319,10 @@ func (m *HeartBeat) GetStartTimestamp() int64 { } type Handshake struct { - // [default = "utf8"]; + // [default = "utf8"]; // // Types that are valid to be assigned to CommunicationEncodingPresent: + // // *Handshake_CommunicationEncoding CommunicationEncodingPresent isHandshake_CommunicationEncodingPresent `protobuf_oneof:"communication_encoding_present"` Seeds []byte `protobuf:"bytes,2,opt,name=seeds,proto3" json:"seeds,omitempty"` @@ -412,11 +416,13 @@ type ClientAuth struct { // [default = 0] // // Types that are valid to be assigned to NetReadTimeoutPresent: + // // *ClientAuth_NetReadTimeout NetReadTimeoutPresent isClientAuth_NetReadTimeoutPresent `protobuf_oneof:"net_read_timeout_present"` // [default = 0]; // // Types that are valid to be assigned to NetWriteTimeoutPresent: + // // *ClientAuth_NetWriteTimeout NetWriteTimeoutPresent isClientAuth_NetWriteTimeoutPresent `protobuf_oneof:"net_write_timeout_present"` Destination string `protobuf:"bytes,5,opt,name=destination,proto3" json:"destination,omitempty"` @@ -557,9 +563,10 @@ func (*ClientAuth) XXX_OneofWrappers() []interface{} { } type Ack struct { - //[default = 0] + // [default = 0] // // Types that are valid to be assigned to ErrorCodePresent: + // // *Ack_ErrorCode ErrorCodePresent isAck_ErrorCodePresent `protobuf_oneof:"error_code_present"` ErrorMessage string `protobuf:"bytes,2,opt,name=error_message,json=errorMessage,proto3" json:"error_message,omitempty"` @@ -820,24 +827,27 @@ func (m *Unsub) GetFilter() string { return "" } -// PullRequest +// PullRequest type Get struct { Destination string `protobuf:"bytes,1,opt,name=destination,proto3" json:"destination,omitempty"` ClientId string `protobuf:"bytes,2,opt,name=client_id,json=clientId,proto3" json:"client_id,omitempty"` FetchSize int32 `protobuf:"varint,3,opt,name=fetch_size,json=fetchSize,proto3" json:"fetch_size,omitempty"` - //[default = -1] + // [default = -1] // // Types that are valid to be assigned to TimeoutPresent: + // // *Get_Timeout TimeoutPresent isGet_TimeoutPresent `protobuf_oneof:"timeout_present"` - //[default = 2] + // [default = 2] // // Types that are valid to be assigned to UnitPresent: + // // *Get_Unit UnitPresent isGet_UnitPresent `protobuf_oneof:"unit_present"` - //[default = false] + // [default = false] // // Types that are valid to be assigned to AutoAckPresent: + // // *Get_AutoAck AutoAckPresent isGet_AutoAckPresent `protobuf_oneof:"auto_ack_present"` } @@ -975,7 +985,6 @@ func (*Get) XXX_OneofWrappers() []interface{} { } } -// type Messages struct { BatchId int64 `protobuf:"varint,1,opt,name=batch_id,json=batchId,proto3" json:"batch_id,omitempty"` Messages [][]byte `protobuf:"bytes,2,rep,name=messages,proto3" json:"messages,omitempty"` @@ -1035,6 +1044,7 @@ type Dump struct { // [default = 0] // // Types that are valid to be assigned to TimestampPresent: + // // *Dump_Timestamp TimestampPresent isDump_TimestampPresent `protobuf_oneof:"timestamp_present"` } diff --git a/proto/canal/EntryProtocol.pb.go b/proto/canal/EntryProtocol.pb.go index cf947639d74..489ec5c661b 100644 --- a/proto/canal/EntryProtocol.pb.go +++ b/proto/canal/EntryProtocol.pb.go @@ -22,7 +22,7 @@ var _ = math.Inf // proto package needs to be updated. const _ = proto.GoGoProtoPackageIsVersion3 // please upgrade the proto package -//*打散后的事件类型,主要用于标识事务的开始,变更数据,结束* +// *打散后的事件类型,主要用于标识事务的开始,变更数据,结束* type EntryType int32 const ( @@ -61,7 +61,7 @@ func (EntryType) EnumDescriptor() ([]byte, []int) { return fileDescriptor_237ce6ff565bd62b, []int{0} } -//* 事件类型 * +// * 事件类型 * type EventType int32 const ( @@ -132,7 +132,7 @@ func (EventType) EnumDescriptor() ([]byte, []int) { return fileDescriptor_237ce6ff565bd62b, []int{1} } -//*数据库类型* +// *数据库类型* type Type int32 const ( @@ -164,19 +164,20 @@ func (Type) EnumDescriptor() ([]byte, []int) { return fileDescriptor_237ce6ff565bd62b, []int{2} } -//*************************************************************** +// *************************************************************** // message model -//如果要在Enum中新增类型,确保以前的类型的下标值不变. -//************************************************************** +// 如果要在Enum中新增类型,确保以前的类型的下标值不变. +// ************************************************************** type Entry struct { - //*协议头部信息* + // *协议头部信息* Header *Header `protobuf:"bytes,1,opt,name=header,proto3" json:"header,omitempty"` - ///**打散后的事件类型**/ [default = ROWDATA] + // /**打散后的事件类型**/ [default = ROWDATA] // // Types that are valid to be assigned to EntryTypePresent: + // // *Entry_EntryType EntryTypePresent isEntry_EntryTypePresent `protobuf_oneof:"entryType_present"` - //*传输的二进制数组* + // *传输的二进制数组* StoreValue []byte `protobuf:"bytes,3,opt,name=storeValue,proto3" json:"storeValue,omitempty"` } @@ -260,42 +261,45 @@ func (*Entry) XXX_OneofWrappers() []interface{} { } } -//*message Header* +// *message Header* type Header struct { - //[default = 1] + // [default = 1] // // Types that are valid to be assigned to VersionPresent: + // // *Header_Version VersionPresent isHeader_VersionPresent `protobuf_oneof:"version_present"` - //*binlog/redolog 文件名* + // *binlog/redolog 文件名* LogfileName string `protobuf:"bytes,2,opt,name=logfileName,proto3" json:"logfileName,omitempty"` - //*binlog/redolog 文件的偏移位置* + // *binlog/redolog 文件的偏移位置* LogfileOffset int64 `protobuf:"varint,3,opt,name=logfileOffset,proto3" json:"logfileOffset,omitempty"` - //*服务端serverId* + // *服务端serverId* ServerId int64 `protobuf:"varint,4,opt,name=serverId,proto3" json:"serverId,omitempty"` - //* 变更数据的编码 * + // * 变更数据的编码 * ServerenCode string `protobuf:"bytes,5,opt,name=serverenCode,proto3" json:"serverenCode,omitempty"` - //*变更数据的执行时间 * + // *变更数据的执行时间 * ExecuteTime int64 `protobuf:"varint,6,opt,name=executeTime,proto3" json:"executeTime,omitempty"` - //[default = MYSQL] + // [default = MYSQL] // // Types that are valid to be assigned to SourceTypePresent: + // // *Header_SourceType SourceTypePresent isHeader_SourceTypePresent `protobuf_oneof:"sourceType_present"` - //* 变更数据的schemaname* + // * 变更数据的schemaname* SchemaName string `protobuf:"bytes,8,opt,name=schemaName,proto3" json:"schemaName,omitempty"` - //*变更数据的tablename* + // *变更数据的tablename* TableName string `protobuf:"bytes,9,opt,name=tableName,proto3" json:"tableName,omitempty"` - //*每个event的长度* + // *每个event的长度* EventLength int64 `protobuf:"varint,10,opt,name=eventLength,proto3" json:"eventLength,omitempty"` // [default = UPDATE] // // Types that are valid to be assigned to EventTypePresent: + // // *Header_EventType EventTypePresent isHeader_EventTypePresent `protobuf_oneof:"eventType_present"` - //*预留扩展* + // *预留扩展* Props []*Pair `protobuf:"bytes,12,rep,name=props,proto3" json:"props,omitempty"` - //*当前事务的gitd* + // *当前事务的gitd* Gtid string `protobuf:"bytes,13,opt,name=gtid,proto3" json:"gtid,omitempty"` } @@ -481,30 +485,31 @@ func (*Header) XXX_OneofWrappers() []interface{} { } } -//*每个字段的数据结构* +// *每个字段的数据结构* type Column struct { - //*字段下标* + // *字段下标* Index int32 `protobuf:"varint,1,opt,name=index,proto3" json:"index,omitempty"` - //*字段java中类型* + // *字段java中类型* SqlType int32 `protobuf:"varint,2,opt,name=sqlType,proto3" json:"sqlType,omitempty"` - //*字段名称(忽略大小写),在mysql中是没有的* + // *字段名称(忽略大小写),在mysql中是没有的* Name string `protobuf:"bytes,3,opt,name=name,proto3" json:"name,omitempty"` - //*是否是主键* + // *是否是主键* IsKey bool `protobuf:"varint,4,opt,name=isKey,proto3" json:"isKey,omitempty"` - //*如果EventType=UPDATE,用于标识这个字段值是否有修改* + // *如果EventType=UPDATE,用于标识这个字段值是否有修改* Updated bool `protobuf:"varint,5,opt,name=updated,proto3" json:"updated,omitempty"` - //[default = false] + // [default = false] // // Types that are valid to be assigned to IsNullPresent: + // // *Column_IsNull IsNullPresent isColumn_IsNullPresent `protobuf_oneof:"isNull_present"` - //*预留扩展* + // *预留扩展* Props []*Pair `protobuf:"bytes,7,rep,name=props,proto3" json:"props,omitempty"` - //* 字段值,timestamp,Datetime是一个时间格式的文本 * + // * 字段值,timestamp,Datetime是一个时间格式的文本 * Value string `protobuf:"bytes,8,opt,name=value,proto3" json:"value,omitempty"` - //* 对应数据对象原始长度 * + // * 对应数据对象原始长度 * Length int32 `protobuf:"varint,9,opt,name=length,proto3" json:"length,omitempty"` - //*字段mysql类型* + // *字段mysql类型* MysqlType string `protobuf:"bytes,10,opt,name=mysqlType,proto3" json:"mysqlType,omitempty"` } @@ -638,11 +643,11 @@ func (*Column) XXX_OneofWrappers() []interface{} { } type RowData struct { - //* 字段信息,增量数据(修改前,删除前) * + // * 字段信息,增量数据(修改前,删除前) * BeforeColumns []*Column `protobuf:"bytes,1,rep,name=beforeColumns,proto3" json:"beforeColumns,omitempty"` - //* 字段信息,增量数据(修改后,新增后) * + // * 字段信息,增量数据(修改后,新增后) * AfterColumns []*Column `protobuf:"bytes,2,rep,name=afterColumns,proto3" json:"afterColumns,omitempty"` - //*预留扩展* + // *预留扩展* Props []*Pair `protobuf:"bytes,3,rep,name=props,proto3" json:"props,omitempty"` } @@ -700,27 +705,29 @@ func (m *RowData) GetProps() []*Pair { return nil } -//*message row 每行变更数据的数据结构* +// *message row 每行变更数据的数据结构* type RowChange struct { - //*tableId,由数据库产生* + // *tableId,由数据库产生* TableId int64 `protobuf:"varint,1,opt,name=tableId,proto3" json:"tableId,omitempty"` - //[default = UPDATE] + // [default = UPDATE] // // Types that are valid to be assigned to EventTypePresent: + // // *RowChange_EventType EventTypePresent isRowChange_EventTypePresent `protobuf_oneof:"eventType_present"` // [default = false] // // Types that are valid to be assigned to IsDdlPresent: + // // *RowChange_IsDdl IsDdlPresent isRowChange_IsDdlPresent `protobuf_oneof:"isDdl_present"` - //* ddl/query的sql语句 * + // * ddl/query的sql语句 * Sql string `protobuf:"bytes,11,opt,name=sql,proto3" json:"sql,omitempty"` - //* 一次数据库变更可能存在多行 * + // * 一次数据库变更可能存在多行 * RowDatas []*RowData `protobuf:"bytes,12,rep,name=rowDatas,proto3" json:"rowDatas,omitempty"` - //*预留扩展* + // *预留扩展* Props []*Pair `protobuf:"bytes,13,rep,name=props,proto3" json:"props,omitempty"` - //* ddl/query的schemaName,会存在跨库ddl,需要保留执行ddl的当前schemaName * + // * ddl/query的schemaName,会存在跨库ddl,需要保留执行ddl的当前schemaName * DdlSchemaName string `protobuf:"bytes,14,opt,name=ddlSchemaName,proto3" json:"ddlSchemaName,omitempty"` } @@ -848,15 +855,15 @@ func (*RowChange) XXX_OneofWrappers() []interface{} { } } -//*开始事务的一些信息* +// *开始事务的一些信息* type TransactionBegin struct { - //*已废弃,请使用header里的executeTime* + // *已废弃,请使用header里的executeTime* ExecuteTime int64 `protobuf:"varint,1,opt,name=executeTime,proto3" json:"executeTime,omitempty"` - //*已废弃,Begin里不提供事务id* + // *已废弃,Begin里不提供事务id* TransactionId string `protobuf:"bytes,2,opt,name=transactionId,proto3" json:"transactionId,omitempty"` - //*预留扩展* + // *预留扩展* Props []*Pair `protobuf:"bytes,3,rep,name=props,proto3" json:"props,omitempty"` - //*执行的thread Id* + // *执行的thread Id* ThreadId int64 `protobuf:"varint,4,opt,name=threadId,proto3" json:"threadId,omitempty"` } @@ -921,13 +928,13 @@ func (m *TransactionBegin) GetThreadId() int64 { return 0 } -//*结束事务的一些信息* +// *结束事务的一些信息* type TransactionEnd struct { - //*已废弃,请使用header里的executeTime* + // *已废弃,请使用header里的executeTime* ExecuteTime int64 `protobuf:"varint,1,opt,name=executeTime,proto3" json:"executeTime,omitempty"` - //*事务号* + // *事务号* TransactionId string `protobuf:"bytes,2,opt,name=transactionId,proto3" json:"transactionId,omitempty"` - //*预留扩展* + // *预留扩展* Props []*Pair `protobuf:"bytes,3,rep,name=props,proto3" json:"props,omitempty"` } @@ -985,7 +992,7 @@ func (m *TransactionEnd) GetProps() []*Pair { return nil } -//*预留扩展* +// *预留扩展* type Pair struct { Key string `protobuf:"bytes,1,opt,name=key,proto3" json:"key,omitempty"` Value string `protobuf:"bytes,2,opt,name=value,proto3" json:"value,omitempty"` From c9aa51ce064d9d59f9633c4e03cd0d54fcdee6be Mon Sep 17 00:00:00 2001 From: dongmen <414110582@qq.com> Date: Tue, 17 Jan 2023 17:22:06 +0800 Subject: [PATCH 3/3] fix test --- cdc/sink/mysql/mysql.go | 4 ++-- cdc/sink/mysql/mysql_test.go | 18 ++++++++++-------- 2 files changed, 12 insertions(+), 10 deletions(-) diff --git a/cdc/sink/mysql/mysql.go b/cdc/sink/mysql/mysql.go index 8f0f90cb553..cea6122cafc 100644 --- a/cdc/sink/mysql/mysql.go +++ b/cdc/sink/mysql/mysql.go @@ -341,14 +341,14 @@ func (s *mysqlSink) execDDLWithMaxRetries(ctx context.Context, ddl *model.DDLEve log.Info("execute DDL failed, but error can be ignored", zap.String("query", ddl.Query), zap.Error(err)) return nil } - if err != nil { + if err != nil && errorutil.IsRetryableDDLError(err) { log.Warn("execute DDL with error, retry later", zap.String("query", ddl.Query), zap.Error(err)) } return err }, retry.WithBackoffBaseDelay(backoffBaseDelayInMs), retry.WithBackoffMaxDelay(backoffMaxDelayInMs), retry.WithMaxTries(defaultDDLMaxRetry), - retry.WithIsRetryableErr(cerror.IsRetryableError)) + retry.WithIsRetryableErr(errorutil.IsRetryableDDLError)) } func (s *mysqlSink) execDDL(pctx context.Context, ddl *model.DDLEvent) error { diff --git a/cdc/sink/mysql/mysql_test.go b/cdc/sink/mysql/mysql_test.go index 36192db27b1..fc428662220 100644 --- a/cdc/sink/mysql/mysql_test.go +++ b/cdc/sink/mysql/mysql_test.go @@ -1642,19 +1642,21 @@ func TestNewMySQLSinkExecDDL(t *testing.T) { mock.ExpectCommit() mock.ExpectBegin() mock.ExpectExec("USE `test`;").WillReturnResult(sqlmock.NewResult(1, 1)) + mock.ExpectExec("ALTER TABLE test.t1 ADD COLUMN a int"). WillReturnError(&dmysql.MySQLError{ Number: uint16(infoschema.ErrColumnExists.Code()), }) - mock.ExpectExec("ALTER TABLE test.t1 ADD COLUMN a int"). - WillReturnError(&dmysql.MySQLError{ - Number: uint16(infoschema.ErrColumnExists.Code()), - }) - mock.ExpectExec("ALTER TABLE test.t1 ADD PARTITION (PARTITION `p20230120` VALUES LESS THAN '2023-01-21'"). + mock.ExpectRollback() + + mock.ExpectBegin() + mock.ExpectExec("USE `test`;").WillReturnResult(sqlmock.NewResult(1, 1)) + mock.ExpectExec("ALTER TABLE test.t1 ADD PARTITION (PARTITION `p20230120` VALUES LESS THAN '2023-01-21')"). WillReturnError(&dmysql.MySQLError{ Number: mysql.ErrPartitionMgmtOnNonpartitioned, }) mock.ExpectRollback() + mock.ExpectClose() return db, nil } @@ -1705,10 +1707,10 @@ func TestNewMySQLSinkExecDDL(t *testing.T) { CommitTs: 1030, TableInfo: &model.SimpleTableInfo{ Schema: "test", - Table: "t2", + Table: "t1", }, Type: timodel.ActionAddTablePartition, - Query: "ALTER TABLE test.t1 ADD PARTITION (PARTITION `p20230120` VALUES LESS THAN '2023-01-21'", + Query: "ALTER TABLE test.t1 ADD PARTITION (PARTITION `p20230120` VALUES LESS THAN '2023-01-21')", } err = sink.EmitDDLEvent(ctx, ddl1) require.Nil(t, err) @@ -1719,7 +1721,7 @@ func TestNewMySQLSinkExecDDL(t *testing.T) { require.Nil(t, err) err = sink.EmitDDLEvent(ctx, ddl3) - require.Nil(t, cerror.IsChangefeedUnRetryableError(err)) + require.True(t, cerror.IsChangefeedUnRetryableError(err)) err = sink.Close(ctx) require.Nil(t, err)