Skip to content
This repository has been archived by the owner on Nov 24, 2023. It is now read-only.

loader/syncer: filter context cancel error while executing sqls #355

Merged
merged 16 commits into from
Nov 28, 2019
Merged
Show file tree
Hide file tree
Changes from 15 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions loader/db.go
Original file line number Diff line number Diff line change
Expand Up @@ -98,7 +98,7 @@ func (conn *DBConn) querySQL(ctx *tcontext.Context, query string, args ...interf
ctx.L().Error("query statement failed after retry",
zap.String("query", utils.TruncateString(query, -1)),
zap.String("argument", utils.TruncateInterface(args, -1)),
log.ShortError(err))
log.ShortError(log.FilterCancelError(err)))
return nil, err
}
return ret.(*sql.Rows), nil
Expand Down Expand Up @@ -172,7 +172,7 @@ func (conn *DBConn) executeSQL(ctx *tcontext.Context, queries []string, args ...
ctx.L().Error("execute statements failed after retry",
zap.String("queries", utils.TruncateInterface(queries, -1)),
zap.String("arguments", utils.TruncateInterface(args, -1)),
log.ShortError(err))
log.ShortError(log.FilterCancelError(err)))
}

return err
Expand Down
6 changes: 4 additions & 2 deletions loader/loader.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ import (
"github.com/pingcap/failpoint"
cm "github.com/pingcap/tidb-tools/pkg/column-mapping"
"github.com/pingcap/tidb-tools/pkg/filter"
"github.com/pingcap/tidb-tools/pkg/table-router"
router "github.com/pingcap/tidb-tools/pkg/table-router"
"github.com/siddontang/go/sync2"
"go.uber.org/zap"
)
Expand Down Expand Up @@ -174,7 +174,9 @@ func (w *Worker) run(ctx context.Context, fileJobQueue chan *fileJob, runFatalCh
if err != nil {
// expect pause rather than exit
err = terror.WithScope(terror.Annotatef(err, "file %s", job.file), terror.ScopeDownstream)
runFatalChan <- unit.NewProcessError(pb.ErrorType_ExecSQL, err)
if !utils.IsContextCanceledError(err) {
runFatalChan <- unit.NewProcessError(pb.ErrorType_ExecSQL, err)
}
return
}
w.loader.finishedDataSize.Add(job.offset - job.lastOffset)
Expand Down
4 changes: 2 additions & 2 deletions pkg/conn/baseconn.go
Original file line number Diff line number Diff line change
Expand Up @@ -98,7 +98,7 @@ func (conn *BaseConn) QuerySQL(tctx *tcontext.Context, query string, args ...int
tctx.L().Error("query statement failed",
zap.String("query", utils.TruncateString(query, -1)),
zap.String("argument", utils.TruncateInterface(args, -1)),
log.ShortError(err))
log.ShortError(log.FilterCancelError(err)))
return nil, terror.ErrDBQueryFailed.Delegate(err, utils.TruncateString(query, -1))
}
return rows, nil
Expand Down Expand Up @@ -146,7 +146,7 @@ func (conn *BaseConn) ExecuteSQLWithIgnoreError(tctx *tcontext.Context, ignoreEr

tctx.L().Error("execute statement failed",
zap.String("query", utils.TruncateString(query, -1)),
zap.String("argument", utils.TruncateInterface(arg, -1)), log.ShortError(err))
zap.String("argument", utils.TruncateInterface(arg, -1)), log.ShortError(log.FilterCancelError(err)))

rerr := txn.Rollback()
if rerr != nil {
Expand Down
10 changes: 10 additions & 0 deletions pkg/log/log.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,8 +14,10 @@
package log

import (
"context"
"fmt"

"github.com/pingcap/errors"
pclog "github.com/pingcap/log"
"github.com/pingcap/tidb/util/logutil"
"go.uber.org/zap"
Expand Down Expand Up @@ -136,6 +138,14 @@ func ShortError(err error) zap.Field {
return zap.String("error", err.Error())
}

// FilterCancelError will skip context.Canceled error
func FilterCancelError(err error) error {
if errors.Cause(err) == context.Canceled {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

how about use IsContextCanceledError?

return nil
}
return err
}

// L returns the current logger for DM.
func L() Logger {
return appLogger
Expand Down
7 changes: 7 additions & 0 deletions pkg/utils/util.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,13 +14,15 @@
package utils

import (
"context"
"math"
"os"
"regexp"
"strconv"
"strings"
"time"

"github.com/pingcap/errors"
"github.com/siddontang/go-mysql/mysql"

"github.com/pingcap/dm/pkg/terror"
Expand Down Expand Up @@ -152,6 +154,11 @@ func WaitSomething(backoff int, waitTime time.Duration, fn func() bool) bool {
return false
}

// IsContextCanceledError checks whether err is context.Canceled
func IsContextCanceledError(err error) bool {
return errors.Cause(err) == context.Canceled
}

// IsBuildInSkipDDL return true when checked sql that will be skipped for syncer
func IsBuildInSkipDDL(sql string) bool {
return builtInSkipDDLPatterns.FindStringIndex(sql) != nil
Expand Down
4 changes: 2 additions & 2 deletions syncer/db.go
Original file line number Diff line number Diff line change
Expand Up @@ -206,7 +206,7 @@ func (conn *DBConn) querySQL(tctx *tcontext.Context, query string, args ...inter
tctx.L().Error("query statement failed after retry",
zap.String("query", utils.TruncateString(query, -1)),
zap.String("argument", utils.TruncateInterface(args, -1)),
log.ShortError(err))
log.ShortError(log.FilterCancelError(err)))
return nil, err
}
return ret.(*sql.Rows), nil
Expand Down Expand Up @@ -269,7 +269,7 @@ func (conn *DBConn) executeSQLWithIgnore(tctx *tcontext.Context, ignoreError fun
tctx.L().Error("execute statements failed after retry",
zap.String("queries", utils.TruncateInterface(queries, -1)),
zap.String("arguments", utils.TruncateInterface(args, -1)),
log.ShortError(err))
log.ShortError(log.FilterCancelError(err)))
return ret.(int), err
}
return ret.(int), nil
Expand Down
4 changes: 3 additions & 1 deletion syncer/syncer.go
Original file line number Diff line number Diff line change
Expand Up @@ -921,7 +921,9 @@ func (s *Syncer) syncDDL(ctx *tcontext.Context, queueBucket string, db *DBConn,
s.jobWg.Done()
if err != nil {
s.execErrorDetected.Set(true)
s.runFatalChan <- unit.NewProcessError(pb.ErrorType_ExecSQL, err)
if !utils.IsContextCanceledError(err) {
s.runFatalChan <- unit.NewProcessError(pb.ErrorType_ExecSQL, err)
}
continue
}
s.addCount(true, queueBucket, sqlJob.tp, int64(len(sqlJob.ddls)))
Expand Down
12 changes: 7 additions & 5 deletions syncer/warning.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,11 +41,13 @@ func (s *Syncer) Error() interface{} {

errors := make([]*pb.SyncSQLError, 0, len(s.execErrors.errors))
for _, ctx := range s.execErrors.errors {
errors = append(errors, &pb.SyncSQLError{
Msg: ctx.err.Error(),
FailedBinlogPosition: fmt.Sprintf("%s:%d", ctx.pos.Name, ctx.pos.Pos),
ErrorSQL: ctx.jobs,
})
if !utils.IsContextCanceledError(ctx.err) {
errors = append(errors, &pb.SyncSQLError{
Msg: ctx.err.Error(),
FailedBinlogPosition: fmt.Sprintf("%s:%d", ctx.pos.Name, ctx.pos.Pos),
ErrorSQL: ctx.jobs,
})
}
}

return &pb.SyncError{Errors: errors}
Expand Down