diff --git a/syncer/checkpoint.go b/syncer/checkpoint.go
index e2ff9ec75d..33529cc22a 100644
--- a/syncer/checkpoint.go
+++ b/syncer/checkpoint.go
@@ -198,8 +198,8 @@ type CheckPoint interface {
 	// DeleteSchemaPoint deletes checkpoint for specified schema
 	DeleteSchemaPoint(tctx *tcontext.Context, sourceSchema string) error
 
-	// IsNewerTablePoint checks whether job's checkpoint is newer than previous saved checkpoint
-	IsNewerTablePoint(sourceSchema, sourceTable string, point binlog.Location, gte bool) bool
+	// IsOlderThanTablePoint checks whether the job's checkpoint is older than the previously saved checkpoint
+	IsOlderThanTablePoint(sourceSchema, sourceTable string, point binlog.Location, useLE bool) bool
 
 	// SaveGlobalPoint saves the global binlog stream's checkpoint
 	// corresponding to Meta.Save
@@ -443,34 +443,32 @@ func (cp *RemoteCheckPoint) DeleteSchemaPoint(tctx *tcontext.Context, sourceSche
 	return nil
 }
 
-// IsNewerTablePoint implements CheckPoint.IsNewerTablePoint.
-// gte means greater than or equal, gte should judge by EnableGTID and the event type
-// - when enable GTID and binlog is DML, go-mysql will only update GTID set in a XID event after the rows event, for example, the binlog events are:
-//   - Query event, location is gset1
-//   - Rows event, location is gset1
+// IsOlderThanTablePoint implements CheckPoint.IsOlderThanTablePoint.
+// For GTID replication, go-mysql only updates the GTID set in the XID event that follows the rows events. For example, the binlog events are:
+//   - Query event e1, location is gset1
+//   - Rows event e2, location is gset1
 //   - XID event, location is gset2
-// after syncer handle query event, will save table point with gset1, and when handle rows event, will compare the rows's location with table checkpoint's location in `IsNewerTablePoint`, and these two location have same gset, so we should use `>=` to compare location in this case.
-// - when enable GTID and binlog is DDL, different DDL have different GTID set, so if GTID set is euqal, it is a old table point, should use `>` to compare location in this case.
-// - when not enable GTID, just compare the position, and only when grater than the old point is newer table point, should use `>` to compare location is this case.
-func (cp *RemoteCheckPoint) IsNewerTablePoint(sourceSchema, sourceTable string, location binlog.Location, gte bool) bool {
+// Note that e1 is not older than e2.
+// For binlog position replication, DM currently splits the row changes of one event into multiple jobs, so some jobs may have the same position as the saved table checkpoint.
+// If useLE is true, `<=` is used for the comparison; otherwise `<` is used.
+func (cp *RemoteCheckPoint) IsOlderThanTablePoint(sourceSchema, sourceTable string, location binlog.Location, useLE bool) bool {
 	cp.RLock()
 	defer cp.RUnlock()
 	mSchema, ok := cp.points[sourceSchema]
 	if !ok {
-		return true
+		return false
 	}
 	point, ok := mSchema[sourceTable]
 	if !ok {
-		return true
+		return false
 	}
 	oldLocation := point.MySQLLocation()
 	cp.logCtx.L().Debug("compare table location whether is newer", zap.Stringer("location", location), zap.Stringer("old location", oldLocation))
 
-	if gte {
-		return binlog.CompareLocation(location, oldLocation, cp.cfg.EnableGTID) >= 0
+	if useLE {
+		return binlog.CompareLocation(location, oldLocation, cp.cfg.EnableGTID) <= 0
 	}
-
-	return binlog.CompareLocation(location, oldLocation, cp.cfg.EnableGTID) > 0
+	return binlog.CompareLocation(location, oldLocation, cp.cfg.EnableGTID) < 0
 }
 
 // SaveGlobalPoint implements CheckPoint.SaveGlobalPoint.
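For reference, the comparison semantics introduced here can be summarized with a small, self-contained sketch. The `isOlderThanPoint` helper and the plain integer "locations" below are illustrative assumptions only; the real method compares `binlog.Location` values through `binlog.CompareLocation` with `cfg.EnableGTID`.

```go
package main

import "fmt"

// isOlderThanPoint mirrors the comparison semantics of IsOlderThanTablePoint
// with plain integers standing in for binlog locations (illustration only).
// With useLE=false (DML), a location equal to the saved point is NOT older,
// so the job is still applied; with useLE=true (DDL), an equal location is
// treated as already executed and skipped.
func isOlderThanPoint(loc, saved int, useLE bool) bool {
	cmp := 0
	switch {
	case loc < saved:
		cmp = -1
	case loc > saved:
		cmp = 1
	}
	if useLE {
		return cmp <= 0
	}
	return cmp < 0
}

func main() {
	saved := 100
	fmt.Println(isOlderThanPoint(100, saved, false)) // false: DML at the saved point is re-applied
	fmt.Println(isOlderThanPoint(100, saved, true))  // true: DDL at the saved point is skipped
	fmt.Println(isOlderThanPoint(99, saved, false))  // true: strictly older events are always skipped
}
```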
diff --git a/syncer/checkpoint_test.go b/syncer/checkpoint_test.go
index 804f60cf9c..bf423371f1 100644
--- a/syncer/checkpoint_test.go
+++ b/syncer/checkpoint_test.go
@@ -340,23 +340,23 @@ func (s *testCheckpointSuite) testTableCheckPoint(c *C, cp CheckPoint) {
 	)
 
 	// not exist
-	newer := cp.IsNewerTablePoint(schema, table, binlog.Location{Position: pos1}, false)
-	c.Assert(newer, IsTrue)
+	older := cp.IsOlderThanTablePoint(schema, table, binlog.Location{Position: pos1}, false)
+	c.Assert(older, IsFalse)
 
 	// save
 	cp.SaveTablePoint(schema, table, binlog.Location{Position: pos2}, nil)
-	newer = cp.IsNewerTablePoint(schema, table, binlog.Location{Position: pos1}, false)
-	c.Assert(newer, IsFalse)
+	older = cp.IsOlderThanTablePoint(schema, table, binlog.Location{Position: pos1}, false)
+	c.Assert(older, IsTrue)
 
 	// rollback, to min
 	cp.Rollback(s.tracker)
-	newer = cp.IsNewerTablePoint(schema, table, binlog.Location{Position: pos1}, false)
-	c.Assert(newer, IsTrue)
+	older = cp.IsOlderThanTablePoint(schema, table, binlog.Location{Position: pos1}, false)
+	c.Assert(older, IsFalse)
 
 	// save again
 	cp.SaveTablePoint(schema, table, binlog.Location{Position: pos2}, nil)
-	newer = cp.IsNewerTablePoint(schema, table, binlog.Location{Position: pos1}, false)
-	c.Assert(newer, IsFalse)
+	older = cp.IsOlderThanTablePoint(schema, table, binlog.Location{Position: pos1}, false)
+	c.Assert(older, IsTrue)
 
 	// flush + rollback
 	s.mock.ExpectBegin()
@@ -365,13 +365,13 @@ func (s *testCheckpointSuite) testTableCheckPoint(c *C, cp CheckPoint) {
 	err = cp.FlushPointsExcept(tctx, nil, nil, nil)
 	c.Assert(err, IsNil)
 	cp.Rollback(s.tracker)
-	newer = cp.IsNewerTablePoint(schema, table, binlog.Location{Position: pos1}, false)
-	c.Assert(newer, IsFalse)
+	older = cp.IsOlderThanTablePoint(schema, table, binlog.Location{Position: pos1}, false)
+	c.Assert(older, IsTrue)
 
 	// save
 	cp.SaveTablePoint(schema, table, binlog.Location{Position: pos2}, nil)
-	newer = cp.IsNewerTablePoint(schema, table, binlog.Location{Position: pos1}, false)
-	c.Assert(newer, IsFalse)
+	older = cp.IsOlderThanTablePoint(schema, table, binlog.Location{Position: pos1}, false)
+	c.Assert(older, IsTrue)
 
 	// delete
 	s.mock.ExpectBegin()
@@ -431,8 +431,8 @@ func (s *testCheckpointSuite) testTableCheckPoint(c *C, cp CheckPoint) {
 	s.mock.ExpectCommit()
 	err = cp.Clear(tctx)
 	c.Assert(err, IsNil)
-	newer = cp.IsNewerTablePoint(schema, table, binlog.Location{Position: pos1}, false)
-	c.Assert(newer, IsTrue)
+	older = cp.IsOlderThanTablePoint(schema, table, binlog.Location{Position: pos1}, false)
+	c.Assert(older, IsFalse)
 
 	// test save table point less than global point
 	func() {
@@ -452,8 +452,8 @@ func (s *testCheckpointSuite) testTableCheckPoint(c *C, cp CheckPoint) {
 	err = cp.FlushPointsExcept(tctx, [][]string{{schema, table}}, nil, nil)
 	c.Assert(err, IsNil)
 	cp.Rollback(s.tracker)
-	newer = cp.IsNewerTablePoint(schema, table, binlog.Location{Position: pos1}, false)
-	c.Assert(newer, IsTrue)
+	older = cp.IsOlderThanTablePoint(schema, table, binlog.Location{Position: pos1}, false)
+	c.Assert(older, IsFalse)
 
 	s.mock.ExpectBegin()
 	s.mock.ExpectExec(clearCheckPointSQL).WithArgs(cpid).WillReturnResult(sqlmock.NewResult(0, 1))
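The inverted expectations above (a missing or cleared table point is no longer "newer", and an equal location is only skipped for DDL) boil down to a small truth table. The table-driven sketch below checks that table against the simplified `isOlderThanPoint` helper from the previous sketch; it does not go through `RemoteCheckPoint` or sqlmock and is an illustration only.

```go
package main

import "testing"

// TestIsOlderThanPoint spells out the truth table behind the assertions in
// checkpoint_test.go, using the simplified isOlderThanPoint helper.
func TestIsOlderThanPoint(t *testing.T) {
	const saved = 100
	cases := []struct {
		loc   int
		useLE bool
		want  bool
	}{
		{99, false, true},   // strictly older DML job: skip
		{100, false, false}, // DML job at the saved point: must not be skipped
		{101, false, false}, // newer DML job: apply
		{100, true, true},   // DDL at the saved point: already executed, skip
		{101, true, false},  // newer DDL: execute
	}
	for _, c := range cases {
		if got := isOlderThanPoint(c.loc, saved, c.useLE); got != c.want {
			t.Errorf("isOlderThanPoint(%d, %d, %v) = %v, want %v", c.loc, saved, c.useLE, got, c.want)
		}
	}
}
```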
diff --git a/syncer/syncer.go b/syncer/syncer.go
index f6dba1ac52..085f4dfd93 100644
--- a/syncer/syncer.go
+++ b/syncer/syncer.go
@@ -797,7 +797,34 @@ func (s *Syncer) saveTablePoint(db, table string, location binlog.Location) {
 	s.checkpoint.SaveTablePoint(db, table, location, ti)
 }
 
+// only used in tests.
+var (
+	lastPos        mysql.Position
+	lastPosNum     int
+	waitJobsDone   bool
+	failExecuteSQL bool
+	failOnce       atomic.Bool
+)
+
 func (s *Syncer) addJob(job *job) error {
+	failpoint.Inject("countJobFromOneEvent", func() {
+		if job.currentLocation.Position.Compare(lastPos) == 0 {
+			lastPosNum++
+		} else {
+			lastPos = job.currentLocation.Position
+			lastPosNum = 1
+		}
+		// trigger a flush after seeing the first job of an event
+		if lastPosNum == 1 {
+			waitJobsDone = true
+			s.tctx.L().Info("meet the first job of an event", zap.Any("binlog position", lastPos))
+		}
+		// mock an execution error after seeing the second job of an event
+		if lastPosNum == 2 {
+			failExecuteSQL = true
+			s.tctx.L().Info("meet the second job of an event", zap.Any("binlog position", lastPos))
+		}
+	})
 	var queueBucket int
 	switch job.tp {
 	case xid:
@@ -838,6 +865,13 @@ func (s *Syncer) addJob(job *job) error {
 
 	// nolint:ifshort
 	wait := s.checkWait(job)
+	failpoint.Inject("flushFirstJobOfEvent", func() {
+		if waitJobsDone {
+			s.tctx.L().Info("trigger flushFirstJobOfEvent")
+			waitJobsDone = false
+			wait = true
+		}
+	})
 	if wait {
 		s.jobWg.Wait()
 		s.c.reset()
@@ -1122,6 +1156,13 @@ func (s *Syncer) syncDML(tctx *tcontext.Context, queueBucket string, db *DBConn,
 		return 0, nil
 	}
 
+	failpoint.Inject("failSecondJobOfEvent", func() {
+		if failExecuteSQL && failOnce.CAS(false, true) {
+			s.tctx.L().Info("trigger failSecondJobOfEvent")
+			failpoint.Return(0, errors.New("failSecondJobOfEvent"))
+		}
+	})
+
 	select {
 	case <-tctx.Ctx.Done():
 		// do not execute queries anymore, because they should be failed with a done context.
@@ -1210,13 +1251,13 @@ func (s *Syncer) Run(ctx context.Context) (err error) {
 
 	var (
 		flushCheckpoint bool
-		cleanDumpFile   = s.cfg.CleanDumpFile
+		cleanDumpFile   = s.cfg.CleanDumpFile && fresh
 	)
 	flushCheckpoint, err = s.adjustGlobalPointGTID(tctx)
 	if err != nil {
 		return err
 	}
-	if s.cfg.Mode == config.ModeAll {
+	if s.cfg.Mode == config.ModeAll && fresh {
 		flushCheckpoint = true
 		err = s.loadTableStructureFromDump(ctx)
 		if err != nil {
@@ -1742,7 +1783,7 @@ func (s *Syncer) handleRowsEvent(ev *replication.RowsEvent, ec eventContext) err
 	}
 
 	// DML position before table checkpoint, ignore it
-	if !s.checkpoint.IsNewerTablePoint(originSchema, originTable, *ec.currentLocation, s.cfg.EnableGTID) {
+	if s.checkpoint.IsOlderThanTablePoint(originSchema, originTable, *ec.currentLocation, false) {
 		ec.tctx.L().Debug("ignore obsolete event that is old than table checkpoint", zap.String("event", "row"), log.WrapStringerField("location", ec.currentLocation), zap.String("origin schema", originSchema), zap.String("origin table", originTable))
 		return nil
 	}
@@ -1983,9 +2024,9 @@ func (s *Syncer) handleQueryEvent(ev *replication.QueryEvent, ec eventContext, o
 			continue
 		}
 
-		// for DDL, we wait it to be executed, so we can check if event is newer in this syncer's main process goroutine
-		// ignore obsolete DDL here can avoid to try-sync again for already synced DDLs
-		if !s.checkpoint.IsNewerTablePoint(tableNames[0][0].Schema, tableNames[0][0].Name, *ec.currentLocation, false) {
+		// DDLs are synchronized sequentially in this syncer's main process goroutine,
+		// so skip DDLs that are older than or equal to the table checkpoint to avoid re-syncing already synced DDLs
+		if s.checkpoint.IsOlderThanTablePoint(tableNames[0][0].Schema, tableNames[0][0].Name, *ec.currentLocation, true) {
 			ec.tctx.L().Info("ignore obsolete DDL", zap.String("event", "query"), zap.String("statement", sql), log.WrapStringerField("location", ec.currentLocation))
 			continue
 		}
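The three failpoints injected above are activated through `GO_FAILPOINTS` in the integration test below, but in a failpoint-enabled build (source rewritten by failpoint-ctl) they could also be turned on programmatically. The sketch below is a hypothetical illustration of that, using `failpoint.Enable`/`failpoint.Disable` from github.com/pingcap/failpoint with the same paths and `return()` terms as the shell test.

```go
package main

import (
	"log"

	"github.com/pingcap/failpoint"
)

func main() {
	// Paths and terms mirror the GO_FAILPOINTS entries in tests/all_mode/run.sh.
	fps := []string{
		"github.com/pingcap/dm/syncer/countJobFromOneEvent",
		"github.com/pingcap/dm/syncer/flushFirstJobOfEvent",
		"github.com/pingcap/dm/syncer/failSecondJobOfEvent",
	}
	for _, fp := range fps {
		// "return()" activates the failpoint without attaching a value.
		if err := failpoint.Enable(fp, "return()"); err != nil {
			log.Fatalf("enable %s: %v", fp, err)
		}
		defer failpoint.Disable(fp) //nolint:errcheck
	}
	// ... start the syncer under test here; the injected blocks only run
	// when the source has been rewritten by failpoint-ctl ...
}
```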
diff --git a/tests/all_mode/data/db1.increment3.sql b/tests/all_mode/data/db1.increment3.sql
new file mode 100644
index 0000000000..87fc44acad
--- /dev/null
+++ b/tests/all_mode/data/db1.increment3.sql
@@ -0,0 +1,3 @@
+use all_mode;
+insert into t1 (id, name) values (10, '10'), (20, '20');
+insert into t1 (id, name) values (30, '30');
diff --git a/tests/all_mode/data/db2.increment3.sql b/tests/all_mode/data/db2.increment3.sql
new file mode 100644
index 0000000000..1d579a06d8
--- /dev/null
+++ b/tests/all_mode/data/db2.increment3.sql
@@ -0,0 +1,3 @@
+use all_mode;
+insert into t2 (id, name) values (10, '10'), (20, '20');
+insert into t2 (id, name) values (30, '30');
diff --git a/tests/all_mode/run.sh b/tests/all_mode/run.sh
index 003bb3fe47..f8a3dfe080 100755
--- a/tests/all_mode/run.sh
+++ b/tests/all_mode/run.sh
@@ -263,11 +263,60 @@ function test_stop_task_before_checkpoint() {
 	export GO_FAILPOINTS=''
 }
 
+function test_fail_job_between_event() {
+	run_sql_file $cur/data/db1.prepare.sql $MYSQL_HOST1 $MYSQL_PORT1 $MYSQL_PASSWORD1
+	check_contains 'Query OK, 2 rows affected'
+	run_sql_file $cur/data/db2.prepare.sql $MYSQL_HOST2 $MYSQL_PORT2 $MYSQL_PASSWORD2
+	check_contains 'Query OK, 3 rows affected'
+
+	# start DM worker and master
+	run_dm_master $WORK_DIR/master $MASTER_PORT $cur/conf/dm-master.toml
+	check_rpc_alive $cur/../bin/check_master_online 127.0.0.1:$MASTER_PORT
+	check_metric $MASTER_PORT 'start_leader_counter' 3 0 2
+
+	inject_points=(
+		"github.com/pingcap/dm/dm/worker/TaskCheckInterval=return(\"500ms\")"
+		"github.com/pingcap/dm/syncer/countJobFromOneEvent=return()"
+		"github.com/pingcap/dm/syncer/flushFirstJobOfEvent=return()"
+		"github.com/pingcap/dm/syncer/failSecondJobOfEvent=return()"
+	)
+	export GO_FAILPOINTS="$(join_string \; ${inject_points[@]})"
+	run_dm_worker $WORK_DIR/worker1 $WORKER1_PORT $cur/conf/dm-worker1.toml
+	check_rpc_alive $cur/../bin/check_worker_online 127.0.0.1:$WORKER1_PORT
+	run_dm_worker $WORK_DIR/worker2 $WORKER2_PORT $cur/conf/dm-worker2.toml
+	check_rpc_alive $cur/../bin/check_worker_online 127.0.0.1:$WORKER2_PORT
+
+	# operate mysql config to worker
+	cp $cur/conf/source1.yaml $WORK_DIR/source1.yaml
+	cp $cur/conf/source2.yaml $WORK_DIR/source2.yaml
+	sed -i "/relay-binlog-name/i\relay-dir: $WORK_DIR/worker1/relay_log" $WORK_DIR/source1.yaml
+	sed -i "s/enable-gtid: true/enable-gtid: false/g" $WORK_DIR/source1.yaml
+	sed -i "/relay-binlog-name/i\relay-dir: $WORK_DIR/worker2/relay_log" $WORK_DIR/source2.yaml
+	dmctl_operate_source create $WORK_DIR/source1.yaml $SOURCE_ID1
+	dmctl_operate_source create $WORK_DIR/source2.yaml $SOURCE_ID2
+
+	dmctl_start_task "$cur/conf/dm-task.yaml" "--remove-meta"
+
+	run_sql_file $cur/data/db1.increment3.sql $MYSQL_HOST1 $MYSQL_PORT1 $MYSQL_PASSWORD1
+	run_sql_file $cur/data/db2.increment3.sql $MYSQL_HOST2 $MYSQL_PORT2 $MYSQL_PASSWORD2
+	check_log_contain_with_retry "failSecondJobOfEvent" $WORK_DIR/worker1/log/dm-worker.log
+	check_log_contain_with_retry "failSecondJobOfEvent" $WORK_DIR/worker2/log/dm-worker.log
+	run_dm_ctl_with_retry $WORK_DIR "127.0.0.1:$MASTER_PORT" \
+		"query-status test" \
+		"\"result\": true" 3
+	check_sync_diff $WORK_DIR $cur/conf/diff_config.toml
+
+	cleanup_data all_mode
+	cleanup_process $*
+
+	export GO_FAILPOINTS=''
+}
+
 function run() {
 	run_sql_both_source "SET @@GLOBAL.SQL_MODE='ANSI_QUOTES,NO_AUTO_VALUE_ON_ZERO'"
 	run_sql_source1 "SET @@global.time_zone = '+01:00';"
 	run_sql_source2 "SET @@global.time_zone = '+02:00';"
-
+	test_fail_job_between_event
 	test_syncer_metrics
 	test_session_config
 	test_query_timeout
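To make the intent of `test_fail_job_between_event` concrete: the DML jobs generated from one INSERT event share a binlog position, the flushFirstJobOfEvent failpoint flushes the checkpoint after the first job, failSecondJobOfEvent fails the next one, and the task resumes exactly at the saved table point. The sketch below is a hypothetical simulation with plain integers showing why, for position-based replication, the old strict `>` check in `IsNewerTablePoint` would drop the resumed job while the new `IsOlderThanTablePoint` check re-applies it.

```go
package main

import "fmt"

func main() {
	// Both jobs generated from one INSERT event share the same binlog position.
	const eventPos = 1000
	// The checkpoint is flushed after the first job, so on resume the saved
	// table point equals the event position.
	savedPoint := eventPos

	// Old behavior (position-based replication): apply only if strictly newer.
	skippedOld := !(eventPos > savedPoint)
	// New behavior: skip only if strictly older.
	skippedNew := eventPos < savedPoint

	fmt.Println("old semantics skips the resumed job:", skippedOld) // true -> the second job's rows would be lost
	fmt.Println("new semantics skips the resumed job:", skippedNew) // false -> the second job is re-applied
}
```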