Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

error,sink (ticdc): add mysql error black list #8090

Merged
merged 3 commits into from
Jan 17, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 5 additions & 2 deletions cdc/sink/mysql/mysql.go
Original file line number Diff line number Diff line change
Expand Up @@ -328,6 +328,9 @@ func (s *mysqlSink) EmitDDLEvent(ctx context.Context, ddl *model.DDLEvent) error
}
s.statistics.AddDDLCount()
err := s.execDDLWithMaxRetries(ctx, ddl)
if !errorutil.IsRetryableDDLError(err) {
return cerror.WrapChangefeedUnretryableErr(err)
}
return errors.Trace(err)
}

Expand All @@ -338,14 +341,14 @@ func (s *mysqlSink) execDDLWithMaxRetries(ctx context.Context, ddl *model.DDLEve
log.Info("execute DDL failed, but error can be ignored", zap.String("query", ddl.Query), zap.Error(err))
return nil
}
if err != nil {
if err != nil && errorutil.IsRetryableDDLError(err) {
log.Warn("execute DDL with error, retry later", zap.String("query", ddl.Query), zap.Error(err))
}
return err
}, retry.WithBackoffBaseDelay(backoffBaseDelayInMs),
retry.WithBackoffMaxDelay(backoffMaxDelayInMs),
retry.WithMaxTries(defaultDDLMaxRetry),
retry.WithIsRetryableErr(cerror.IsRetryableError))
retry.WithIsRetryableErr(errorutil.IsRetryableDDLError))
}

func (s *mysqlSink) execDDL(pctx context.Context, ddl *model.DDLEvent) error {
Expand Down
24 changes: 23 additions & 1 deletion cdc/sink/mysql/mysql_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1642,11 +1642,21 @@ func TestNewMySQLSinkExecDDL(t *testing.T) {
mock.ExpectCommit()
mock.ExpectBegin()
mock.ExpectExec("USE `test`;").WillReturnResult(sqlmock.NewResult(1, 1))

mock.ExpectExec("ALTER TABLE test.t1 ADD COLUMN a int").
WillReturnError(&dmysql.MySQLError{
Number: uint16(infoschema.ErrColumnExists.Code()),
})
mock.ExpectRollback()

mock.ExpectBegin()
mock.ExpectExec("USE `test`;").WillReturnResult(sqlmock.NewResult(1, 1))
mock.ExpectExec("ALTER TABLE test.t1 ADD PARTITION (PARTITION `p20230120` VALUES LESS THAN '2023-01-21')").
WillReturnError(&dmysql.MySQLError{
Number: mysql.ErrPartitionMgmtOnNonpartitioned,
})
mock.ExpectRollback()

mock.ExpectClose()
return db, nil
}
Expand Down Expand Up @@ -1692,7 +1702,16 @@ func TestNewMySQLSinkExecDDL(t *testing.T) {
Type: timodel.ActionAddColumn,
Query: "ALTER TABLE test.t1 ADD COLUMN a int",
}

ddl3 := &model.DDLEvent{
StartTs: 1020,
CommitTs: 1030,
TableInfo: &model.SimpleTableInfo{
Schema: "test",
Table: "t1",
},
Type: timodel.ActionAddTablePartition,
Query: "ALTER TABLE test.t1 ADD PARTITION (PARTITION `p20230120` VALUES LESS THAN '2023-01-21')",
}
err = sink.EmitDDLEvent(ctx, ddl1)
require.Nil(t, err)
err = sink.EmitDDLEvent(ctx, ddl2)
Expand All @@ -1701,6 +1720,9 @@ func TestNewMySQLSinkExecDDL(t *testing.T) {
err = sink.EmitDDLEvent(ctx, ddl1)
require.Nil(t, err)

err = sink.EmitDDLEvent(ctx, ddl3)
require.True(t, cerror.IsChangefeedUnRetryableError(err))

err = sink.Close(ctx)
require.Nil(t, err)
}
Expand Down
43 changes: 43 additions & 0 deletions pkg/errorutil/ignore.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,9 @@ import (
"github.com/pingcap/tidb/infoschema"
"github.com/pingcap/tidb/parser/mysql"
"github.com/pingcap/tidb/util/dbterror"
"github.com/pingcap/tidb/util/dbutil"
dmretry "github.com/pingcap/tiflow/dm/pkg/retry"
cerror "github.com/pingcap/tiflow/pkg/errors"
v3rpc "go.etcd.io/etcd/api/v3/v3rpc/rpctypes"
)

Expand Down Expand Up @@ -70,3 +73,43 @@ func IsRetryableEtcdError(err error) bool {
return false
}
}

// IsRetryableDMLError check if the error is a retryable dml error.
func IsRetryableDMLError(err error) bool {
if !cerror.IsRetryableError(err) {
return false
}
// Check if the error is connection errors that can retry safely.
if dmretry.IsConnectionError(err) {
return true
}
// Check if the error is a retriable TiDB error or MySQL error.
return dbutil.IsRetryableError(err)
}

// IsRetryableDDLError check if the error is a retryable ddl error.
func IsRetryableDDLError(err error) bool {
if IsRetryableDMLError(err) {
return true
}
err = errors.Cause(err)
mysqlErr, ok := err.(*dmysql.MySQLError)
if !ok {
return false
}
// If the error is in the black list, return false.
switch mysqlErr.Number {
case mysql.ErrAccessDenied,
mysql.ErrDBaccessDenied,
mysql.ErrSyntax,
mysql.ErrParse,
mysql.ErrNoDB,
mysql.ErrNoSuchTable,
mysql.ErrNoSuchIndex,
mysql.ErrKeyColumnDoesNotExits,
mysql.ErrWrongColumnName,
mysql.ErrPartitionMgmtOnNonpartitioned:
return false
}
return true
}