Skip to content
This repository has been archived by the owner on Nov 24, 2023. It is now read-only.

loader/syncer: filter context cancel error while executing sqls (#355) #382

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions loader/db.go
Original file line number Diff line number Diff line change
Expand Up @@ -95,7 +95,7 @@ func (conn *DBConn) querySQL(ctx *tcontext.Context, query string, args ...interf
return ret, err
})
if err != nil {
ctx.L().Error("query statement failed after retry",
ctx.L().ErrorFilterContextCanceled("query statement failed after retry",
zap.String("query", utils.TruncateString(query, -1)),
zap.String("argument", utils.TruncateInterface(args, -1)),
log.ShortError(err))
Expand Down Expand Up @@ -169,7 +169,7 @@ func (conn *DBConn) executeSQL(ctx *tcontext.Context, queries []string, args ...
})

if err != nil {
ctx.L().Error("execute statements failed after retry",
ctx.L().ErrorFilterContextCanceled("execute statements failed after retry",
zap.String("queries", utils.TruncateInterface(queries, -1)),
zap.String("arguments", utils.TruncateInterface(args, -1)),
log.ShortError(err))
Expand Down
6 changes: 4 additions & 2 deletions loader/loader.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ import (
"github.com/pingcap/failpoint"
cm "github.com/pingcap/tidb-tools/pkg/column-mapping"
"github.com/pingcap/tidb-tools/pkg/filter"
"github.com/pingcap/tidb-tools/pkg/table-router"
router "github.com/pingcap/tidb-tools/pkg/table-router"
"github.com/siddontang/go/sync2"
"go.uber.org/zap"
)
Expand Down Expand Up @@ -174,7 +174,9 @@ func (w *Worker) run(ctx context.Context, fileJobQueue chan *fileJob, runFatalCh
if err != nil {
// expect pause rather than exit
err = terror.WithScope(terror.Annotatef(err, "file %s", job.file), terror.ScopeDownstream)
runFatalChan <- unit.NewProcessError(pb.ErrorType_ExecSQL, err)
if !utils.IsContextCanceledError(err) {
runFatalChan <- unit.NewProcessError(pb.ErrorType_ExecSQL, err)
}
return
}
w.loader.finishedDataSize.Add(job.offset - job.lastOffset)
Expand Down
4 changes: 2 additions & 2 deletions pkg/conn/baseconn.go
Original file line number Diff line number Diff line change
Expand Up @@ -95,7 +95,7 @@ func (conn *BaseConn) QuerySQL(tctx *tcontext.Context, query string, args ...int
rows, err := conn.DBConn.QueryContext(tctx.Context(), query, args...)

if err != nil {
tctx.L().Error("query statement failed",
tctx.L().ErrorFilterContextCanceled("query statement failed",
zap.String("query", utils.TruncateString(query, -1)),
zap.String("argument", utils.TruncateInterface(args, -1)),
log.ShortError(err))
Expand Down Expand Up @@ -144,7 +144,7 @@ func (conn *BaseConn) ExecuteSQLWithIgnoreError(tctx *tcontext.Context, ignoreEr
continue
}

tctx.L().Error("execute statement failed",
tctx.L().ErrorFilterContextCanceled("execute statement failed",
zap.String("query", utils.TruncateString(query, -1)),
zap.String("argument", utils.TruncateInterface(arg, -1)), log.ShortError(err))

Expand Down
21 changes: 21 additions & 0 deletions pkg/log/log.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,8 +14,11 @@
package log

import (
"context"
"fmt"
"strings"

"github.com/pingcap/errors"
pclog "github.com/pingcap/log"
"github.com/pingcap/tidb/util/logutil"
"go.uber.org/zap"
Expand Down Expand Up @@ -72,6 +75,24 @@ func (l Logger) WithFields(fields ...zap.Field) Logger {
return Logger{l.With(fields...)}
}

// ErrorFilterContextCanceled wraps Logger.Error() and will filter error log when error is context.Canceled
func (l Logger) ErrorFilterContextCanceled(msg string, fields ...zap.Field) {
for _, field := range fields {
switch field.Type {
case zapcore.StringType:
if field.Key == "error" && strings.Contains(field.String, context.Canceled.Error()) {
return
}
case zapcore.ErrorType:
err, ok := field.Interface.(error)
if ok && errors.Cause(err) == context.Canceled {
return
}
}
}
l.Logger.Error(msg, fields...)
}

// logger for DM
var (
appLogger = Logger{zap.NewNop()}
Expand Down
67 changes: 67 additions & 0 deletions pkg/log/log_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,67 @@
// Copyright 2019 PingCAP, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// See the License for the specific language governing permissions and
// limitations under the License.

package log

import (
"context"
"testing"

"go.uber.org/zap"
"go.uber.org/zap/zapcore"
"go.uber.org/zap/zaptest"

. "github.com/pingcap/check"
"github.com/pingcap/errors"
)

func TestLog(t *testing.T) {
TestingT(t)
}

type testLogSuite struct{}

var _ = Suite(&testLogSuite{})

func (s *testLogSuite) TestTestLogger(c *C) {
logger, buffer := makeTestLogger()
logger.Warn("the message", zap.Int("number", 123456), zap.Ints("array", []int{7, 8, 9}))
c.Assert(
buffer.Stripped(), Equals,
`{"$lvl":"WARN","$msg":"the message","number":123456,"array":[7,8,9]}`,
)
buffer.Reset()
logger.ErrorFilterContextCanceled("the message", zap.Int("number", 123456),
zap.Ints("array", []int{7, 8, 9}), zap.Error(context.Canceled))
c.Assert(buffer.Stripped(), Equals, "")
buffer.Reset()
logger.ErrorFilterContextCanceled("the message", zap.Int("number", 123456),
zap.Ints("array", []int{7, 8, 9}), ShortError(errors.Annotate(context.Canceled, "extra info")))
c.Assert(buffer.Stripped(), Equals, "")
}

// makeTestLogger creates a Logger instance which produces JSON logs.
func makeTestLogger() (Logger, *zaptest.Buffer) {
buffer := new(zaptest.Buffer)
logger := zap.New(zapcore.NewCore(
zapcore.NewJSONEncoder(zapcore.EncoderConfig{
LevelKey: "$lvl",
MessageKey: "$msg",
EncodeLevel: zapcore.CapitalLevelEncoder,
EncodeDuration: zapcore.StringDurationEncoder,
}),
buffer,
zap.DebugLevel,
))
return Logger{Logger: logger}, buffer
}
7 changes: 7 additions & 0 deletions pkg/utils/util.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,13 +14,15 @@
package utils

import (
"context"
"math"
"os"
"regexp"
"strconv"
"strings"
"time"

"github.com/pingcap/errors"
"github.com/siddontang/go-mysql/mysql"

"github.com/pingcap/dm/pkg/terror"
Expand Down Expand Up @@ -152,6 +154,11 @@ func WaitSomething(backoff int, waitTime time.Duration, fn func() bool) bool {
return false
}

// IsContextCanceledError checks whether err is context.Canceled
func IsContextCanceledError(err error) bool {
return errors.Cause(err) == context.Canceled
}

// IsBuildInSkipDDL return true when checked sql that will be skipped for syncer
func IsBuildInSkipDDL(sql string) bool {
return builtInSkipDDLPatterns.FindStringIndex(sql) != nil
Expand Down
4 changes: 2 additions & 2 deletions syncer/db.go
Original file line number Diff line number Diff line change
Expand Up @@ -203,7 +203,7 @@ func (conn *DBConn) querySQL(tctx *tcontext.Context, query string, args ...inter
)

if err != nil {
tctx.L().Error("query statement failed after retry",
tctx.L().ErrorFilterContextCanceled("query statement failed after retry",
zap.String("query", utils.TruncateString(query, -1)),
zap.String("argument", utils.TruncateInterface(args, -1)),
log.ShortError(err))
Expand Down Expand Up @@ -266,7 +266,7 @@ func (conn *DBConn) executeSQLWithIgnore(tctx *tcontext.Context, ignoreError fun
})

if err != nil {
tctx.L().Error("execute statements failed after retry",
tctx.L().ErrorFilterContextCanceled("execute statements failed after retry",
zap.String("queries", utils.TruncateInterface(queries, -1)),
zap.String("arguments", utils.TruncateInterface(args, -1)),
log.ShortError(err))
Expand Down
4 changes: 3 additions & 1 deletion syncer/syncer.go
Original file line number Diff line number Diff line change
Expand Up @@ -921,7 +921,9 @@ func (s *Syncer) syncDDL(ctx *tcontext.Context, queueBucket string, db *DBConn,
s.jobWg.Done()
if err != nil {
s.execErrorDetected.Set(true)
s.runFatalChan <- unit.NewProcessError(pb.ErrorType_ExecSQL, err)
if !utils.IsContextCanceledError(err) {
s.runFatalChan <- unit.NewProcessError(pb.ErrorType_ExecSQL, err)
}
continue
}
s.addCount(true, queueBucket, sqlJob.tp, int64(len(sqlJob.ddls)))
Expand Down
12 changes: 7 additions & 5 deletions syncer/warning.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,11 +41,13 @@ func (s *Syncer) Error() interface{} {

errors := make([]*pb.SyncSQLError, 0, len(s.execErrors.errors))
for _, ctx := range s.execErrors.errors {
errors = append(errors, &pb.SyncSQLError{
Msg: ctx.err.Error(),
FailedBinlogPosition: fmt.Sprintf("%s:%d", ctx.pos.Name, ctx.pos.Pos),
ErrorSQL: ctx.jobs,
})
if !utils.IsContextCanceledError(ctx.err) {
errors = append(errors, &pb.SyncSQLError{
Msg: ctx.err.Error(),
FailedBinlogPosition: fmt.Sprintf("%s:%d", ctx.pos.Name, ctx.pos.Pos),
ErrorSQL: ctx.jobs,
})
}
}

return &pb.SyncError{Errors: errors}
Expand Down