diff --git a/cdc/sink/ddlsink/mysql/mysql_ddl_sink.go b/cdc/sink/ddlsink/mysql/mysql_ddl_sink.go index 85fc99b1bde..f520ab979dd 100644 --- a/cdc/sink/ddlsink/mysql/mysql_ddl_sink.go +++ b/cdc/sink/ddlsink/mysql/mysql_ddl_sink.go @@ -16,9 +16,11 @@ package mysql import ( "context" "database/sql" + "fmt" "net/url" "time" + cerrors "github.com/pingcap/errors" "github.com/pingcap/failpoint" "github.com/pingcap/log" timodel "github.com/pingcap/tidb/parser/model" @@ -189,7 +191,7 @@ func (m *DDLSink) execDDL(pctx context.Context, ddl *model.DDLEvent) error { zap.Duration("duration", time.Since(start)), zap.String("namespace", m.id.Namespace), zap.String("changefeed", m.id.ID), zap.Error(err)) - return cerror.WrapError(cerror.ErrMySQLTxnError, err) + return cerror.WrapError(cerror.ErrMySQLTxnError, cerrors.WithMessage(err, fmt.Sprintf("Query info: %s; ", ddl.Query))) } log.Info("Exec DDL succeeded", zap.String("sql", ddl.Query), diff --git a/cdc/sink/dmlsink/txn/mysql/mysql.go b/cdc/sink/dmlsink/txn/mysql/mysql.go index ae1454ed09a..4f6696819c7 100644 --- a/cdc/sink/dmlsink/txn/mysql/mysql.go +++ b/cdc/sink/dmlsink/txn/mysql/mysql.go @@ -835,7 +835,7 @@ func logDMLTxnErr( zap.String("query", query), zap.Int("count", count), zap.String("changefeed", changefeed)) } - return err + return errors.WithMessage(err, fmt.Sprintf("Failed query info: %s; ", query)) } func isRetryableDMLError(err error) bool { diff --git a/cdc/syncpointstore/mysql_syncpoint_store.go b/cdc/syncpointstore/mysql_syncpoint_store.go index 1e081eb1279..0542b0aadde 100644 --- a/cdc/syncpointstore/mysql_syncpoint_store.go +++ b/cdc/syncpointstore/mysql_syncpoint_store.go @@ -86,7 +86,7 @@ func (s *mysqlSyncPointStore) CreateSyncTable(ctx context.Context) error { tx, err := s.db.BeginTx(ctx, nil) if err != nil { log.Error("create sync table: begin Tx fail", zap.Error(err)) - return cerror.WrapError(cerror.ErrMySQLTxnError, err) + return cerror.WrapError(cerror.ErrMySQLTxnError, errors.WithMessage(err, "create sync table: begin Tx fail;")) } _, err = tx.Exec("CREATE DATABASE IF NOT EXISTS " + database) if err != nil { @@ -94,7 +94,7 @@ func (s *mysqlSyncPointStore) CreateSyncTable(ctx context.Context) error { if err2 != nil { log.Error("failed to create syncpoint table", zap.Error(err2)) } - return cerror.WrapError(cerror.ErrMySQLTxnError, err) + return cerror.WrapError(cerror.ErrMySQLTxnError, errors.WithMessage(err, "failed to create syncpoint table;")) } _, err = tx.Exec("USE " + database) if err != nil { @@ -102,7 +102,7 @@ func (s *mysqlSyncPointStore) CreateSyncTable(ctx context.Context) error { if err2 != nil { log.Error("failed to create syncpoint table", zap.Error(err2)) } - return cerror.WrapError(cerror.ErrMySQLTxnError, err) + return cerror.WrapError(cerror.ErrMySQLTxnError, errors.WithMessage(err, "failed to create syncpoint table;")) } query := `CREATE TABLE IF NOT EXISTS %s ( @@ -121,10 +121,10 @@ func (s *mysqlSyncPointStore) CreateSyncTable(ctx context.Context) error { if err2 != nil { log.Error("failed to create syncpoint table", zap.Error(err2)) } - return cerror.WrapError(cerror.ErrMySQLTxnError, err) + return cerror.WrapError(cerror.ErrMySQLTxnError, errors.WithMessage(err, "failed to create syncpoint table;")) } err = tx.Commit() - return cerror.WrapError(cerror.ErrMySQLTxnError, err) + return cerror.WrapError(cerror.ErrMySQLTxnError, errors.WithMessage(err, "failed to create syncpoint table;")) } func (s *mysqlSyncPointStore) SinkSyncPoint(ctx context.Context, @@ -134,7 +134,7 @@ func (s *mysqlSyncPointStore) SinkSyncPoint(ctx context.Context, tx, err := s.db.BeginTx(ctx, nil) if err != nil { log.Error("sync table: begin Tx fail", zap.Error(err)) - return cerror.WrapError(cerror.ErrMySQLTxnError, err) + return cerror.WrapError(cerror.ErrMySQLTxnError, errors.WithMessage(err, "sync table: begin Tx fail;")) } row := tx.QueryRow("select @@tidb_current_ts") var secondaryTs string @@ -145,7 +145,7 @@ func (s *mysqlSyncPointStore) SinkSyncPoint(ctx context.Context, if err2 != nil { log.Error("failed to write syncpoint table", zap.Error(err)) } - return cerror.WrapError(cerror.ErrMySQLTxnError, err) + return cerror.WrapError(cerror.ErrMySQLTxnError, errors.WithMessage(err, "failed to write syncpoint table;")) } // insert ts map query := "insert ignore into " + filter.TiCDCSystemSchema + "." + filter.SyncPointTable + @@ -156,7 +156,7 @@ func (s *mysqlSyncPointStore) SinkSyncPoint(ctx context.Context, if err2 != nil { log.Error("failed to write syncpoint table", zap.Error(err2)) } - return cerror.WrapError(cerror.ErrMySQLTxnError, err) + return cerror.WrapError(cerror.ErrMySQLTxnError, errors.WithMessage(err, "failed to write syncpoint table;")) } // set global tidb_external_ts to secondary ts @@ -172,7 +172,7 @@ func (s *mysqlSyncPointStore) SinkSyncPoint(ctx context.Context, if err2 != nil { log.Error("failed to write syncpoint table", zap.Error(err2)) } - return cerror.WrapError(cerror.ErrMySQLTxnError, err) + return cerror.WrapError(cerror.ErrMySQLTxnError, errors.WithMessage(err, "failed to write syncpoint table;")) } } @@ -197,7 +197,7 @@ func (s *mysqlSyncPointStore) SinkSyncPoint(ctx context.Context, } err = tx.Commit() - return cerror.WrapError(cerror.ErrMySQLTxnError, err) + return cerror.WrapError(cerror.ErrMySQLTxnError, errors.WithMessage(err, "failed to write syncpoint table;")) } func (s *mysqlSyncPointStore) Close() error {