Skip to content

Commit

Permalink
error,sink (ticdc): add mysql error black list (#8090)
Browse files Browse the repository at this point in the history
ref #8087
  • Loading branch information
asddongmen authored Jan 17, 2023
1 parent 4fe2a0b commit 34aaa8b
Show file tree
Hide file tree
Showing 3 changed files with 71 additions and 3 deletions.
7 changes: 5 additions & 2 deletions cdc/sink/mysql/mysql.go
Original file line number Diff line number Diff line change
Expand Up @@ -328,6 +328,9 @@ func (s *mysqlSink) EmitDDLEvent(ctx context.Context, ddl *model.DDLEvent) error
}
s.statistics.AddDDLCount()
err := s.execDDLWithMaxRetries(ctx, ddl)
if !errorutil.IsRetryableDDLError(err) {
return cerror.WrapChangefeedUnretryableErr(err)
}
return errors.Trace(err)
}

Expand All @@ -338,14 +341,14 @@ func (s *mysqlSink) execDDLWithMaxRetries(ctx context.Context, ddl *model.DDLEve
log.Info("execute DDL failed, but error can be ignored", zap.String("query", ddl.Query), zap.Error(err))
return nil
}
if err != nil {
if err != nil && errorutil.IsRetryableDDLError(err) {
log.Warn("execute DDL with error, retry later", zap.String("query", ddl.Query), zap.Error(err))
}
return err
}, retry.WithBackoffBaseDelay(backoffBaseDelayInMs),
retry.WithBackoffMaxDelay(backoffMaxDelayInMs),
retry.WithMaxTries(defaultDDLMaxRetry),
retry.WithIsRetryableErr(cerror.IsRetryableError))
retry.WithIsRetryableErr(errorutil.IsRetryableDDLError))
}

func (s *mysqlSink) execDDL(pctx context.Context, ddl *model.DDLEvent) error {
Expand Down
24 changes: 23 additions & 1 deletion cdc/sink/mysql/mysql_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1642,11 +1642,21 @@ func TestNewMySQLSinkExecDDL(t *testing.T) {
mock.ExpectCommit()
mock.ExpectBegin()
mock.ExpectExec("USE `test`;").WillReturnResult(sqlmock.NewResult(1, 1))

mock.ExpectExec("ALTER TABLE test.t1 ADD COLUMN a int").
WillReturnError(&dmysql.MySQLError{
Number: uint16(infoschema.ErrColumnExists.Code()),
})
mock.ExpectRollback()

mock.ExpectBegin()
mock.ExpectExec("USE `test`;").WillReturnResult(sqlmock.NewResult(1, 1))
mock.ExpectExec("ALTER TABLE test.t1 ADD PARTITION (PARTITION `p20230120` VALUES LESS THAN '2023-01-21')").
WillReturnError(&dmysql.MySQLError{
Number: mysql.ErrPartitionMgmtOnNonpartitioned,
})
mock.ExpectRollback()

mock.ExpectClose()
return db, nil
}
Expand Down Expand Up @@ -1692,7 +1702,16 @@ func TestNewMySQLSinkExecDDL(t *testing.T) {
Type: timodel.ActionAddColumn,
Query: "ALTER TABLE test.t1 ADD COLUMN a int",
}

ddl3 := &model.DDLEvent{
StartTs: 1020,
CommitTs: 1030,
TableInfo: &model.SimpleTableInfo{
Schema: "test",
Table: "t1",
},
Type: timodel.ActionAddTablePartition,
Query: "ALTER TABLE test.t1 ADD PARTITION (PARTITION `p20230120` VALUES LESS THAN '2023-01-21')",
}
err = sink.EmitDDLEvent(ctx, ddl1)
require.Nil(t, err)
err = sink.EmitDDLEvent(ctx, ddl2)
Expand All @@ -1701,6 +1720,9 @@ func TestNewMySQLSinkExecDDL(t *testing.T) {
err = sink.EmitDDLEvent(ctx, ddl1)
require.Nil(t, err)

err = sink.EmitDDLEvent(ctx, ddl3)
require.True(t, cerror.IsChangefeedUnRetryableError(err))

err = sink.Close(ctx)
require.Nil(t, err)
}
Expand Down
43 changes: 43 additions & 0 deletions pkg/errorutil/ignore.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,9 @@ import (
"github.com/pingcap/tidb/infoschema"
"github.com/pingcap/tidb/parser/mysql"
"github.com/pingcap/tidb/util/dbterror"
"github.com/pingcap/tidb/util/dbutil"
dmretry "github.com/pingcap/tiflow/dm/pkg/retry"
cerror "github.com/pingcap/tiflow/pkg/errors"
v3rpc "go.etcd.io/etcd/api/v3/v3rpc/rpctypes"
)

Expand Down Expand Up @@ -70,3 +73,43 @@ func IsRetryableEtcdError(err error) bool {
return false
}
}

// IsRetryableDMLError check if the error is a retryable dml error.
func IsRetryableDMLError(err error) bool {
if !cerror.IsRetryableError(err) {
return false
}
// Check if the error is connection errors that can retry safely.
if dmretry.IsConnectionError(err) {
return true
}
// Check if the error is a retriable TiDB error or MySQL error.
return dbutil.IsRetryableError(err)
}

// IsRetryableDDLError check if the error is a retryable ddl error.
func IsRetryableDDLError(err error) bool {
if IsRetryableDMLError(err) {
return true
}
err = errors.Cause(err)
mysqlErr, ok := err.(*dmysql.MySQLError)
if !ok {
return false
}
// If the error is in the black list, return false.
switch mysqlErr.Number {
case mysql.ErrAccessDenied,
mysql.ErrDBaccessDenied,
mysql.ErrSyntax,
mysql.ErrParse,
mysql.ErrNoDB,
mysql.ErrNoSuchTable,
mysql.ErrNoSuchIndex,
mysql.ErrKeyColumnDoesNotExits,
mysql.ErrWrongColumnName,
mysql.ErrPartitionMgmtOnNonpartitioned:
return false
}
return true
}

0 comments on commit 34aaa8b

Please sign in to comment.