diff --git a/cdc/sink/ddlsink/mysql/mysql_ddl_sink.go b/cdc/sink/ddlsink/mysql/mysql_ddl_sink.go
new file mode 100644
index 00000000000..7a605a54893
--- /dev/null
+++ b/cdc/sink/ddlsink/mysql/mysql_ddl_sink.go
@@ -0,0 +1,265 @@
+// Copyright 2022 PingCAP, Inc.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package mysql
+
+import (
+	"context"
+	"database/sql"
+	"net/url"
+	"time"
+
+	"github.com/pingcap/errors"
+	"github.com/pingcap/failpoint"
+	"github.com/pingcap/log"
+	timodel "github.com/pingcap/tidb/parser/model"
+	"github.com/pingcap/tiflow/cdc/contextutil"
+	"github.com/pingcap/tiflow/cdc/model"
+	"github.com/pingcap/tiflow/cdc/sink/ddlsink"
+	"github.com/pingcap/tiflow/cdc/sink/metrics"
+	"github.com/pingcap/tiflow/pkg/config"
+	cerror "github.com/pingcap/tiflow/pkg/errors"
+	"github.com/pingcap/tiflow/pkg/errorutil"
+	"github.com/pingcap/tiflow/pkg/quotes"
+	"github.com/pingcap/tiflow/pkg/retry"
+	"github.com/pingcap/tiflow/pkg/sink"
+	pmysql "github.com/pingcap/tiflow/pkg/sink/mysql"
+	"go.uber.org/zap"
+)
+
+const (
+	defaultDDLMaxRetry uint64 = 20
+
+	// networkDriftDuration is used to construct a context timeout for database operations.
+	networkDriftDuration = 5 * time.Second
+)
+
+// Assert Sink implementation
+var _ ddlsink.Sink = (*DDLSink)(nil)
+
+// DDLSink is a sink that writes DDL events to MySQL.
+type DDLSink struct {
+	// id indicates which processor (changefeed) this sink belongs to.
+	id model.ChangeFeedID
+	// db is the database connection.
+	db *sql.DB
+	// cfg is the MySQL sink configuration; execDDL reads its WriteTimeout.
+	cfg *pmysql.Config
+	// statistics is the statistics of this sink.
+	// We use it to record the DDL count.
+	statistics *metrics.Statistics
+}
+
+// NewDDLSink creates a new DDLSink.
+//
+// It builds the sink configuration from sinkURI and replicaConfig, generates
+// the DSN, and opens the database connection through dbConnFactory. Any error
+// from those steps is returned as is.
+func NewDDLSink(
+	ctx context.Context,
+	sinkURI *url.URL,
+	replicaConfig *config.ReplicaConfig,
+	dbConnFactory pmysql.Factory,
+) (*DDLSink, error) {
+	changefeedID := contextutil.ChangefeedIDFromCtx(ctx)
+	cfg := pmysql.NewConfig()
+	err := cfg.Apply(ctx, changefeedID, sinkURI, replicaConfig)
+	if err != nil {
+		return nil, err
+	}
+
+	dsnStr, err := pmysql.GenerateDSN(ctx, sinkURI, cfg, dbConnFactory)
+	if err != nil {
+		return nil, err
+	}
+
+	db, err := dbConnFactory(ctx, dsnStr)
+	if err != nil {
+		return nil, err
+	}
+
+	m := &DDLSink{
+		id:         changefeedID,
+		db:         db,
+		cfg:        cfg,
+		statistics: metrics.NewStatistics(ctx, sink.TxnSink),
+	}
+
+	log.Info("MySQL DDL sink is created",
+		zap.String("namespace", m.id.Namespace),
+		zap.String("changefeed", m.id.ID))
+	return m, nil
+}
+
+// WriteDDLEvent writes a DDL event to the mysql database.
+//
+// The DDL is executed with retries. A nil result is returned as is; an error
+// that is not a retryable DDL error is wrapped as a changefeed-unretryable
+// error, and any other error is returned traced.
+func (m *DDLSink) WriteDDLEvent(ctx context.Context, ddl *model.DDLEvent) error {
+	err := m.execDDLWithMaxRetries(ctx, ddl)
+	if err == nil {
+		return nil
+	}
+	// we should not retry changefeed if DDL failed by return an unretryable error.
+	if !errorutil.IsRetryableDDLError(err) {
+		return cerror.WrapChangefeedUnretryableErr(err)
+	}
+	return errors.Trace(err)
+}
+
+// execDDLWithMaxRetries runs execDDL under retry.Do with at most
+// defaultDDLMaxRetry tries, retrying only errors accepted by
+// errorutil.IsRetryableDDLError. Errors accepted by
+// errorutil.IsIgnorableMySQLDDLError are logged and treated as success.
+func (m *DDLSink) execDDLWithMaxRetries(ctx context.Context, ddl *model.DDLEvent) error {
+	return retry.Do(ctx, func() error {
+		err := m.statistics.RecordDDLExecution(func() error { return m.execDDL(ctx, ddl) })
+		if err != nil {
+			if errorutil.IsIgnorableMySQLDDLError(err) {
+				// NOTE: don't change the log, some tests depend on it.
+				log.Info("Execute DDL failed, but error can be ignored",
+					zap.Uint64("startTs", ddl.StartTs), zap.String("ddl", ddl.Query),
+					zap.String("namespace", m.id.Namespace),
+					zap.String("changefeed", m.id.ID),
+					zap.Error(err))
+				// If the error is ignorable, we will directly ignore the error.
+				return nil
+			}
+			log.Warn("Execute DDL with error, retry later",
+				zap.Uint64("startTs", ddl.StartTs), zap.String("ddl", ddl.Query),
+				zap.String("namespace", m.id.Namespace),
+				zap.String("changefeed", m.id.ID),
+				zap.Error(err))
+			return err
+		}
+		return nil
+	}, retry.WithBackoffBaseDelay(pmysql.BackoffBaseDelay.Milliseconds()),
+		retry.WithBackoffMaxDelay(pmysql.BackoffMaxDelay.Milliseconds()),
+		retry.WithMaxTries(defaultDDLMaxRetry),
+		retry.WithIsRetryableErr(errorutil.IsRetryableDDLError))
+}
+
+// execDDL executes one DDL statement inside a transaction.
+//
+// The transaction is bounded by a timeout of the configured write timeout plus
+// networkDriftDuration. When needSwitchDB reports true, a USE statement for
+// the DDL's schema is issued before the DDL query. Errors from beginning the
+// transaction or executing a statement are returned unchanged; a commit
+// failure is wrapped in cerror.ErrMySQLTxnError.
+func (m *DDLSink) execDDL(pctx context.Context, ddl *model.DDLEvent) error {
+	writeTimeout, err := time.ParseDuration(m.cfg.WriteTimeout)
+	if err != nil {
+		// Keep going with the zero duration ParseDuration returned, so the
+		// timeout below is networkDriftDuration alone, but make the bad
+		// setting visible instead of dropping the error silently.
+		log.Warn("Invalid write timeout, use network drift duration only",
+			zap.String("writeTimeout", m.cfg.WriteTimeout),
+			zap.String("namespace", m.id.Namespace),
+			zap.String("changefeed", m.id.ID),
+			zap.Error(err))
+	}
+	writeTimeout += networkDriftDuration
+	ctx, cancelFunc := context.WithTimeout(pctx, writeTimeout)
+	defer cancelFunc()
+
+	shouldSwitchDB := needSwitchDB(ddl)
+
+	failpoint.Inject("MySQLSinkExecDDLDelay", func() {
+		select {
+		case <-ctx.Done():
+			failpoint.Return(ctx.Err())
+		case <-time.After(time.Hour):
+		}
+		failpoint.Return(nil)
+	})
+
+	start := time.Now()
+	log.Info("Start exec DDL", zap.Any("DDL", ddl), zap.String("namespace", m.id.Namespace),
+		zap.String("changefeed", m.id.ID))
+	tx, err := m.db.BeginTx(ctx, nil)
+	if err != nil {
+		return err
+	}
+
+	if shouldSwitchDB {
+		_, err = tx.ExecContext(ctx, "USE "+quotes.QuoteName(ddl.TableInfo.TableName.Schema)+";")
+		if err != nil {
+			if rbErr := tx.Rollback(); rbErr != nil {
+				// Log the rollback failure itself; the statement error is returned below.
+				log.Error("Failed to rollback", zap.String("namespace", m.id.Namespace),
+					zap.String("changefeed", m.id.ID), zap.Error(rbErr))
+			}
+			return err
+		}
+	}
+
+	if _, err = tx.ExecContext(ctx, ddl.Query); err != nil {
+		if rbErr := tx.Rollback(); rbErr != nil {
+			// Log the rollback failure itself; the statement error is returned below.
+			log.Error("Failed to rollback", zap.String("sql", ddl.Query),
+				zap.String("namespace", m.id.Namespace),
+				zap.String("changefeed", m.id.ID), zap.Error(rbErr))
+		}
+		return err
+	}
+
+	if err = tx.Commit(); err != nil {
+		log.Error("Failed to exec DDL", zap.String("sql", ddl.Query),
+			zap.Duration("duration", time.Since(start)),
+			zap.String("namespace", m.id.Namespace),
+			zap.String("changefeed", m.id.ID), zap.Error(err))
+		return cerror.WrapError(cerror.ErrMySQLTxnError, err)
+	}
+
+	log.Info("Exec DDL succeeded", zap.String("sql", ddl.Query),
+		zap.Duration("duration", time.Since(start)),
+		zap.String("namespace", m.id.Namespace),
+		zap.String("changefeed", m.id.ID))
+	return nil
+}
+
+// needSwitchDB reports whether a USE statement must precede the DDL query:
+// false when the event's schema name is empty or the DDL creates or drops a
+// schema, true otherwise.
+func needSwitchDB(ddl *model.DDLEvent) bool {
+	if len(ddl.TableInfo.TableName.Schema) == 0 {
+		return false
+	}
+	if ddl.Type == timodel.ActionCreateSchema || ddl.Type == timodel.ActionDropSchema {
+		return false
+	}
+	return true
+}
+
+// WriteCheckpointTs does nothing.
+func (m *DDLSink) WriteCheckpointTs(_ context.Context, _ uint64, _ []*model.TableInfo) error {
+	// Only for RowSink for now.
+	return nil
+}
+
+// Close closes the sink's statistics and the database connection, skipping
+// whichever is nil. A failure to close the connection is only logged.
+func (m *DDLSink) Close() {
+	if m.statistics != nil {
+		m.statistics.Close()
+	}
+	if m.db != nil {
+		if err := m.db.Close(); err != nil {
+			log.Warn("MySQL ddl sink close db wit error",
+				zap.String("namespace", m.id.Namespace),
+				zap.String("changefeed", m.id.ID),
+				zap.Error(err))
+		}
+	}
+}
diff --git a/pkg/errorutil/util_test.go b/pkg/errorutil/util_test.go
new file mode 100644
index 00000000000..8f3d4e9a2bf
--- /dev/null
+++ b/pkg/errorutil/util_test.go
@@ -0,0 +1,130 @@
+// Copyright 2021 PingCAP, Inc.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package errorutil
+
+import (
+	"errors"
+	"testing"
+
+	"github.com/go-sql-driver/mysql"
+	"github.com/pingcap/tidb/infoschema"
+	tmysql "github.com/pingcap/tidb/parser/mysql"
+	"github.com/stretchr/testify/require"
+	v3rpc "go.etcd.io/etcd/api/v3/v3rpc/rpctypes"
+	"go.etcd.io/etcd/raft/v3"
+)
+
+// newMysqlErr returns a *mysql.MySQLError whose Number and Message are set
+// from the arguments.
+func newMysqlErr(number uint16, message string) *mysql.MySQLError {
+	return &mysql.MySQLError{Number: number, Message: message}
+}
+
+// TestIgnoreMysqlDDLError checks IsIgnorableMySQLDDLError against a table of errors.
+func TestIgnoreMysqlDDLError(t *testing.T) {
+	tests := []struct {
+		in   error
+		want bool
+	}{
+		{errors.New("raw error"), false},
+		{newMysqlErr(tmysql.ErrDupKeyName, "Error: Duplicate key name 'some_key'"), true},
+		{newMysqlErr(uint16(infoschema.ErrDatabaseExists.Code()), "Can't create database"), true},
+		{newMysqlErr(uint16(infoschema.ErrAccessDenied.Code()), "Access denied for user"), false},
+	}
+	for _, tc := range tests {
+		require.Equal(t, tc.want, IsIgnorableMySQLDDLError(tc.in))
+	}
+}
+
+// TestIsRetryableEtcdError checks IsRetryableEtcdError against a table of errors.
+func TestIsRetryableEtcdError(t *testing.T) {
+	tests := []struct {
+		in   error
+		want bool
+	}{
+		{nil, false},
+		{v3rpc.ErrCorrupt, false},
+
+		{v3rpc.ErrGRPCTimeoutDueToConnectionLost, true},
+		{v3rpc.ErrTimeoutDueToLeaderFail, true},
+		{v3rpc.ErrNoSpace, true},
+		{raft.ErrStopped, true},
+		{errors.New("rpc error: code = Unavailable desc = closing transport due to: " +
+			"connection error: desc = \\\"error reading from server: EOF\\\", " +
+			"received prior goaway: code: NO_ERROR\""), true},
+		{errors.New("rpc error: code = Unavailable desc = error reading from server: " +
+			"xxx: read: connection reset by peer"), true},
+	}
+	for _, tc := range tests {
+		require.Equal(t, tc.want, IsRetryableEtcdError(tc.in))
+	}
+}
+
+// TestIsRetryableDMLError checks IsRetryableDMLError against a table of errors.
+func TestIsRetryableDMLError(t *testing.T) {
+	tests := []struct {
+		in   error
+		want bool
+	}{
+		{nil, false},
+		{errors.New("raw error"), false},
+		{newMysqlErr(tmysql.ErrDupKeyName, "Error: Duplicate key name 'some_key'"), false},
+		{tmysql.ErrBadConn, true},
+		{newMysqlErr(tmysql.ErrLockWaitTimeout, "Lock wait timeout exceeded"), false},
+		{newMysqlErr(tmysql.ErrLockDeadlock, "Deadlock found when trying to get lock"), true},
+	}
+	for _, tc := range tests {
+		require.Equal(t, tc.want, IsRetryableDMLError(tc.in))
+	}
+}
+
+// TestIsRetryableDDLError checks IsRetryableDDLError against a table of errors.
+func TestIsRetryableDDLError(t *testing.T) {
+	tests := []struct {
+		in   error
+		want bool
+	}{
+		{errors.New("raw error"), false},
+		{newMysqlErr(tmysql.ErrNoDB, "Error: Duplicate key name 'some_key'"), false},
+		{newMysqlErr(tmysql.ErrParse, "Can't create database"), false},
+		{newMysqlErr(tmysql.ErrAccessDenied, "Access denied for user"), false},
+		{newMysqlErr(tmysql.ErrDBaccessDenied, "Access denied for db"), false},
+		{newMysqlErr(tmysql.ErrNoSuchTable, "table not exist"), false},
+		{newMysqlErr(tmysql.ErrNoSuchIndex, "index not exist"), false},
+		{newMysqlErr(tmysql.ErrWrongColumnName, "wrong column name'"), false},
+		{newMysqlErr(tmysql.ErrDupKeyName, "Duplicate key name 'some_key'"), true},
+		{newMysqlErr(tmysql.ErrPartitionMgmtOnNonpartitioned, "xx"), false},
+	}
+	for _, tc := range tests {
+		require.Equal(t, tc.want, IsRetryableDDLError(tc.in))
+	}
+}
+
+// TestIsSyncPointIgnoreError checks IsSyncPointIgnoreError against a table of errors.
+func TestIsSyncPointIgnoreError(t *testing.T) {
+	t.Parallel()
+	tests := []struct {
+		in   error
+		want bool
+	}{
+		{errors.New("raw error"), false},
+		{newMysqlErr(tmysql.ErrDupKeyName, "Error: Duplicate key name 'some_key'"), false},
+		{newMysqlErr(tmysql.ErrNoDB, "Error: Duplicate key name 'some_key'"), false},
+		{newMysqlErr(tmysql.ErrParse, "Can't create database"), false},
+		{newMysqlErr(tmysql.ErrUnknownSystemVariable, "Unknown system variable"), true},
+	}
+	for _, tc := range tests {
+		require.Equal(t, tc.want, IsSyncPointIgnoreError(tc.in))
+	}
+}