From 8510d3ad044c218229081d626855fc0220cfb22d Mon Sep 17 00:00:00 2001 From: amyangfei Date: Thu, 17 Oct 2019 11:46:22 +0800 Subject: [PATCH 1/4] syncer: fast fail on some error in ddl execution --- dm/worker/task_checker.go | 24 +++++------------------ pkg/retry/errors.go | 40 +++++++++++++++++++++++++++++++++++++++ syncer/db.go | 2 +- 3 files changed, 46 insertions(+), 20 deletions(-) diff --git a/dm/worker/task_checker.go b/dm/worker/task_checker.go index 462fc2e645..f2420c57a2 100644 --- a/dm/worker/task_checker.go +++ b/dm/worker/task_checker.go @@ -27,6 +27,7 @@ import ( "github.com/pingcap/dm/dm/pb" "github.com/pingcap/dm/pkg/backoff" "github.com/pingcap/dm/pkg/log" + "github.com/pingcap/dm/pkg/retry" "github.com/pingcap/dm/pkg/terror" ) @@ -231,32 +232,17 @@ func (tsc *realTaskStatusChecker) run() { // isResumableError checks the error message and returns whether we need to // resume the task and retry func isResumableError(err *pb.ProcessError) bool { - // not elegant code, because TiDB doesn't expose some error - unsupportedDDLMsgs := []string{ - "can't drop column with index", - "unsupported add column", - "unsupported modify column", - "unsupported modify", - "unsupported drop integer primary key", - } - unsupportedDMLMsgs := []string{ - "Error 1062: Duplicate entry", - "Error 1406: Data too long for column", - } - parseRelayLogErrMsg := []string{ - "binlog checksum mismatch, data may be corrupted", - "get event err EOF", - } parseRelayLogCode := fmt.Sprintf("code=%d", terror.ErrParserParseRelayLog.Code()) switch err.Type { case pb.ErrorType_ExecSQL: - for _, msg := range unsupportedDDLMsgs { + // not elegant code, because TiDB doesn't expose some error + for _, msg := range retry.UnsupportedDDLMsgs { if strings.Contains(err.Msg, msg) { return false } } - for _, msg := range unsupportedDMLMsgs { + for _, msg := range retry.UnsupportedDMLMsgs { if strings.Contains(err.Msg, msg) { return false } @@ -264,7 +250,7 @@ func isResumableError(err *pb.ProcessError) bool { case pb.ErrorType_UnknownError: // TODO: we need better mechanism to convert error in `ProcessError` to `terror.Error` if strings.Contains(err.Msg, parseRelayLogCode) { - for _, msg := range parseRelayLogErrMsg { + for _, msg := range retry.ParseRelayLogErrMsgs { if strings.Contains(err.Msg, msg) { return false } diff --git a/pkg/retry/errors.go b/pkg/retry/errors.go index fb2e10a2f8..5c2eaea8e1 100644 --- a/pkg/retry/errors.go +++ b/pkg/retry/errors.go @@ -14,12 +14,37 @@ package retry import ( + "strings" + "github.com/go-sql-driver/mysql" "github.com/pingcap/errors" tmysql "github.com/pingcap/parser/mysql" gmysql "github.com/siddontang/go-mysql/mysql" ) +var ( + // UnsupportedDDLMsgs list the error messages of some unsupported DDL in TiDB + UnsupportedDDLMsgs = []string{ + "can't drop column with index", + "unsupported add column", + "unsupported modify column", + "unsupported modify", + "unsupported drop integer primary key", + } + + // UnsupportedDMLMsgs list the error messages of some un-recoverable DML, which is used in task auto recovery + UnsupportedDMLMsgs = []string{ + "Error 1062: Duplicate entry", + "Error 1406: Data too long for column", + } + + // ParseRelayLogErrMsgs list the error messages of some un-recoverable relay log parsing error, which is used in task auto recovery. + ParseRelayLogErrMsgs = []string{ + "binlog checksum mismatch, data may be corrupted", + "get event err EOF", + } +) + // IsRetryableError tells whether this error should retry func IsRetryableError(err error) bool { err = errors.Cause(err) // check the original error @@ -35,3 +60,18 @@ func IsRetryableError(err error) bool { } return false } + +// IsRetryableErrorFastFailFilter tells whether this error should retry, +// filtering some incompatible DDL error to achieve fast fail. +func IsRetryableErrorFastFailFilter(err error) bool { + err2 := errors.Cause(err) // check the original error + if mysqlErr, ok := err2.(*mysql.MySQLError); ok && mysqlErr.Number == tmysql.ErrUnknown { + for _, msg := range UnsupportedDDLMsgs { + if strings.Contains(mysqlErr.Message, msg) { + return false + } + } + } + + return IsRetryableError(err) +} diff --git a/syncer/db.go b/syncer/db.go index 6e2114080c..4146f5f9bf 100644 --- a/syncer/db.go +++ b/syncer/db.go @@ -159,7 +159,7 @@ func (conn *Conn) executeSQLWithIgnore(tctx *tcontext.Context, ignoreError func( FirstRetryDuration: retryTimeout, BackoffStrategy: retry.Stable, IsRetryableFn: func(retryTime int, err error) bool { - if retry.IsRetryableError(err) { + if retry.IsRetryableErrorFastFailFilter(err) { tctx.L().Warn("execute statements", zap.Int("retry", retryTime), zap.String("queries", utils.TruncateInterface(queries, -1)), zap.String("arguments", utils.TruncateInterface(args, -1))) From 61d5be612e84aec7ada061a736083845a13bffd7 Mon Sep 17 00:00:00 2001 From: amyangfei Date: Thu, 17 Oct 2019 14:11:49 +0800 Subject: [PATCH 2/4] address comment --- pkg/retry/errors.go | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/pkg/retry/errors.go b/pkg/retry/errors.go index 5c2eaea8e1..ce8464652b 100644 --- a/pkg/retry/errors.go +++ b/pkg/retry/errors.go @@ -28,7 +28,8 @@ var ( "can't drop column with index", "unsupported add column", "unsupported modify column", - "unsupported modify", + "unsupported modify charset", + "unsupported modify collate", "unsupported drop integer primary key", } From d89c96aabc6a6454aed670209ea7d944a32a7723 Mon Sep 17 00:00:00 2001 From: amyangfei Date: Thu, 17 Oct 2019 15:10:02 +0800 Subject: [PATCH 3/4] fix merge conflict --- dm/worker/task_checker.go | 2 -- 1 file changed, 2 deletions(-) diff --git a/dm/worker/task_checker.go b/dm/worker/task_checker.go index 0c93ff0cb9..ec4b23c65e 100644 --- a/dm/worker/task_checker.go +++ b/dm/worker/task_checker.go @@ -232,8 +232,6 @@ func (tsc *realTaskStatusChecker) run() { // isResumableError checks the error message and returns whether we need to // resume the task and retry func isResumableError(err *pb.ProcessError) bool { - parseRelayLogCode := fmt.Sprintf("code=%d", terror.ErrParserParseRelayLog.Code()) - switch err.Type { case pb.ErrorType_ExecSQL: // not elegant code, because TiDB doesn't expose some error From 96619a822dbd47ec954c889717c3c1395077b87e Mon Sep 17 00:00:00 2001 From: amyangfei Date: Thu, 17 Oct 2019 15:12:35 +0800 Subject: [PATCH 4/4] fix merge error --- dm/worker/task_checker.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/dm/worker/task_checker.go b/dm/worker/task_checker.go index ec4b23c65e..c7187f000e 100644 --- a/dm/worker/task_checker.go +++ b/dm/worker/task_checker.go @@ -247,7 +247,7 @@ func isResumableError(err *pb.ProcessError) bool { } case pb.ErrorType_UnknownError: if err.Error != nil && err.Error.ErrCode == int32(terror.ErrParserParseRelayLog.Code()) { - for _, msg := range parseRelayLogErrMsg { + for _, msg := range retry.ParseRelayLogErrMsgs { if strings.Contains(err.Error.Message, msg) { return false }