Skip to content
This repository has been archived by the owner on Nov 24, 2023. It is now read-only.

syncer: fast fail on some error in ddl execution #317

Merged
merged 7 commits into from
Oct 17, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
25 changes: 5 additions & 20 deletions dm/worker/task_checker.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ import (
"github.com/pingcap/dm/dm/pb"
"github.com/pingcap/dm/pkg/backoff"
"github.com/pingcap/dm/pkg/log"
"github.com/pingcap/dm/pkg/retry"
"github.com/pingcap/dm/pkg/terror"
)

Expand Down Expand Up @@ -231,38 +232,22 @@ func (tsc *realTaskStatusChecker) run() {
// isResumableError checks the error message and returns whether we need to
// resume the task and retry
func isResumableError(err *pb.ProcessError) bool {
// not elegant code, because TiDB doesn't expose some error
unsupportedDDLMsgs := []string{
"can't drop column with index",
"unsupported add column",
"unsupported modify column",
"unsupported modify",
"unsupported drop integer primary key",
}
unsupportedDMLMsgs := []string{
"Error 1062: Duplicate entry",
"Error 1406: Data too long for column",
}
parseRelayLogErrMsg := []string{
"binlog checksum mismatch, data may be corrupted",
"get event err EOF",
}

switch err.Type {
case pb.ErrorType_ExecSQL:
for _, msg := range unsupportedDDLMsgs {
// not elegant code, because TiDB doesn't expose some error
for _, msg := range retry.UnsupportedDDLMsgs {
if err.Error != nil && strings.Contains(err.Error.RawCause, msg) {
return false
}
}
for _, msg := range unsupportedDMLMsgs {
for _, msg := range retry.UnsupportedDMLMsgs {
if err.Error != nil && strings.Contains(err.Error.RawCause, msg) {
return false
}
}
case pb.ErrorType_UnknownError:
if err.Error != nil && err.Error.ErrCode == int32(terror.ErrParserParseRelayLog.Code()) {
for _, msg := range parseRelayLogErrMsg {
for _, msg := range retry.ParseRelayLogErrMsgs {
if strings.Contains(err.Error.Message, msg) {
return false
}
Expand Down
41 changes: 41 additions & 0 deletions pkg/retry/errors.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,12 +14,38 @@
package retry

import (
"strings"

"github.com/go-sql-driver/mysql"
"github.com/pingcap/errors"
tmysql "github.com/pingcap/parser/mysql"
gmysql "github.com/siddontang/go-mysql/mysql"
)

var (
// UnsupportedDDLMsgs list the error messages of some unsupported DDL in TiDB
UnsupportedDDLMsgs = []string{
"can't drop column with index",
"unsupported add column",
"unsupported modify column",
"unsupported modify charset",
"unsupported modify collate",
"unsupported drop integer primary key",
}

// UnsupportedDMLMsgs list the error messages of some un-recoverable DML, which is used in task auto recovery
UnsupportedDMLMsgs = []string{
"Error 1062: Duplicate entry",
"Error 1406: Data too long for column",
}

// ParseRelayLogErrMsgs list the error messages of some un-recoverable relay log parsing error, which is used in task auto recovery.
ParseRelayLogErrMsgs = []string{
"binlog checksum mismatch, data may be corrupted",
"get event err EOF",
}
)

// IsRetryableError tells whether this error should retry
func IsRetryableError(err error) bool {
err = errors.Cause(err) // check the original error
Expand All @@ -35,3 +61,18 @@ func IsRetryableError(err error) bool {
}
return false
}

// IsRetryableErrorFastFailFilter tells whether this error should retry,
// filtering some incompatible DDL error to achieve fast fail.
func IsRetryableErrorFastFailFilter(err error) bool {
err2 := errors.Cause(err) // check the original error
if mysqlErr, ok := err2.(*mysql.MySQLError); ok && mysqlErr.Number == tmysql.ErrUnknown {
for _, msg := range UnsupportedDDLMsgs {
if strings.Contains(mysqlErr.Message, msg) {
return false
}
}
}

return IsRetryableError(err)
}
2 changes: 1 addition & 1 deletion syncer/db.go
Original file line number Diff line number Diff line change
Expand Up @@ -168,7 +168,7 @@ func (conn *Conn) executeSQLWithIgnore(tctx *tcontext.Context, ignoreError func(
FirstRetryDuration: retryTimeout,
BackoffStrategy: retry.Stable,
IsRetryableFn: func(retryTime int, err error) bool {
if retry.IsRetryableError(err) {
if retry.IsRetryableErrorFastFailFilter(err) {
csuzhangxc marked this conversation as resolved.
Show resolved Hide resolved
tctx.L().Warn("execute statements", zap.Int("retry", retryTime),
zap.String("queries", utils.TruncateInterface(queries, -1)),
zap.String("arguments", utils.TruncateInterface(args, -1)))
Expand Down