From d797cf713da387f6b28c8b72bb0b77436dc2c8f6 Mon Sep 17 00:00:00 2001 From: lance6716 Date: Mon, 21 Sep 2020 14:40:12 +0800 Subject: [PATCH] tracker: get some session variables from downtream (#1032) --- _utils/terror_gen/errors_release.txt | 1 + dm/config/subtask.go | 13 ++-- dm/worker/worker.go | 2 + errors.toml | 6 ++ loader/status.go | 4 +- pkg/conn/basedb.go | 6 +- pkg/retry/errors.go | 1 + pkg/schema/tracker.go | 43 ++++++++++- pkg/schema/tracker_test.go | 103 ++++++++++++++++++++++----- pkg/terror/error_list.go | 6 +- syncer/checkpoint_test.go | 8 ++- syncer/status.go | 2 +- syncer/syncer.go | 20 +++--- syncer/syncer_test.go | 11 +++ tests/sharding/conf/dm-task.yaml | 2 - tests/sharding/run.sh | 2 + 16 files changed, 184 insertions(+), 46 deletions(-) diff --git a/_utils/terror_gen/errors_release.txt b/_utils/terror_gen/errors_release.txt index 29b8585aa9..3282e8f860 100644 --- a/_utils/terror_gen/errors_release.txt +++ b/_utils/terror_gen/errors_release.txt @@ -459,6 +459,7 @@ ErrSchemaTrackerCannotParseDownstreamTable,[code=44008:class=schema-tracker:scop ErrSchemaTrackerInvalidCreateTableStmt,[code=44009:class=schema-tracker:scope=internal:level=medium], "Message: %s is not a valid `CREATE TABLE` statement" ErrSchemaTrackerRestoreStmtFail,[code=44010:class=schema-tracker:scope=internal:level=medium], "Message: fail to restore the statement" ErrSchemaTrackerCannotDropTable,[code=44011:class=schema-tracker:scope=internal:level=high], "Message: failed to drop table for `%s`.`%s` in schema tracker" +ErrSchemaTrackerInit,[code=44012:class=schema-tracker:scope=internal:level=high], "Message: failed to create schema tracker" ErrSchedulerNotStarted,[code=46001:class=scheduler:scope=internal:level=high], "Message: the scheduler has not started" ErrSchedulerStarted,[code=46002:class=scheduler:scope=internal:level=medium], "Message: the scheduler has already started" ErrSchedulerWorkerExist,[code=46003:class=scheduler:scope=internal:level=medium], "Message: dm-worker with name %s already exists" diff --git a/dm/config/subtask.go b/dm/config/subtask.go index 13c9cae165..edf2eda8a7 100644 --- a/dm/config/subtask.go +++ b/dm/config/subtask.go @@ -81,10 +81,11 @@ func (c *RawDBConfig) SetMaxIdleConns(value int) *RawDBConfig { // DBConfig is the DB configuration. type DBConfig struct { - Host string `toml:"host" json:"host" yaml:"host"` - Port int `toml:"port" json:"port" yaml:"port"` - User string `toml:"user" json:"user" yaml:"user"` - Password string `toml:"password" json:"-" yaml:"password"` // omit it for privacy + Host string `toml:"host" json:"host" yaml:"host"` + Port int `toml:"port" json:"port" yaml:"port"` + User string `toml:"user" json:"user" yaml:"user"` + Password string `toml:"password" json:"-" yaml:"password"` // omit it for privacy + // deprecated, mysql driver could automatically fetch this value MaxAllowedPacket *int `toml:"max-allowed-packet" json:"max-allowed-packet" yaml:"max-allowed-packet"` Session map[string]string `toml:"session" json:"session" yaml:"session"` @@ -121,10 +122,6 @@ func (db *DBConfig) Decode(data string) error { // Adjust adjusts the config. func (db *DBConfig) Adjust() { - if db.MaxAllowedPacket == nil { - cloneV := defaultMaxAllowedPacket - db.MaxAllowedPacket = &cloneV - } } // SubTaskConfig is the configuration for SubTask diff --git a/dm/worker/worker.go b/dm/worker/worker.go index 2a6a80fd06..c8f2519dd6 100644 --- a/dm/worker/worker.go +++ b/dm/worker/worker.go @@ -410,6 +410,7 @@ func (w *Worker) handleSubTaskStage(ctx context.Context, stageCh chan ha.Stage, case stage, ok := <-stageCh: if !ok { closed = true + break } opType, err := w.operateSubTaskStageWithoutConfig(stage) if err != nil { @@ -422,6 +423,7 @@ func (w *Worker) handleSubTaskStage(ctx context.Context, stageCh chan ha.Stage, case err, ok := <-errCh: if !ok { closed = true + break } // TODO: deal with err log.L().Error("WatchSubTaskStage received an error", zap.Error(err)) diff --git a/errors.toml b/errors.toml index 13beeb19c3..11290af245 100644 --- a/errors.toml +++ b/errors.toml @@ -2764,6 +2764,12 @@ description = "" workaround = "" tags = ["internal", "high"] +[error.DM-schema-tracker-44012] +message = "failed to create schema tracker" +description = "" +workaround = "" +tags = ["internal", "high"] + [error.DM-scheduler-46001] message = "the scheduler has not started" description = "" diff --git a/loader/status.go b/loader/status.go index 53f30894b6..1cabfa3ab9 100644 --- a/loader/status.go +++ b/loader/status.go @@ -27,7 +27,7 @@ var ( printStatusInterval = time.Second * 5 ) -// Status implements SubTaskUnit.Status +// Status implements Unit.Status func (l *Loader) Status() interface{} { finishedSize := l.finishedDataSize.Get() totalSize := l.totalDataSize.Get() @@ -41,7 +41,7 @@ func (l *Loader) Status() interface{} { return s } -// Error implements SubTaskUnit.Error +// Error implements Unit.Error func (l *Loader) Error() interface{} { return &pb.LoadError{} } diff --git a/pkg/conn/basedb.go b/pkg/conn/basedb.go index d091807539..7ca372259c 100644 --- a/pkg/conn/basedb.go +++ b/pkg/conn/basedb.go @@ -57,8 +57,10 @@ var mockDB sqlmock.Sqlmock // Apply will build BaseDB with DBConfig func (d *DefaultDBProviderImpl) Apply(config config.DBConfig) (*BaseDB, error) { - dsn := fmt.Sprintf("%s:%s@tcp(%s:%d)/?charset=utf8mb4&interpolateParams=true&maxAllowedPacket=%d", - config.User, config.Password, config.Host, config.Port, *config.MaxAllowedPacket) + // maxAllowedPacket=0 can be used to automatically fetch the max_allowed_packet variable from server on every connection. + // https://github.com/go-sql-driver/mysql#maxallowedpacket + dsn := fmt.Sprintf("%s:%s@tcp(%s:%d)/?charset=utf8mb4&interpolateParams=true&maxAllowedPacket=0", + config.User, config.Password, config.Host, config.Port) doFuncInClose := func() {} if config.Security != nil && len(config.Security.SSLCA) != 0 && diff --git a/pkg/retry/errors.go b/pkg/retry/errors.go index 3bfc5f00c6..a98c693519 100644 --- a/pkg/retry/errors.go +++ b/pkg/retry/errors.go @@ -31,6 +31,7 @@ var ( "unsupported drop integer primary key", "Unsupported collation", "Invalid default value for", + "Unsupported drop primary key", } // UnsupportedDMLMsgs list the error messages of some un-recoverable DML, which is used in task auto recovery diff --git a/pkg/schema/tracker.go b/pkg/schema/tracker.go index 9cb2fbfb53..8dc3bf8e60 100644 --- a/pkg/schema/tracker.go +++ b/pkg/schema/tracker.go @@ -24,12 +24,16 @@ import ( "github.com/pingcap/parser/model" "github.com/pingcap/tidb-tools/pkg/dbutil" "github.com/pingcap/tidb-tools/pkg/filter" + tidbConfig "github.com/pingcap/tidb/config" "github.com/pingcap/tidb/domain" "github.com/pingcap/tidb/infoschema" "github.com/pingcap/tidb/kv" "github.com/pingcap/tidb/meta" "github.com/pingcap/tidb/session" "github.com/pingcap/tidb/store/mockstore" + + "github.com/pingcap/dm/pkg/conn" + tcontext "github.com/pingcap/dm/pkg/context" ) const ( @@ -37,6 +41,10 @@ const ( schemaLeaseTime = 10 * time.Millisecond ) +var ( + sessionVars = []string{"sql_mode", "tidb_skip_utf8_check"} +) + // Tracker is used to track schema locally. type Tracker struct { store kv.Storage @@ -44,8 +52,39 @@ type Tracker struct { se session.Session } -// NewTracker creates a new tracker. -func NewTracker(sessionCfg map[string]string) (*Tracker, error) { +// NewTracker creates a new tracker. `sessionCfg` will be set as tracker's session variables if specified, or retrieve +// some variable from downstream TiDB using `tidbConn`. +func NewTracker(sessionCfg map[string]string, tidbConn *conn.BaseConn) (*Tracker, error) { + // NOTE: tidb uses a **global** config so can't isolate tracker's config from each other. If that isolation is needed, + // we might SetGlobalConfig before every call to tracker, or use some patch like https://github.com/bouk/monkey + toSet := tidbConfig.NewConfig() + toSet.AlterPrimaryKey = true + tidbConfig.StoreGlobalConfig(toSet) + + if len(sessionCfg) == 0 { + sessionCfg = make(map[string]string) + var ignoredColumn interface{} + for _, k := range sessionVars { + rows, err2 := tidbConn.QuerySQL(tcontext.Background(), fmt.Sprintf("show variables like '%s'", k)) + if err2 != nil { + return nil, err2 + } + if rows.Next() { + var value string + if err3 := rows.Scan(&ignoredColumn, &value); err3 != nil { + return nil, err3 + } + sessionCfg[k] = value + } + if err2 = rows.Close(); err2 != nil { + return nil, err2 + } + if err2 = rows.Err(); err2 != nil { + return nil, err2 + } + } + } + store, err := mockstore.NewMockStore(mockstore.WithStoreType(mockstore.MockTiKV)) if err != nil { return nil, err diff --git a/pkg/schema/tracker_test.go b/pkg/schema/tracker_test.go index b243229211..8a240b0dd5 100644 --- a/pkg/schema/tracker_test.go +++ b/pkg/schema/tracker_test.go @@ -11,19 +11,22 @@ // See the License for the specific language governing permissions and // limitations under the License. -package schema_test +package schema import ( "context" + "database/sql" "encoding/json" "sort" "testing" + "github.com/DATA-DOG/go-sqlmock" . "github.com/pingcap/check" - "github.com/pingcap/dm/pkg/schema" "github.com/pingcap/log" "github.com/pingcap/parser/model" "go.uber.org/zap/zapcore" + + "github.com/pingcap/dm/pkg/conn" ) func Test(t *testing.T) { @@ -32,17 +35,78 @@ func Test(t *testing.T) { var _ = Suite(&trackerSuite{}) -type trackerSuite struct{} +var ( + defaultTestSessionCfg = map[string]string{"sql_mode": "ONLY_FULL_GROUP_BY,STRICT_TRANS_TABLES,NO_ZERO_IN_DATE,NO_ZERO_DATE,ERROR_FOR_DIVISION_BY_ZERO,NO_AUTO_CREATE_USER,NO_ENGINE_SUBSTITUTION"} +) + +type trackerSuite struct { + baseConn *conn.BaseConn + db *sql.DB + backupKeys []string +} + +func (s *trackerSuite) SetUpSuite(c *C) { + s.backupKeys = sessionVars + sessionVars = []string{"sql_mode"} + db, _, err := sqlmock.New() + s.db = db + c.Assert(err, IsNil) + con, err := db.Conn(context.Background()) + c.Assert(err, IsNil) + s.baseConn = conn.NewBaseConn(con, nil) +} + +func (s *trackerSuite) TearDownSuite(c *C) { + s.db.Close() + sessionVars = s.backupKeys +} -func (s *trackerSuite) TestSessionCfg(c *C) { +func (s *trackerSuite) TestTiDBAndSessionCfg(c *C) { log.SetLevel(zapcore.ErrorLevel) + db, mock, err := sqlmock.New() + c.Assert(err, IsNil) + defer db.Close() + con, err := db.Conn(context.Background()) + c.Assert(err, IsNil) + baseConn := conn.NewBaseConn(con, nil) + + // user give correct session config + _, err = NewTracker(defaultTestSessionCfg, baseConn) + c.Assert(err, IsNil) + + // user give wrong session session, will return error sessionCfg := map[string]string{"sql_mode": "HaHa"} - tracker, err := schema.NewTracker(sessionCfg) + _, err = NewTracker(sessionCfg, baseConn) c.Assert(err, NotNil) - tracker, err = schema.NewTracker(nil) + // discover session config failed, will return error + mock.ExpectQuery("show variables like 'sql_mode'").WillReturnRows( + sqlmock.NewRows([]string{"Variable_name", "Value"}). + AddRow("sql_mode", "HaHa")) + _, err = NewTracker(nil, baseConn) + c.Assert(err, NotNil) + + // empty or default config in downstream + mock.ExpectQuery("show variables like 'sql_mode'").WillReturnRows( + sqlmock.NewRows([]string{"Variable_name", "Value"}). + AddRow("sql_mode", "")) + tracker, err := NewTracker(nil, baseConn) + c.Assert(err, IsNil) + c.Assert(mock.ExpectationsWereMet(), IsNil) + err = tracker.Exec(context.Background(), "", "create database testdb;") c.Assert(err, IsNil) + + // found session config in downstream + mock.ExpectQuery("show variables like 'sql_mode'").WillReturnRows( + sqlmock.NewRows([]string{"Variable_name", "Value"}). + AddRow("sql_mode", "ONLY_FULL_GROUP_BY,STRICT_TRANS_TABLES,NO_ZERO_DATE,NO_ZERO_IN_DATE")) + tracker, err = NewTracker(nil, baseConn) + c.Assert(err, IsNil) + c.Assert(mock.ExpectationsWereMet(), IsNil) + c.Assert(tracker.se.GetSessionVars().SQLMode.HasOnlyFullGroupBy(), IsTrue) + c.Assert(tracker.se.GetSessionVars().SQLMode.HasStrictMode(), IsTrue) + ctx := context.Background() err = tracker.Exec(ctx, "", "create database testdb;") c.Assert(err, IsNil) @@ -51,11 +115,12 @@ func (s *trackerSuite) TestSessionCfg(c *C) { err = tracker.Exec(ctx, "testdb", "create table foo (a varchar(255) primary key, b DATETIME NOT NULL DEFAULT '0000-00-00 00:00:00')") c.Assert(err, NotNil) - // set session config + // user set session config, get tracker config from downstream // no `STRICT_TRANS_TABLES`, no error now sessionCfg = map[string]string{"sql_mode": "NO_ZERO_DATE,NO_ZERO_IN_DATE,ANSI_QUOTES"} - tracker, err = schema.NewTracker(sessionCfg) + tracker, err = NewTracker(sessionCfg, baseConn) c.Assert(err, IsNil) + c.Assert(mock.ExpectationsWereMet(), IsNil) err = tracker.Exec(ctx, "", "create database testdb;") c.Assert(err, IsNil) @@ -75,22 +140,26 @@ func (s *trackerSuite) TestSessionCfg(c *C) { cts, err = tracker.GetCreateTable(context.Background(), "testdb", "foo") c.Assert(err, IsNil) c.Assert(cts, Equals, "CREATE TABLE \"foo\" ( \"a\" varchar(255) NOT NULL, PRIMARY KEY (\"a\")) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_bin") + + // test alter primary key + err = tracker.Exec(ctx, "testdb", "alter table \"foo\" drop primary key") + c.Assert(err, IsNil) } func (s *trackerSuite) TestDDL(c *C) { log.SetLevel(zapcore.ErrorLevel) - tracker, err := schema.NewTracker(nil) + tracker, err := NewTracker(defaultTestSessionCfg, s.baseConn) c.Assert(err, IsNil) // Table shouldn't exist before initialization. _, err = tracker.GetTable("testdb", "foo") c.Assert(err, ErrorMatches, `.*Table 'testdb\.foo' doesn't exist`) - c.Assert(schema.IsTableNotExists(err), IsTrue) + c.Assert(IsTableNotExists(err), IsTrue) _, err = tracker.GetCreateTable(context.Background(), "testdb", "foo") c.Assert(err, ErrorMatches, `.*Table 'testdb\.foo' doesn't exist`) - c.Assert(schema.IsTableNotExists(err), IsTrue) + c.Assert(IsTableNotExists(err), IsTrue) ctx := context.Background() err = tracker.Exec(ctx, "", "create database testdb;") @@ -98,7 +167,7 @@ func (s *trackerSuite) TestDDL(c *C) { _, err = tracker.GetTable("testdb", "foo") c.Assert(err, ErrorMatches, `.*Table 'testdb\.foo' doesn't exist`) - c.Assert(schema.IsTableNotExists(err), IsTrue) + c.Assert(IsTableNotExists(err), IsTrue) // Now create the table with 3 columns. err = tracker.Exec(ctx, "testdb", "create table foo (a varchar(255) primary key, b varchar(255) as (concat(a, a)), c int)") @@ -147,7 +216,7 @@ func (s *trackerSuite) TestDDL(c *C) { func (s *trackerSuite) TestGetSingleColumnIndices(c *C) { log.SetLevel(zapcore.ErrorLevel) - tracker, err := schema.NewTracker(nil) + tracker, err := NewTracker(defaultTestSessionCfg, s.baseConn) c.Assert(err, IsNil) ctx := context.Background() @@ -186,7 +255,7 @@ func (s *trackerSuite) TestGetSingleColumnIndices(c *C) { func (s *trackerSuite) TestCreateSchemaIfNotExists(c *C) { log.SetLevel(zapcore.ErrorLevel) - tracker, err := schema.NewTracker(nil) + tracker, err := NewTracker(defaultTestSessionCfg, s.baseConn) c.Assert(err, IsNil) // We cannot create a table without a database. @@ -214,7 +283,7 @@ func (s *trackerSuite) TestCreateSchemaIfNotExists(c *C) { func (s *trackerSuite) TestMultiDrop(c *C) { log.SetLevel(zapcore.ErrorLevel) - tracker, err := schema.NewTracker(nil) + tracker, err := NewTracker(defaultTestSessionCfg, s.baseConn) c.Assert(err, IsNil) ctx := context.Background() @@ -258,7 +327,7 @@ func (aj asJSON) String() string { func (s *trackerSuite) TestCreateTableIfNotExists(c *C) { log.SetLevel(zapcore.ErrorLevel) - tracker, err := schema.NewTracker(nil) + tracker, err := NewTracker(defaultTestSessionCfg, s.baseConn) c.Assert(err, IsNil) // Create some sort of complicated table. @@ -322,7 +391,7 @@ func (s *trackerSuite) TestAllSchemas(c *C) { log.SetLevel(zapcore.ErrorLevel) ctx := context.Background() - tracker, err := schema.NewTracker(nil) + tracker, err := NewTracker(defaultTestSessionCfg, s.baseConn) c.Assert(err, IsNil) // nothing should exist... diff --git a/pkg/terror/error_list.go b/pkg/terror/error_list.go index 83e4938848..5284b0ba34 100644 --- a/pkg/terror/error_list.go +++ b/pkg/terror/error_list.go @@ -392,7 +392,7 @@ const ( codeSyncerFailpoint codeSyncerReplaceEvent codeSyncerOperatorNotExist - codeSyncerReplcaeEventNotExist + codeSyncerReplaceEventNotExist ) // DM-master error code @@ -560,6 +560,7 @@ const ( codeSchemaTrackerInvalidCreateTableStmt codeSchemaTrackerRestoreStmtFail codeSchemaTrackerCannotDropTable + codeSchemaTrackerInit ) // HA scheduler. @@ -957,7 +958,7 @@ var ( ErrSyncerFailpoint = New(codeSyncerFailpoint, ClassSyncUnit, ScopeInternal, LevelLow, "failpoint specified error", "") ErrSyncerReplaceEvent = New(codeSyncerReplaceEvent, ClassSyncUnit, ScopeInternal, LevelHigh, "", "") ErrSyncerOperatorNotExist = New(codeSyncerOperatorNotExist, ClassSyncUnit, ScopeInternal, LevelLow, "error operator not exist, position: %s", "") - ErrSyncerReplaceEventNotExist = New(codeSyncerReplcaeEventNotExist, ClassSyncUnit, ScopeInternal, LevelHigh, "replace event not exist, location: %s", "") + ErrSyncerReplaceEventNotExist = New(codeSyncerReplaceEventNotExist, ClassSyncUnit, ScopeInternal, LevelHigh, "replace event not exist, location: %s", "") // DM-master error ErrMasterSQLOpNilRequest = New(codeMasterSQLOpNilRequest, ClassDMMaster, ScopeInternal, LevelMedium, "nil request not valid", "") @@ -1130,6 +1131,7 @@ var ( "fail to restore the statement", "") ErrSchemaTrackerCannotDropTable = New(codeSchemaTrackerCannotDropTable, ClassSchemaTracker, ScopeInternal, LevelHigh, "failed to drop table for `%s`.`%s` in schema tracker", "") + ErrSchemaTrackerInit = New(codeSchemaTrackerInit, ClassSchemaTracker, ScopeInternal, LevelHigh, "failed to create schema tracker", "") // HA scheduler ErrSchedulerNotStarted = New(codeSchedulerNotStarted, ClassScheduler, ScopeInternal, LevelHigh, "the scheduler has not started", "") diff --git a/syncer/checkpoint_test.go b/syncer/checkpoint_test.go index dab3026fb4..89ae39cf79 100644 --- a/syncer/checkpoint_test.go +++ b/syncer/checkpoint_test.go @@ -68,8 +68,12 @@ func (s *testCheckpointSuite) SetUpSuite(c *C) { } log.SetLevel(zapcore.ErrorLevel) - var err error - s.tracker, err = schema.NewTracker(nil) + var ( + err error + defaultTestSessionCfg = map[string]string{"sql_mode": "ONLY_FULL_GROUP_BY,STRICT_TRANS_TABLES,NO_ZERO_IN_DATE,NO_ZERO_DATE,ERROR_FOR_DIVISION_BY_ZERO,NO_AUTO_CREATE_USER,NO_ENGINE_SUBSTITUTION"} + ) + + s.tracker, err = schema.NewTracker(defaultTestSessionCfg, nil) c.Assert(err, IsNil) } diff --git a/syncer/status.go b/syncer/status.go index 89f40fbc42..07e737cc6c 100644 --- a/syncer/status.go +++ b/syncer/status.go @@ -25,7 +25,7 @@ import ( "github.com/pingcap/dm/pkg/utils" ) -// Status implements SubTaskUnit.Status +// Status implements Unit.Status // it returns status, but does not calc status func (s *Syncer) Status() interface{} { var ( diff --git a/syncer/syncer.go b/syncer/syncer.go index b09c399cdf..7c107cf282 100644 --- a/syncer/syncer.go +++ b/syncer/syncer.go @@ -35,14 +35,13 @@ import ( "github.com/pingcap/tidb-tools/pkg/dbutil" "github.com/pingcap/tidb-tools/pkg/filter" router "github.com/pingcap/tidb-tools/pkg/table-router" + toolutils "github.com/pingcap/tidb-tools/pkg/utils" "github.com/siddontang/go-mysql/mysql" "github.com/siddontang/go-mysql/replication" "github.com/siddontang/go/sync2" "go.etcd.io/etcd/clientv3" "go.uber.org/zap" - toolutils "github.com/pingcap/tidb-tools/pkg/utils" - "github.com/pingcap/dm/dm/config" "github.com/pingcap/dm/dm/pb" "github.com/pingcap/dm/dm/unit" @@ -228,12 +227,6 @@ func NewSyncer(cfg *config.SubTaskConfig, etcdClient *clientv3.Client) *Syncer { syncer.sgk = NewShardingGroupKeeper(syncer.tctx, cfg) } - var err error - syncer.schemaTracker, err = schema.NewTracker(cfg.To.Session) - if err != nil { - syncer.tctx.L().DPanic("cannot create schema tracker", zap.Error(err)) - } - return syncer } @@ -282,6 +275,11 @@ func (s *Syncer) Init(ctx context.Context) (err error) { } rollbackHolder.Add(fr.FuncRollback{Name: "close-DBs", Fn: s.closeDBs}) + s.schemaTracker, err = schema.NewTracker(s.cfg.To.Session, s.ddlDBConn.baseConn) + if err != nil { + return terror.ErrSchemaTrackerInit.Delegate(err) + } + s.streamerController = NewStreamerController(tctx, s.syncCfg, s.cfg.EnableGTID, s.fromDB, s.binlogType, s.cfg.RelayDir, s.timezone) s.baList, err = filter.New(s.cfg.CaseSensitive, s.cfg.BAList) @@ -774,6 +772,10 @@ func (s *Syncer) addJob(job *job) error { s.c.reset() } + if s.execError.Get() != nil { + return nil + } + switch job.tp { case ddl: failpoint.Inject("ExitAfterDDLBeforeFlush", func() { @@ -930,6 +932,8 @@ func (s *Syncer) syncDDL(tctx *tcontext.Context, queueBucket string, db *DBConn, err = terror.WithScope(err, terror.ScopeDownstream) } } + // If downstream has error (which may cause by tracker is more compatible than downstream), we should stop handling + // this job, set `s.execError` to let caller of `addJob` discover error if err != nil { s.execError.Set(err) if !utils.IsContextCanceledError(err) { diff --git a/syncer/syncer_test.go b/syncer/syncer_test.go index 467107b359..55c0d588eb 100644 --- a/syncer/syncer_test.go +++ b/syncer/syncer_test.go @@ -39,6 +39,7 @@ import ( "github.com/pingcap/dm/pkg/log" parserpkg "github.com/pingcap/dm/pkg/parser" "github.com/pingcap/dm/pkg/retry" + "github.com/pingcap/dm/pkg/schema" streamer2 "github.com/pingcap/dm/pkg/streamer" "github.com/pingcap/dm/pkg/utils" @@ -57,6 +58,10 @@ import ( var _ = Suite(&testSyncerSuite{}) +var ( + defaultTestSessionCfg = map[string]string{"sql_mode": "ONLY_FULL_GROUP_BY,STRICT_TRANS_TABLES,NO_ZERO_IN_DATE,NO_ZERO_DATE,ERROR_FOR_DIVISION_BY_ZERO,NO_AUTO_CREATE_USER,NO_ENGINE_SUBSTITUTION"} +) + func TestSuite(t *testing.T) { TestingT(t) } @@ -967,6 +972,8 @@ func (s *testSyncerSuite) TestGeneratedColumn(c *C) { syncer.ddlDB = syncer.fromDB.BaseDB syncer.ddlDBConn = &DBConn{cfg: s.cfg, baseConn: conn.NewBaseConn(dbConn, &retry.FiniteRetryStrategy{})} syncer.toDBConns = []*DBConn{{cfg: s.cfg, baseConn: conn.NewBaseConn(dbConn, &retry.FiniteRetryStrategy{})}} + syncer.schemaTracker, err = schema.NewTracker(defaultTestSessionCfg, syncer.ddlDBConn.baseConn) + c.Assert(err, IsNil) syncer.reset() syncer.streamerController = NewStreamerController(tcontext.Background(), syncer.syncCfg, true, syncer.fromDB, syncer.binlogType, syncer.cfg.RelayDir, syncer.timezone) @@ -1145,6 +1152,8 @@ func (s *testSyncerSuite) TestRun(c *C) { syncer.toDBConns = []*DBConn{{cfg: s.cfg, baseConn: conn.NewBaseConn(dbConn, &retry.FiniteRetryStrategy{})}, {cfg: s.cfg, baseConn: conn.NewBaseConn(dbConn, &retry.FiniteRetryStrategy{})}} syncer.ddlDBConn = &DBConn{cfg: s.cfg, baseConn: conn.NewBaseConn(dbConn, &retry.FiniteRetryStrategy{})} + syncer.schemaTracker, err = schema.NewTracker(defaultTestSessionCfg, syncer.ddlDBConn.baseConn) + c.Assert(err, IsNil) c.Assert(syncer.Type(), Equals, pb.UnitType_Sync) syncer.columnMapping, err = cm.NewMapping(s.cfg.CaseSensitive, s.cfg.ColumnMappingRules) @@ -1344,6 +1353,8 @@ func (s *testSyncerSuite) TestExitSafeModeByConfig(c *C) { syncer.toDBConns = []*DBConn{{cfg: s.cfg, baseConn: conn.NewBaseConn(dbConn, &retry.FiniteRetryStrategy{})}, {cfg: s.cfg, baseConn: conn.NewBaseConn(dbConn, &retry.FiniteRetryStrategy{})}} syncer.ddlDBConn = &DBConn{cfg: s.cfg, baseConn: conn.NewBaseConn(dbConn, &retry.FiniteRetryStrategy{})} + syncer.schemaTracker, err = schema.NewTracker(defaultTestSessionCfg, syncer.ddlDBConn.baseConn) + c.Assert(err, IsNil) c.Assert(syncer.Type(), Equals, pb.UnitType_Sync) syncer.genRouter() diff --git a/tests/sharding/conf/dm-task.yaml b/tests/sharding/conf/dm-task.yaml index d9ac42e596..7e72a97cf3 100644 --- a/tests/sharding/conf/dm-task.yaml +++ b/tests/sharding/conf/dm-task.yaml @@ -11,8 +11,6 @@ target-database: port: 4000 user: "root" password: "" - session: - sql_mode: "NO_ZERO_DATE,NO_ZERO_IN_DATE" mysql-instances: - source-id: "mysql-replica-01" diff --git a/tests/sharding/run.sh b/tests/sharding/run.sh index ffae13306d..022728b5dd 100755 --- a/tests/sharding/run.sh +++ b/tests/sharding/run.sh @@ -18,6 +18,7 @@ EOF function run() { run_sql_both_source "SET @@GLOBAL.SQL_MODE='NO_ZERO_IN_DATE,NO_ZERO_DATE'" + run_sql_tidb "SET @@GLOBAL.SQL_MODE='NO_ZERO_IN_DATE,NO_ZERO_DATE'" run_sql_file $cur/data/db1.prepare.sql $MYSQL_HOST1 $MYSQL_PORT1 $MYSQL_PASSWORD1 check_contains 'Query OK, 2 rows affected' @@ -143,6 +144,7 @@ function run() { "task test has no source or not exist" 1 run_sql_both_source "SET @@GLOBAL.SQL_MODE='ONLY_FULL_GROUP_BY,STRICT_TRANS_TABLES,NO_ZERO_IN_DATE,NO_ZERO_DATE,ERROR_FOR_DIVISION_BY_ZERO,NO_AUTO_CREATE_USER,NO_ENGINE_SUBSTITUTION'" + run_sql_tidb "SET @@GLOBAL.SQL_MODE='ONLY_FULL_GROUP_BY,STRICT_TRANS_TABLES,NO_ZERO_IN_DATE,NO_ZERO_DATE,ERROR_FOR_DIVISION_BY_ZERO,NO_AUTO_CREATE_USER,NO_ENGINE_SUBSTITUTION'" } cleanup_data db_target