From eeaf582d88d27bd7de441d0f624e94e7fed69254 Mon Sep 17 00:00:00 2001 From: Chunzhu Li Date: Mon, 11 Nov 2019 17:08:50 +0800 Subject: [PATCH 1/9] initial commit --- pkg/conn/baseconn.go | 3 +++ pkg/utils/util.go | 7 +++++++ syncer/db.go | 10 ++++++---- syncer/syncer.go | 10 ++++++++-- syncer/warning.go | 12 +++++++----- 5 files changed, 31 insertions(+), 11 deletions(-) diff --git a/pkg/conn/baseconn.go b/pkg/conn/baseconn.go index d9f9ac0e10..b8677420ec 100644 --- a/pkg/conn/baseconn.go +++ b/pkg/conn/baseconn.go @@ -143,6 +143,9 @@ func (conn *BaseConn) ExecuteSQLWithIgnoreError(tctx *tcontext.Context, ignoreEr log.ShortError(err)) continue } + if utils.IsContextCanceledError(err) { + return i, terror.ErrDBExecuteFailed.Delegate(err, utils.TruncateString(query, -1)) + } tctx.L().Error("execute statement failed", zap.String("query", utils.TruncateString(query, -1)), diff --git a/pkg/utils/util.go b/pkg/utils/util.go index 59fe2b56bb..507addf1c3 100644 --- a/pkg/utils/util.go +++ b/pkg/utils/util.go @@ -14,12 +14,14 @@ package utils import ( + "context" "math" "os" "strconv" "strings" "time" + "github.com/pingcap/errors" "github.com/siddontang/go-mysql/mysql" "github.com/pingcap/dm/pkg/terror" @@ -90,3 +92,8 @@ func WaitSomething(backoff int, waitTime time.Duration, fn func() bool) bool { return false } + +// IsContextCanceledError checks whether err is context.Canceled +func IsContextCanceledError(err error) bool { + return errors.Cause(err) == context.Canceled +} diff --git a/syncer/db.go b/syncer/db.go index 7031366913..d61e7bec45 100644 --- a/syncer/db.go +++ b/syncer/db.go @@ -266,10 +266,12 @@ func (conn *DBConn) executeSQLWithIgnore(tctx *tcontext.Context, ignoreError fun }) if err != nil { - tctx.L().Error("execute statements failed after retry", - zap.String("queries", utils.TruncateInterface(queries, -1)), - zap.String("arguments", utils.TruncateInterface(args, -1)), - log.ShortError(err)) + if !utils.IsContextCanceledError(err) { + tctx.L().Error("execute statements failed after retry", + zap.String("queries", utils.TruncateInterface(queries, -1)), + zap.String("arguments", utils.TruncateInterface(args, -1)), + log.ShortError(err)) + } return ret.(int), err } return ret.(int), nil diff --git a/syncer/syncer.go b/syncer/syncer.go index 3b1b328e66..275dded172 100644 --- a/syncer/syncer.go +++ b/syncer/syncer.go @@ -568,7 +568,9 @@ func (s *Syncer) Process(ctx context.Context, pr chan pb.ProcessResult) { } cancel() // cancel s.Run syncerExitWithErrorCounter.WithLabelValues(s.cfg.Name).Inc() - errs = append(errs, err) + if err != nil { + errs = append(errs, err) + } } }() @@ -917,7 +919,11 @@ func (s *Syncer) syncDDL(ctx *tcontext.Context, queueBucket string, db *DBConn, s.jobWg.Done() if err != nil { s.execErrorDetected.Set(true) - s.runFatalChan <- unit.NewProcessError(pb.ErrorType_ExecSQL, err) + if utils.IsContextCanceledError(err) { + s.runFatalChan <- nil + } else { + s.runFatalChan <- unit.NewProcessError(pb.ErrorType_ExecSQL, err) + } continue } s.addCount(true, queueBucket, sqlJob.tp, int64(len(sqlJob.ddls))) diff --git a/syncer/warning.go b/syncer/warning.go index 484a2fa952..308bffc855 100644 --- a/syncer/warning.go +++ b/syncer/warning.go @@ -41,11 +41,13 @@ func (s *Syncer) Error() interface{} { errors := make([]*pb.SyncSQLError, 0, len(s.execErrors.errors)) for _, ctx := range s.execErrors.errors { - errors = append(errors, &pb.SyncSQLError{ - Msg: ctx.err.Error(), - FailedBinlogPosition: fmt.Sprintf("%s:%d", ctx.pos.Name, ctx.pos.Pos), - ErrorSQL: ctx.jobs, - }) + if utils.IsContextCanceledError(ctx.err) { + errors = append(errors, &pb.SyncSQLError{ + Msg: ctx.err.Error(), + FailedBinlogPosition: fmt.Sprintf("%s:%d", ctx.pos.Name, ctx.pos.Pos), + ErrorSQL: ctx.jobs, + }) + } } return &pb.SyncError{Errors: errors} From 80c1abfb72a7a33acdcf3b6d27abbde75ebd0146 Mon Sep 17 00:00:00 2001 From: Chunzhu Li Date: Thu, 14 Nov 2019 11:50:46 +0800 Subject: [PATCH 2/9] add filter in loader --- loader/db.go | 20 ++++++++++++-------- loader/loader.go | 12 +++++++++--- pkg/conn/baseconn.go | 10 ++++++---- syncer/db.go | 10 ++++++---- syncer/warning.go | 2 +- 5 files changed, 34 insertions(+), 20 deletions(-) diff --git a/loader/db.go b/loader/db.go index 989b457ca1..a0f1907cb1 100644 --- a/loader/db.go +++ b/loader/db.go @@ -95,10 +95,12 @@ func (conn *DBConn) querySQL(ctx *tcontext.Context, query string, args ...interf return ret, err }) if err != nil { - ctx.L().Error("query statement failed after retry", - zap.String("query", utils.TruncateString(query, -1)), - zap.String("argument", utils.TruncateInterface(args, -1)), - log.ShortError(err)) + if !utils.IsContextCanceledError(err) { + ctx.L().Error("query statement failed after retry", + zap.String("query", utils.TruncateString(query, -1)), + zap.String("argument", utils.TruncateInterface(args, -1)), + log.ShortError(err)) + } return nil, err } return ret.(*sql.Rows), nil @@ -169,10 +171,12 @@ func (conn *DBConn) executeSQL(ctx *tcontext.Context, queries []string, args ... }) if err != nil { - ctx.L().Error("execute statements failed after retry", - zap.String("queries", utils.TruncateInterface(queries, -1)), - zap.String("arguments", utils.TruncateInterface(args, -1)), - log.ShortError(err)) + if !utils.IsContextCanceledError(err) { + ctx.L().Error("execute statements failed after retry", + zap.String("queries", utils.TruncateInterface(queries, -1)), + zap.String("arguments", utils.TruncateInterface(args, -1)), + log.ShortError(err)) + } } return err diff --git a/loader/loader.go b/loader/loader.go index c82298ee33..56407d370e 100644 --- a/loader/loader.go +++ b/loader/loader.go @@ -39,7 +39,7 @@ import ( "github.com/pingcap/failpoint" cm "github.com/pingcap/tidb-tools/pkg/column-mapping" "github.com/pingcap/tidb-tools/pkg/filter" - "github.com/pingcap/tidb-tools/pkg/table-router" + router "github.com/pingcap/tidb-tools/pkg/table-router" "github.com/siddontang/go/sync2" "go.uber.org/zap" ) @@ -152,7 +152,11 @@ func (w *Worker) run(ctx context.Context, fileJobQueue chan *fileJob, workerWg * if err := w.conn.executeSQL(ctctx, sqls); err != nil { // expect pause rather than exit err = terror.WithScope(terror.Annotatef(err, "file %s", job.file), terror.ScopeDownstream) - runFatalChan <- unit.NewProcessError(pb.ErrorType_ExecSQL, err) + if utils.IsContextCanceledError(err) { + runFatalChan <- nil + } else { + runFatalChan <- unit.NewProcessError(pb.ErrorType_ExecSQL, err) + } return } w.loader.finishedDataSize.Add(job.offset - job.lastOffset) @@ -443,7 +447,9 @@ func (l *Loader) Process(ctx context.Context, pr chan pb.ProcessResult) { defer wg.Done() for err := range l.runFatalChan { cancel() // cancel l.Restore - errs = append(errs, err) + if err != nil { + errs = append(errs, err) + } } }() diff --git a/pkg/conn/baseconn.go b/pkg/conn/baseconn.go index b8677420ec..858f17f135 100644 --- a/pkg/conn/baseconn.go +++ b/pkg/conn/baseconn.go @@ -95,10 +95,12 @@ func (conn *BaseConn) QuerySQL(tctx *tcontext.Context, query string, args ...int rows, err := conn.DBConn.QueryContext(tctx.Context(), query, args...) if err != nil { - tctx.L().Error("query statement failed", - zap.String("query", utils.TruncateString(query, -1)), - zap.String("argument", utils.TruncateInterface(args, -1)), - log.ShortError(err)) + if !utils.IsContextCanceledError(err) { + tctx.L().Error("query statement failed", + zap.String("query", utils.TruncateString(query, -1)), + zap.String("argument", utils.TruncateInterface(args, -1)), + log.ShortError(err)) + } return nil, terror.ErrDBQueryFailed.Delegate(err, utils.TruncateString(query, -1)) } return rows, nil diff --git a/syncer/db.go b/syncer/db.go index d61e7bec45..c2430314c4 100644 --- a/syncer/db.go +++ b/syncer/db.go @@ -203,10 +203,12 @@ func (conn *DBConn) querySQL(tctx *tcontext.Context, query string, args ...inter ) if err != nil { - tctx.L().Error("query statement failed after retry", - zap.String("query", utils.TruncateString(query, -1)), - zap.String("argument", utils.TruncateInterface(args, -1)), - log.ShortError(err)) + if !utils.IsContextCanceledError(err) { + tctx.L().Error("query statement failed after retry", + zap.String("query", utils.TruncateString(query, -1)), + zap.String("argument", utils.TruncateInterface(args, -1)), + log.ShortError(err)) + } return nil, err } return ret.(*sql.Rows), nil diff --git a/syncer/warning.go b/syncer/warning.go index 308bffc855..4a95ee1d81 100644 --- a/syncer/warning.go +++ b/syncer/warning.go @@ -41,7 +41,7 @@ func (s *Syncer) Error() interface{} { errors := make([]*pb.SyncSQLError, 0, len(s.execErrors.errors)) for _, ctx := range s.execErrors.errors { - if utils.IsContextCanceledError(ctx.err) { + if !utils.IsContextCanceledError(ctx.err) { errors = append(errors, &pb.SyncSQLError{ Msg: ctx.err.Error(), FailedBinlogPosition: fmt.Sprintf("%s:%d", ctx.pos.Name, ctx.pos.Pos), From e4ffa212947c8a47949d4b21edb15b50ff4bff26 Mon Sep 17 00:00:00 2001 From: Chunzhu Li Date: Mon, 25 Nov 2019 13:00:44 +0800 Subject: [PATCH 3/9] add filter to logger --- loader/db.go | 20 ++++++++------------ pkg/conn/baseconn.go | 13 ++++--------- syncer/db.go | 20 ++++++++------------ 3 files changed, 20 insertions(+), 33 deletions(-) diff --git a/loader/db.go b/loader/db.go index a0f1907cb1..989b457ca1 100644 --- a/loader/db.go +++ b/loader/db.go @@ -95,12 +95,10 @@ func (conn *DBConn) querySQL(ctx *tcontext.Context, query string, args ...interf return ret, err }) if err != nil { - if !utils.IsContextCanceledError(err) { - ctx.L().Error("query statement failed after retry", - zap.String("query", utils.TruncateString(query, -1)), - zap.String("argument", utils.TruncateInterface(args, -1)), - log.ShortError(err)) - } + ctx.L().Error("query statement failed after retry", + zap.String("query", utils.TruncateString(query, -1)), + zap.String("argument", utils.TruncateInterface(args, -1)), + log.ShortError(err)) return nil, err } return ret.(*sql.Rows), nil @@ -171,12 +169,10 @@ func (conn *DBConn) executeSQL(ctx *tcontext.Context, queries []string, args ... }) if err != nil { - if !utils.IsContextCanceledError(err) { - ctx.L().Error("execute statements failed after retry", - zap.String("queries", utils.TruncateInterface(queries, -1)), - zap.String("arguments", utils.TruncateInterface(args, -1)), - log.ShortError(err)) - } + ctx.L().Error("execute statements failed after retry", + zap.String("queries", utils.TruncateInterface(queries, -1)), + zap.String("arguments", utils.TruncateInterface(args, -1)), + log.ShortError(err)) } return err diff --git a/pkg/conn/baseconn.go b/pkg/conn/baseconn.go index 858f17f135..d9f9ac0e10 100644 --- a/pkg/conn/baseconn.go +++ b/pkg/conn/baseconn.go @@ -95,12 +95,10 @@ func (conn *BaseConn) QuerySQL(tctx *tcontext.Context, query string, args ...int rows, err := conn.DBConn.QueryContext(tctx.Context(), query, args...) if err != nil { - if !utils.IsContextCanceledError(err) { - tctx.L().Error("query statement failed", - zap.String("query", utils.TruncateString(query, -1)), - zap.String("argument", utils.TruncateInterface(args, -1)), - log.ShortError(err)) - } + tctx.L().Error("query statement failed", + zap.String("query", utils.TruncateString(query, -1)), + zap.String("argument", utils.TruncateInterface(args, -1)), + log.ShortError(err)) return nil, terror.ErrDBQueryFailed.Delegate(err, utils.TruncateString(query, -1)) } return rows, nil @@ -145,9 +143,6 @@ func (conn *BaseConn) ExecuteSQLWithIgnoreError(tctx *tcontext.Context, ignoreEr log.ShortError(err)) continue } - if utils.IsContextCanceledError(err) { - return i, terror.ErrDBExecuteFailed.Delegate(err, utils.TruncateString(query, -1)) - } tctx.L().Error("execute statement failed", zap.String("query", utils.TruncateString(query, -1)), diff --git a/syncer/db.go b/syncer/db.go index c2430314c4..7031366913 100644 --- a/syncer/db.go +++ b/syncer/db.go @@ -203,12 +203,10 @@ func (conn *DBConn) querySQL(tctx *tcontext.Context, query string, args ...inter ) if err != nil { - if !utils.IsContextCanceledError(err) { - tctx.L().Error("query statement failed after retry", - zap.String("query", utils.TruncateString(query, -1)), - zap.String("argument", utils.TruncateInterface(args, -1)), - log.ShortError(err)) - } + tctx.L().Error("query statement failed after retry", + zap.String("query", utils.TruncateString(query, -1)), + zap.String("argument", utils.TruncateInterface(args, -1)), + log.ShortError(err)) return nil, err } return ret.(*sql.Rows), nil @@ -268,12 +266,10 @@ func (conn *DBConn) executeSQLWithIgnore(tctx *tcontext.Context, ignoreError fun }) if err != nil { - if !utils.IsContextCanceledError(err) { - tctx.L().Error("execute statements failed after retry", - zap.String("queries", utils.TruncateInterface(queries, -1)), - zap.String("arguments", utils.TruncateInterface(args, -1)), - log.ShortError(err)) - } + tctx.L().Error("execute statements failed after retry", + zap.String("queries", utils.TruncateInterface(queries, -1)), + zap.String("arguments", utils.TruncateInterface(args, -1)), + log.ShortError(err)) return ret.(int), err } return ret.(int), nil From 40e2155af0fe830e0e9efa811f477d01d1ce538f Mon Sep 17 00:00:00 2001 From: Chunzhu Li Date: Mon, 25 Nov 2019 14:37:47 +0800 Subject: [PATCH 4/9] skip cancel error in context --- pkg/log/log.go | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/pkg/log/log.go b/pkg/log/log.go index 3a3e8c722a..45c77668e4 100644 --- a/pkg/log/log.go +++ b/pkg/log/log.go @@ -23,6 +23,7 @@ import ( "github.com/pingcap/dm/pkg/helper" "github.com/pingcap/dm/pkg/terror" + "github.com/pingcap/dm/pkg/utils" ) const ( @@ -130,7 +131,7 @@ func SetLevel(level zapcore.Level) zapcore.Level { // just repeats known information. You should almost always use `ShortError` // instead of `zap.Error`, unless the error is no longer propagated upwards. func ShortError(err error) zap.Field { - if err == nil { + if err == nil || utils.IsContextCanceledError(err) { return zap.Skip() } return zap.String("error", err.Error()) From 8622f3d6b080b2948047f46f2597994982250588 Mon Sep 17 00:00:00 2001 From: Chunzhu Li Date: Mon, 25 Nov 2019 14:42:49 +0800 Subject: [PATCH 5/9] fix error --- pkg/log/log.go | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/pkg/log/log.go b/pkg/log/log.go index 45c77668e4..15b0c62d62 100644 --- a/pkg/log/log.go +++ b/pkg/log/log.go @@ -14,8 +14,10 @@ package log import ( + "context" "fmt" + "github.com/pingcap/errors" pclog "github.com/pingcap/log" "github.com/pingcap/tidb/util/logutil" "go.uber.org/zap" @@ -23,7 +25,6 @@ import ( "github.com/pingcap/dm/pkg/helper" "github.com/pingcap/dm/pkg/terror" - "github.com/pingcap/dm/pkg/utils" ) const ( @@ -131,7 +132,7 @@ func SetLevel(level zapcore.Level) zapcore.Level { // just repeats known information. You should almost always use `ShortError` // instead of `zap.Error`, unless the error is no longer propagated upwards. func ShortError(err error) zap.Field { - if err == nil || utils.IsContextCanceledError(err) { + if err == nil || errors.Cause(err) == context.Canceled { return zap.Skip() } return zap.String("error", err.Error()) From b30b083bc77459a555f427e6fadf20234eb7adb8 Mon Sep 17 00:00:00 2001 From: Chunzhu Li Date: Wed, 27 Nov 2019 15:08:36 +0800 Subject: [PATCH 6/9] remove adding nil error to runFatalChan --- loader/loader.go | 8 ++------ syncer/syncer.go | 8 ++------ 2 files changed, 4 insertions(+), 12 deletions(-) diff --git a/loader/loader.go b/loader/loader.go index 905a81bd04..a2167f4fcb 100644 --- a/loader/loader.go +++ b/loader/loader.go @@ -174,9 +174,7 @@ func (w *Worker) run(ctx context.Context, fileJobQueue chan *fileJob, runFatalCh if err != nil { // expect pause rather than exit err = terror.WithScope(terror.Annotatef(err, "file %s", job.file), terror.ScopeDownstream) - if utils.IsContextCanceledError(err) { - runFatalChan <- nil - } else { + if !utils.IsContextCanceledError(err) { runFatalChan <- unit.NewProcessError(pb.ErrorType_ExecSQL, err) } return @@ -479,9 +477,7 @@ func (l *Loader) Process(ctx context.Context, pr chan pb.ProcessResult) { defer wg.Done() for err := range l.runFatalChan { cancel() // cancel l.Restore - if err != nil { - errs = append(errs, err) - } + errs = append(errs, err) } }() diff --git a/syncer/syncer.go b/syncer/syncer.go index bfa7cd5e6d..b8c3c28e6b 100644 --- a/syncer/syncer.go +++ b/syncer/syncer.go @@ -572,9 +572,7 @@ func (s *Syncer) Process(ctx context.Context, pr chan pb.ProcessResult) { } cancel() // cancel s.Run syncerExitWithErrorCounter.WithLabelValues(s.cfg.Name).Inc() - if err != nil { - errs = append(errs, err) - } + errs = append(errs, err) } }() @@ -923,9 +921,7 @@ func (s *Syncer) syncDDL(ctx *tcontext.Context, queueBucket string, db *DBConn, s.jobWg.Done() if err != nil { s.execErrorDetected.Set(true) - if utils.IsContextCanceledError(err) { - s.runFatalChan <- nil - } else { + if !utils.IsContextCanceledError(err) { s.runFatalChan <- unit.NewProcessError(pb.ErrorType_ExecSQL, err) } continue From 67c12d4bc6c068125b7757c462bed18cd407b88c Mon Sep 17 00:00:00 2001 From: Chunzhu Li Date: Thu, 28 Nov 2019 10:40:49 +0800 Subject: [PATCH 7/9] add filter errors --- loader/db.go | 4 ++-- pkg/conn/baseconn.go | 6 +++--- pkg/log/log.go | 8 ++++++++ syncer/db.go | 4 ++-- 4 files changed, 15 insertions(+), 7 deletions(-) diff --git a/loader/db.go b/loader/db.go index 989b457ca1..a8a7cb7db2 100644 --- a/loader/db.go +++ b/loader/db.go @@ -98,7 +98,7 @@ func (conn *DBConn) querySQL(ctx *tcontext.Context, query string, args ...interf ctx.L().Error("query statement failed after retry", zap.String("query", utils.TruncateString(query, -1)), zap.String("argument", utils.TruncateInterface(args, -1)), - log.ShortError(err)) + log.FilterError(err)) return nil, err } return ret.(*sql.Rows), nil @@ -172,7 +172,7 @@ func (conn *DBConn) executeSQL(ctx *tcontext.Context, queries []string, args ... ctx.L().Error("execute statements failed after retry", zap.String("queries", utils.TruncateInterface(queries, -1)), zap.String("arguments", utils.TruncateInterface(args, -1)), - log.ShortError(err)) + log.FilterError(err)) } return err diff --git a/pkg/conn/baseconn.go b/pkg/conn/baseconn.go index d9f9ac0e10..789c8a034d 100644 --- a/pkg/conn/baseconn.go +++ b/pkg/conn/baseconn.go @@ -98,7 +98,7 @@ func (conn *BaseConn) QuerySQL(tctx *tcontext.Context, query string, args ...int tctx.L().Error("query statement failed", zap.String("query", utils.TruncateString(query, -1)), zap.String("argument", utils.TruncateInterface(args, -1)), - log.ShortError(err)) + log.FilterError(err)) return nil, terror.ErrDBQueryFailed.Delegate(err, utils.TruncateString(query, -1)) } return rows, nil @@ -146,14 +146,14 @@ func (conn *BaseConn) ExecuteSQLWithIgnoreError(tctx *tcontext.Context, ignoreEr tctx.L().Error("execute statement failed", zap.String("query", utils.TruncateString(query, -1)), - zap.String("argument", utils.TruncateInterface(arg, -1)), log.ShortError(err)) + zap.String("argument", utils.TruncateInterface(arg, -1)), log.FilterError(err)) rerr := txn.Rollback() if rerr != nil { tctx.L().Error("rollback failed", zap.String("query", utils.TruncateString(query, -1)), zap.String("argument", utils.TruncateInterface(arg, -1)), - log.ShortError(rerr)) + log.FilterError(rerr)) } // we should return the exec err, instead of the rollback rerr. return i, terror.ErrDBExecuteFailed.Delegate(err, utils.TruncateString(query, -1)) diff --git a/pkg/log/log.go b/pkg/log/log.go index 15b0c62d62..714648d543 100644 --- a/pkg/log/log.go +++ b/pkg/log/log.go @@ -132,6 +132,14 @@ func SetLevel(level zapcore.Level) zapcore.Level { // just repeats known information. You should almost always use `ShortError` // instead of `zap.Error`, unless the error is no longer propagated upwards. func ShortError(err error) zap.Field { + if err == nil { + return zap.Skip() + } + return zap.String("error", err.Error()) +} + +// FilterError is a variant of ShortError which will skip some specified errors. e.g.:context.Canceled +func FilterError(err error) zap.Field { if err == nil || errors.Cause(err) == context.Canceled { return zap.Skip() } diff --git a/syncer/db.go b/syncer/db.go index 7031366913..6ead4fede3 100644 --- a/syncer/db.go +++ b/syncer/db.go @@ -206,7 +206,7 @@ func (conn *DBConn) querySQL(tctx *tcontext.Context, query string, args ...inter tctx.L().Error("query statement failed after retry", zap.String("query", utils.TruncateString(query, -1)), zap.String("argument", utils.TruncateInterface(args, -1)), - log.ShortError(err)) + log.FilterError(err)) return nil, err } return ret.(*sql.Rows), nil @@ -269,7 +269,7 @@ func (conn *DBConn) executeSQLWithIgnore(tctx *tcontext.Context, ignoreError fun tctx.L().Error("execute statements failed after retry", zap.String("queries", utils.TruncateInterface(queries, -1)), zap.String("arguments", utils.TruncateInterface(args, -1)), - log.ShortError(err)) + log.FilterError(err)) return ret.(int), err } return ret.(int), nil From ca7ba3e283bf1cd1d404a1f0e10a458cbfc759f8 Mon Sep 17 00:00:00 2001 From: Chunzhu Li Date: Thu, 28 Nov 2019 11:02:33 +0800 Subject: [PATCH 8/9] add filterCancelError to filter cancel error --- loader/db.go | 4 ++-- pkg/conn/baseconn.go | 6 +++--- pkg/log/log.go | 10 +++++----- syncer/db.go | 4 ++-- 4 files changed, 12 insertions(+), 12 deletions(-) diff --git a/loader/db.go b/loader/db.go index a8a7cb7db2..8320b74df0 100644 --- a/loader/db.go +++ b/loader/db.go @@ -98,7 +98,7 @@ func (conn *DBConn) querySQL(ctx *tcontext.Context, query string, args ...interf ctx.L().Error("query statement failed after retry", zap.String("query", utils.TruncateString(query, -1)), zap.String("argument", utils.TruncateInterface(args, -1)), - log.FilterError(err)) + log.ShortError(log.FilterCancelError(err))) return nil, err } return ret.(*sql.Rows), nil @@ -172,7 +172,7 @@ func (conn *DBConn) executeSQL(ctx *tcontext.Context, queries []string, args ... ctx.L().Error("execute statements failed after retry", zap.String("queries", utils.TruncateInterface(queries, -1)), zap.String("arguments", utils.TruncateInterface(args, -1)), - log.FilterError(err)) + log.ShortError(log.FilterCancelError(err))) } return err diff --git a/pkg/conn/baseconn.go b/pkg/conn/baseconn.go index 789c8a034d..fc3217db1f 100644 --- a/pkg/conn/baseconn.go +++ b/pkg/conn/baseconn.go @@ -98,7 +98,7 @@ func (conn *BaseConn) QuerySQL(tctx *tcontext.Context, query string, args ...int tctx.L().Error("query statement failed", zap.String("query", utils.TruncateString(query, -1)), zap.String("argument", utils.TruncateInterface(args, -1)), - log.FilterError(err)) + log.ShortError(log.FilterCancelError(err))) return nil, terror.ErrDBQueryFailed.Delegate(err, utils.TruncateString(query, -1)) } return rows, nil @@ -146,14 +146,14 @@ func (conn *BaseConn) ExecuteSQLWithIgnoreError(tctx *tcontext.Context, ignoreEr tctx.L().Error("execute statement failed", zap.String("query", utils.TruncateString(query, -1)), - zap.String("argument", utils.TruncateInterface(arg, -1)), log.FilterError(err)) + zap.String("argument", utils.TruncateInterface(arg, -1)), log.ShortError(log.FilterCancelError(err))) rerr := txn.Rollback() if rerr != nil { tctx.L().Error("rollback failed", zap.String("query", utils.TruncateString(query, -1)), zap.String("argument", utils.TruncateInterface(arg, -1)), - log.FilterError(rerr)) + log.ShortError(rerr)) } // we should return the exec err, instead of the rollback rerr. return i, terror.ErrDBExecuteFailed.Delegate(err, utils.TruncateString(query, -1)) diff --git a/pkg/log/log.go b/pkg/log/log.go index 714648d543..f4aad8f8f9 100644 --- a/pkg/log/log.go +++ b/pkg/log/log.go @@ -138,12 +138,12 @@ func ShortError(err error) zap.Field { return zap.String("error", err.Error()) } -// FilterError is a variant of ShortError which will skip some specified errors. e.g.:context.Canceled -func FilterError(err error) zap.Field { - if err == nil || errors.Cause(err) == context.Canceled { - return zap.Skip() +// FilterCancelError will skip context.Canceled error +func FilterCancelError(err error) error { + if errors.Cause(err) == context.Canceled { + return nil } - return zap.String("error", err.Error()) + return err } // L returns the current logger for DM. diff --git a/syncer/db.go b/syncer/db.go index 6ead4fede3..a115cf0b50 100644 --- a/syncer/db.go +++ b/syncer/db.go @@ -206,7 +206,7 @@ func (conn *DBConn) querySQL(tctx *tcontext.Context, query string, args ...inter tctx.L().Error("query statement failed after retry", zap.String("query", utils.TruncateString(query, -1)), zap.String("argument", utils.TruncateInterface(args, -1)), - log.FilterError(err)) + log.ShortError(log.FilterCancelError(err))) return nil, err } return ret.(*sql.Rows), nil @@ -269,7 +269,7 @@ func (conn *DBConn) executeSQLWithIgnore(tctx *tcontext.Context, ignoreError fun tctx.L().Error("execute statements failed after retry", zap.String("queries", utils.TruncateInterface(queries, -1)), zap.String("arguments", utils.TruncateInterface(args, -1)), - log.FilterError(err)) + log.ShortError(log.FilterCancelError(err))) return ret.(int), err } return ret.(int), nil From f302f0b81947b517a617a3180d263cc441d0bf04 Mon Sep 17 00:00:00 2001 From: Chunzhu Li Date: Thu, 28 Nov 2019 14:17:02 +0800 Subject: [PATCH 9/9] add ErrorFilterContextCanceled to skip error log of context.Canceled --- loader/db.go | 8 ++++---- pkg/conn/baseconn.go | 8 ++++---- pkg/log/log.go | 19 ++++++++++--------- syncer/db.go | 8 ++++---- 4 files changed, 22 insertions(+), 21 deletions(-) diff --git a/loader/db.go b/loader/db.go index 8320b74df0..95ecca0037 100644 --- a/loader/db.go +++ b/loader/db.go @@ -95,10 +95,10 @@ func (conn *DBConn) querySQL(ctx *tcontext.Context, query string, args ...interf return ret, err }) if err != nil { - ctx.L().Error("query statement failed after retry", + ctx.L().ErrorFilterContextCanceled("query statement failed after retry", zap.String("query", utils.TruncateString(query, -1)), zap.String("argument", utils.TruncateInterface(args, -1)), - log.ShortError(log.FilterCancelError(err))) + log.ShortError(err)) return nil, err } return ret.(*sql.Rows), nil @@ -169,10 +169,10 @@ func (conn *DBConn) executeSQL(ctx *tcontext.Context, queries []string, args ... }) if err != nil { - ctx.L().Error("execute statements failed after retry", + ctx.L().ErrorFilterContextCanceled("execute statements failed after retry", zap.String("queries", utils.TruncateInterface(queries, -1)), zap.String("arguments", utils.TruncateInterface(args, -1)), - log.ShortError(log.FilterCancelError(err))) + log.ShortError(err)) } return err diff --git a/pkg/conn/baseconn.go b/pkg/conn/baseconn.go index fc3217db1f..d969717dea 100644 --- a/pkg/conn/baseconn.go +++ b/pkg/conn/baseconn.go @@ -95,10 +95,10 @@ func (conn *BaseConn) QuerySQL(tctx *tcontext.Context, query string, args ...int rows, err := conn.DBConn.QueryContext(tctx.Context(), query, args...) if err != nil { - tctx.L().Error("query statement failed", + tctx.L().ErrorFilterContextCanceled("query statement failed", zap.String("query", utils.TruncateString(query, -1)), zap.String("argument", utils.TruncateInterface(args, -1)), - log.ShortError(log.FilterCancelError(err))) + log.ShortError(err)) return nil, terror.ErrDBQueryFailed.Delegate(err, utils.TruncateString(query, -1)) } return rows, nil @@ -144,9 +144,9 @@ func (conn *BaseConn) ExecuteSQLWithIgnoreError(tctx *tcontext.Context, ignoreEr continue } - tctx.L().Error("execute statement failed", + tctx.L().ErrorFilterContextCanceled("execute statement failed", zap.String("query", utils.TruncateString(query, -1)), - zap.String("argument", utils.TruncateInterface(arg, -1)), log.ShortError(log.FilterCancelError(err))) + zap.String("argument", utils.TruncateInterface(arg, -1)), log.ShortError(err)) rerr := txn.Rollback() if rerr != nil { diff --git a/pkg/log/log.go b/pkg/log/log.go index f4aad8f8f9..3a1c7de212 100644 --- a/pkg/log/log.go +++ b/pkg/log/log.go @@ -17,7 +17,6 @@ import ( "context" "fmt" - "github.com/pingcap/errors" pclog "github.com/pingcap/log" "github.com/pingcap/tidb/util/logutil" "go.uber.org/zap" @@ -74,6 +73,16 @@ func (l Logger) WithFields(fields ...zap.Field) Logger { return Logger{l.With(fields...)} } +// ErrorFilterContextCanceled wraps Logger.Error() and will filter error log when error is context.Canceled +func (l Logger) ErrorFilterContextCanceled(msg string, fields ...zap.Field) { + for _, field := range fields { + if field.Key == "error" && field.String == context.Canceled.Error() { + return + } + } + l.Logger.Error(msg, fields...) +} + // logger for DM var ( appLogger = Logger{zap.NewNop()} @@ -138,14 +147,6 @@ func ShortError(err error) zap.Field { return zap.String("error", err.Error()) } -// FilterCancelError will skip context.Canceled error -func FilterCancelError(err error) error { - if errors.Cause(err) == context.Canceled { - return nil - } - return err -} - // L returns the current logger for DM. func L() Logger { return appLogger diff --git a/syncer/db.go b/syncer/db.go index a115cf0b50..8af616954e 100644 --- a/syncer/db.go +++ b/syncer/db.go @@ -203,10 +203,10 @@ func (conn *DBConn) querySQL(tctx *tcontext.Context, query string, args ...inter ) if err != nil { - tctx.L().Error("query statement failed after retry", + tctx.L().ErrorFilterContextCanceled("query statement failed after retry", zap.String("query", utils.TruncateString(query, -1)), zap.String("argument", utils.TruncateInterface(args, -1)), - log.ShortError(log.FilterCancelError(err))) + log.ShortError(err)) return nil, err } return ret.(*sql.Rows), nil @@ -266,10 +266,10 @@ func (conn *DBConn) executeSQLWithIgnore(tctx *tcontext.Context, ignoreError fun }) if err != nil { - tctx.L().Error("execute statements failed after retry", + tctx.L().ErrorFilterContextCanceled("execute statements failed after retry", zap.String("queries", utils.TruncateInterface(queries, -1)), zap.String("arguments", utils.TruncateInterface(args, -1)), - log.ShortError(log.FilterCancelError(err))) + log.ShortError(err)) return ret.(int), err } return ret.(int), nil