diff --git a/cdc/sink/mysql/mysql.go b/cdc/sink/mysql/mysql.go index 8f0f90cb553..cea6122cafc 100644 --- a/cdc/sink/mysql/mysql.go +++ b/cdc/sink/mysql/mysql.go @@ -341,14 +341,14 @@ func (s *mysqlSink) execDDLWithMaxRetries(ctx context.Context, ddl *model.DDLEve log.Info("execute DDL failed, but error can be ignored", zap.String("query", ddl.Query), zap.Error(err)) return nil } - if err != nil { + if err != nil && errorutil.IsRetryableDDLError(err) { log.Warn("execute DDL with error, retry later", zap.String("query", ddl.Query), zap.Error(err)) } return err }, retry.WithBackoffBaseDelay(backoffBaseDelayInMs), retry.WithBackoffMaxDelay(backoffMaxDelayInMs), retry.WithMaxTries(defaultDDLMaxRetry), - retry.WithIsRetryableErr(cerror.IsRetryableError)) + retry.WithIsRetryableErr(errorutil.IsRetryableDDLError)) } func (s *mysqlSink) execDDL(pctx context.Context, ddl *model.DDLEvent) error { diff --git a/cdc/sink/mysql/mysql_test.go b/cdc/sink/mysql/mysql_test.go index 36192db27b1..fc428662220 100644 --- a/cdc/sink/mysql/mysql_test.go +++ b/cdc/sink/mysql/mysql_test.go @@ -1642,19 +1642,21 @@ func TestNewMySQLSinkExecDDL(t *testing.T) { mock.ExpectCommit() mock.ExpectBegin() mock.ExpectExec("USE `test`;").WillReturnResult(sqlmock.NewResult(1, 1)) + mock.ExpectExec("ALTER TABLE test.t1 ADD COLUMN a int"). WillReturnError(&dmysql.MySQLError{ Number: uint16(infoschema.ErrColumnExists.Code()), }) - mock.ExpectExec("ALTER TABLE test.t1 ADD COLUMN a int"). - WillReturnError(&dmysql.MySQLError{ - Number: uint16(infoschema.ErrColumnExists.Code()), - }) - mock.ExpectExec("ALTER TABLE test.t1 ADD PARTITION (PARTITION `p20230120` VALUES LESS THAN '2023-01-21'"). + mock.ExpectRollback() + + mock.ExpectBegin() + mock.ExpectExec("USE `test`;").WillReturnResult(sqlmock.NewResult(1, 1)) + mock.ExpectExec("ALTER TABLE test.t1 ADD PARTITION (PARTITION `p20230120` VALUES LESS THAN '2023-01-21')"). WillReturnError(&dmysql.MySQLError{ Number: mysql.ErrPartitionMgmtOnNonpartitioned, }) mock.ExpectRollback() + mock.ExpectClose() return db, nil } @@ -1705,10 +1707,10 @@ func TestNewMySQLSinkExecDDL(t *testing.T) { CommitTs: 1030, TableInfo: &model.SimpleTableInfo{ Schema: "test", - Table: "t2", + Table: "t1", }, Type: timodel.ActionAddTablePartition, - Query: "ALTER TABLE test.t1 ADD PARTITION (PARTITION `p20230120` VALUES LESS THAN '2023-01-21'", + Query: "ALTER TABLE test.t1 ADD PARTITION (PARTITION `p20230120` VALUES LESS THAN '2023-01-21')", } err = sink.EmitDDLEvent(ctx, ddl1) require.Nil(t, err) @@ -1719,7 +1721,7 @@ func TestNewMySQLSinkExecDDL(t *testing.T) { require.Nil(t, err) err = sink.EmitDDLEvent(ctx, ddl3) - require.Nil(t, cerror.IsChangefeedUnRetryableError(err)) + require.True(t, cerror.IsChangefeedUnRetryableError(err)) err = sink.Close(ctx) require.Nil(t, err)