From 0da4d3ec2d62ee881ec55df6b0edd0aba08e98a3 Mon Sep 17 00:00:00 2001 From: hongyunyan <649330952@qq.com> Date: Tue, 23 Apr 2024 16:09:10 +0800 Subject: [PATCH] This is an automated cherry-pick of #10945 Signed-off-by: ti-chi-bot --- cdc/sink/mysql/mysql_syncpoint_store.go | 20 ++++++++++---------- cdc/sinkv2/ddlsink/mysql/mysql_ddl_sink.go | 7 ++++++- cdc/sinkv2/eventsink/txn/mysql/mysql.go | 2 +- 3 files changed, 17 insertions(+), 12 deletions(-) diff --git a/cdc/sink/mysql/mysql_syncpoint_store.go b/cdc/sink/mysql/mysql_syncpoint_store.go index 714ace4af42..26cf286cde4 100644 --- a/cdc/sink/mysql/mysql_syncpoint_store.go +++ b/cdc/sink/mysql/mysql_syncpoint_store.go @@ -156,7 +156,7 @@ func (s *mysqlSyncPointStore) CreateSyncTable(ctx context.Context) error { tx, err := s.db.BeginTx(ctx, nil) if err != nil { log.Error("create sync table: begin Tx fail", zap.Error(err)) - return cerror.WrapError(cerror.ErrMySQLTxnError, err) + return cerror.WrapError(cerror.ErrMySQLTxnError, errors.WithMessage(err, "create sync table: begin Tx fail;")) } _, err = tx.Exec("CREATE DATABASE IF NOT EXISTS " + database) if err != nil { @@ -164,7 +164,7 @@ func (s *mysqlSyncPointStore) CreateSyncTable(ctx context.Context) error { if err2 != nil { log.Error("failed to create syncpoint table", zap.Error(err2)) } - return cerror.WrapError(cerror.ErrMySQLTxnError, err) + return cerror.WrapError(cerror.ErrMySQLTxnError, errors.WithMessage(err, "failed to create syncpoint table;")) } _, err = tx.Exec("USE " + database) if err != nil { @@ -172,7 +172,7 @@ func (s *mysqlSyncPointStore) CreateSyncTable(ctx context.Context) error { if err2 != nil { log.Error("failed to create syncpoint table", zap.Error(err2)) } - return cerror.WrapError(cerror.ErrMySQLTxnError, err) + return cerror.WrapError(cerror.ErrMySQLTxnError, errors.WithMessage(err, "failed to create syncpoint table;")) } query := `CREATE TABLE IF NOT EXISTS %s ( @@ -191,10 +191,10 @@ func (s *mysqlSyncPointStore) CreateSyncTable(ctx context.Context) error { if err2 != nil { log.Error("failed to create syncpoint table", zap.Error(err2)) } - return cerror.WrapError(cerror.ErrMySQLTxnError, err) + return cerror.WrapError(cerror.ErrMySQLTxnError, errors.WithMessage(err, "failed to create syncpoint table;")) } err = tx.Commit() - return cerror.WrapError(cerror.ErrMySQLTxnError, err) + return cerror.WrapError(cerror.ErrMySQLTxnError, errors.WithMessage(err, "failed to create syncpoint table;")) } func (s *mysqlSyncPointStore) SinkSyncPoint(ctx context.Context, @@ -204,7 +204,7 @@ func (s *mysqlSyncPointStore) SinkSyncPoint(ctx context.Context, tx, err := s.db.BeginTx(ctx, nil) if err != nil { log.Error("sync table: begin Tx fail", zap.Error(err)) - return cerror.WrapError(cerror.ErrMySQLTxnError, err) + return cerror.WrapError(cerror.ErrMySQLTxnError, errors.WithMessage(err, "sync table: begin Tx fail;")) } row := tx.QueryRow("select @@tidb_current_ts") var secondaryTs string @@ -215,7 +215,7 @@ func (s *mysqlSyncPointStore) SinkSyncPoint(ctx context.Context, if err2 != nil { log.Error("failed to write syncpoint table", zap.Error(err)) } - return cerror.WrapError(cerror.ErrMySQLTxnError, err) + return cerror.WrapError(cerror.ErrMySQLTxnError, errors.WithMessage(err, "failed to write syncpoint table;")) } // insert ts map query := "insert ignore into " + filter.TiCDCSystemSchema + "." + filter.SyncPointTable + @@ -226,7 +226,7 @@ func (s *mysqlSyncPointStore) SinkSyncPoint(ctx context.Context, if err2 != nil { log.Error("failed to write syncpoint table", zap.Error(err2)) } - return cerror.WrapError(cerror.ErrMySQLTxnError, err) + return cerror.WrapError(cerror.ErrMySQLTxnError, errors.WithMessage(err, "failed to write syncpoint table;")) } // set global tidb_external_ts to secondary ts @@ -242,7 +242,7 @@ func (s *mysqlSyncPointStore) SinkSyncPoint(ctx context.Context, if err2 != nil { log.Error("failed to write syncpoint table", zap.Error(err2)) } - return cerror.WrapError(cerror.ErrMySQLTxnError, err) + return cerror.WrapError(cerror.ErrMySQLTxnError, errors.WithMessage(err, "failed to write syncpoint table;")) } } @@ -267,7 +267,7 @@ func (s *mysqlSyncPointStore) SinkSyncPoint(ctx context.Context, } err = tx.Commit() - return cerror.WrapError(cerror.ErrMySQLTxnError, err) + return cerror.WrapError(cerror.ErrMySQLTxnError, errors.WithMessage(err, "failed to write syncpoint table;")) } func (s *mysqlSyncPointStore) Close() error { diff --git a/cdc/sinkv2/ddlsink/mysql/mysql_ddl_sink.go b/cdc/sinkv2/ddlsink/mysql/mysql_ddl_sink.go index ac8072611f4..53d7656e260 100644 --- a/cdc/sinkv2/ddlsink/mysql/mysql_ddl_sink.go +++ b/cdc/sinkv2/ddlsink/mysql/mysql_ddl_sink.go @@ -16,10 +16,15 @@ package mysql import ( "context" "database/sql" + "fmt" "net/url" "time" +<<<<<<< HEAD:cdc/sinkv2/ddlsink/mysql/mysql_ddl_sink.go "github.com/pingcap/errors" +======= + cerrors "github.com/pingcap/errors" +>>>>>>> cd065d3f56 (log(ticdc): Add more error query information to the returned error to facilitate users to know the cause of the failure (#10945)):cdc/sink/ddlsink/mysql/mysql_ddl_sink.go "github.com/pingcap/failpoint" "github.com/pingcap/log" timodel "github.com/pingcap/tidb/parser/model" @@ -189,7 +194,7 @@ func (m *mysqlDDLSink) execDDL(pctx context.Context, ddl *model.DDLEvent) error zap.Duration("duration", time.Since(start)), zap.String("namespace", m.id.Namespace), zap.String("changefeed", m.id.ID), zap.Error(err)) - return cerror.WrapError(cerror.ErrMySQLTxnError, err) + return cerror.WrapError(cerror.ErrMySQLTxnError, cerrors.WithMessage(err, fmt.Sprintf("Query info: %s; ", ddl.Query))) } log.Info("Exec DDL succeeded", zap.String("sql", ddl.Query), diff --git a/cdc/sinkv2/eventsink/txn/mysql/mysql.go b/cdc/sinkv2/eventsink/txn/mysql/mysql.go index 157a3576e83..3389a62edd4 100644 --- a/cdc/sinkv2/eventsink/txn/mysql/mysql.go +++ b/cdc/sinkv2/eventsink/txn/mysql/mysql.go @@ -772,7 +772,7 @@ func logDMLTxnErr( zap.String("query", query), zap.Int("count", count), zap.String("changefeed", changefeed)) } - return err + return errors.WithMessage(err, fmt.Sprintf("Failed query info: %s; ", query)) } func isRetryableDMLError(err error) bool {