diff --git a/syncer/checkpoint.go b/syncer/checkpoint.go index 0514494f16..c1ec325f24 100644 --- a/syncer/checkpoint.go +++ b/syncer/checkpoint.go @@ -145,8 +145,8 @@ type CheckPoint interface { // DeleteTablePoint deletes checkpoint for specified table in memory and storage DeleteTablePoint(tctx *tcontext.Context, sourceSchema, sourceTable string) error - // IsNewerTablePoint checks whether job's checkpoint is newer than previous saved checkpoint - IsNewerTablePoint(sourceSchema, sourceTable string, pos mysql.Position) bool + // IsOlderThanTablePoint checks whether job's checkpoint is older than previous saved checkpoint + IsOlderThanTablePoint(sourceSchema, sourceTable string, pos mysql.Position, useLE bool) bool // SaveGlobalPoint saves the global binlog stream's checkpoint // corresponding to Meta.Save @@ -327,20 +327,26 @@ func (cp *RemoteCheckPoint) DeleteTablePoint(tctx *tcontext.Context, sourceSchem return nil } -// IsNewerTablePoint implements CheckPoint.IsNewerTablePoint -func (cp *RemoteCheckPoint) IsNewerTablePoint(sourceSchema, sourceTable string, pos mysql.Position) bool { +// IsOlderThanTablePoint implements CheckPoint.IsOlderThanTablePoint. +// For binlog position replication, currently DM will split rows changes of an event to jobs, so some job may has save position. +// if useLE is true, we use less than or equal. +func (cp *RemoteCheckPoint) IsOlderThanTablePoint(sourceSchema, sourceTable string, pos mysql.Position, useLE bool) bool { cp.RLock() defer cp.RUnlock() mSchema, ok := cp.points[sourceSchema] if !ok { - return true + return false } point, ok := mSchema[sourceTable] if !ok { - return true + return false } oldPos := point.MySQLPos() - return pos.Compare(oldPos) > 0 + cp.logCtx.L().Debug("compare table position whether is newer", zap.Stringer("position", pos), zap.Stringer("old position", oldPos)) + if useLE { + return pos.Compare(oldPos) <= 0 + } + return pos.Compare(oldPos) < 0 } // SaveGlobalPoint implements CheckPoint.SaveGlobalPoint diff --git a/syncer/checkpoint_test.go b/syncer/checkpoint_test.go index 00a318309f..c6df4f1f4c 100644 --- a/syncer/checkpoint_test.go +++ b/syncer/checkpoint_test.go @@ -269,23 +269,23 @@ func (s *testCheckpointSuite) testTableCheckPoint(c *C, cp CheckPoint) { ) // not exist - newer := cp.IsNewerTablePoint(schema, table, pos1) - c.Assert(newer, IsTrue) + older := cp.IsOlderThanTablePoint(schema, table, pos1, false) + c.Assert(older, IsFalse) // save cp.SaveTablePoint(schema, table, pos2) - newer = cp.IsNewerTablePoint(schema, table, pos1) - c.Assert(newer, IsFalse) + older = cp.IsOlderThanTablePoint(schema, table, pos1, false) + c.Assert(older, IsTrue) // rollback, to min cp.Rollback() - newer = cp.IsNewerTablePoint(schema, table, pos1) - c.Assert(newer, IsTrue) + older = cp.IsOlderThanTablePoint(schema, table, pos1, false) + c.Assert(older, IsFalse) // save again cp.SaveTablePoint(schema, table, pos2) - newer = cp.IsNewerTablePoint(schema, table, pos1) - c.Assert(newer, IsFalse) + older = cp.IsOlderThanTablePoint(schema, table, pos1, false) + c.Assert(older, IsTrue) // flush + rollback s.mock.ExpectBegin() @@ -294,8 +294,8 @@ func (s *testCheckpointSuite) testTableCheckPoint(c *C, cp CheckPoint) { err = cp.FlushPointsExcept(tctx, nil, nil, nil) c.Assert(err, IsNil) cp.Rollback() - newer = cp.IsNewerTablePoint(schema, table, pos1) - c.Assert(newer, IsFalse) + older = cp.IsOlderThanTablePoint(schema, table, pos1, false) + c.Assert(older, IsTrue) // clear, to min s.mock.ExpectBegin() @@ -303,13 +303,13 @@ func (s *testCheckpointSuite) testTableCheckPoint(c *C, cp CheckPoint) { s.mock.ExpectCommit() err = cp.Clear(tctx) c.Assert(err, IsNil) - newer = cp.IsNewerTablePoint(schema, table, pos1) - c.Assert(newer, IsTrue) + older = cp.IsOlderThanTablePoint(schema, table, pos1, false) + c.Assert(older, IsFalse) // save cp.SaveTablePoint(schema, table, pos2) - newer = cp.IsNewerTablePoint(schema, table, pos1) - c.Assert(newer, IsFalse) + older = cp.IsOlderThanTablePoint(schema, table, pos1, false) + c.Assert(older, IsTrue) // test save table point less than global point func() { @@ -330,6 +330,6 @@ func (s *testCheckpointSuite) testTableCheckPoint(c *C, cp CheckPoint) { err = cp.FlushPointsExcept(tctx, [][]string{{schema, table}}, nil, nil) c.Assert(err, IsNil) cp.Rollback() - newer = cp.IsNewerTablePoint(schema, table, pos1) - c.Assert(newer, IsTrue) + older = cp.IsOlderThanTablePoint(schema, table, pos1, false) + c.Assert(older, IsFalse) } diff --git a/syncer/syncer.go b/syncer/syncer.go index 34de9b2c75..124f8ad44e 100644 --- a/syncer/syncer.go +++ b/syncer/syncer.go @@ -15,6 +15,7 @@ package syncer import ( "context" + "errors" "fmt" "math" "os" @@ -751,7 +752,34 @@ func (s *Syncer) checkWait(job *job) bool { return false } +// only used in tests. +var ( + lastPos mysql.Position + lastPosNum int + waitJobsDone bool + failExecuteSQL bool + failOnce sync2.AtomicInt64 +) + func (s *Syncer) addJob(job *job) error { + failpoint.Inject("countJobFromOneEvent", func() { + if job.currentPos.Compare(lastPos) == 0 { + lastPosNum++ + } else { + lastPos = job.currentPos + lastPosNum = 1 + } + // trigger a flush after see one job + if lastPosNum == 1 { + waitJobsDone = true + s.tctx.L().Info("meet the first job of an event", zap.Any("binlog position", lastPos)) + } + // mock a execution error after see two jobs. + if lastPosNum == 2 { + failExecuteSQL = true + s.tctx.L().Info("meet the second job of an event", zap.Any("binlog position", lastPos)) + } + }) var ( queueBucket int execDDLReq *pb.ExecDDLRequest @@ -802,6 +830,13 @@ func (s *Syncer) addJob(job *job) error { } wait := s.checkWait(job) + failpoint.Inject("flushFirstJobOfEvent", func() { + if waitJobsDone { + s.tctx.L().Info("trigger flushFirstJobOfEvent") + waitJobsDone = false + wait = true + } + }) if wait { s.jobWg.Wait() s.c.reset() @@ -1001,6 +1036,13 @@ func (s *Syncer) sync(tctx *tcontext.Context, queueBucket string, db *DBConn, jo if len(jobs) == 0 { return nil } + + failpoint.Inject("failSecondJobOfEvent", func() { + if failExecuteSQL && failOnce.CompareAndSwap(0, 1) { + s.tctx.L().Info("trigger failSecondJobOfEvent") + failpoint.Return(errors.New("failSecondJobOfEvent")) + } + }) queries := make([]string, 0, len(jobs)) args := make([][]interface{}, 0, len(jobs)) for _, j := range jobs { @@ -1470,7 +1512,7 @@ func (s *Syncer) handleRowsEvent(ev *replication.RowsEvent, ec eventContext) err } // DML position before table checkpoint, ignore it - if !s.checkpoint.IsNewerTablePoint(originSchema, originTable, *ec.currentPos) { + if s.checkpoint.IsOlderThanTablePoint(originSchema, originTable, *ec.currentPos, false) { s.tctx.L().Debug("ignore obsolete event that is old than table checkpoint", zap.String("event", "row"), log.WrapStringerField("position", ec.currentPos), zap.String("origin schema", originSchema), zap.String("origin table", originTable)) return nil } @@ -1700,7 +1742,7 @@ func (s *Syncer) handleQueryEvent(ev *replication.QueryEvent, ec eventContext) e // for DDL, we wait it to be executed, so we can check if event is newer in this syncer's main process goroutine // ignore obsolete DDL here can avoid to try-sync again for already synced DDLs - if !s.checkpoint.IsNewerTablePoint(tableNames[0][0].Schema, tableNames[0][0].Name, *ec.currentPos) { + if s.checkpoint.IsOlderThanTablePoint(tableNames[0][0].Schema, tableNames[0][0].Name, *ec.currentPos, true) { s.tctx.L().Info("ignore obsolete DDL", zap.String("event", "query"), zap.String("statement", sql), log.WrapStringerField("position", ec.currentPos)) continue } diff --git a/tests/all_mode/data/db1.increment3.sql b/tests/all_mode/data/db1.increment3.sql new file mode 100644 index 0000000000..87fc44acad --- /dev/null +++ b/tests/all_mode/data/db1.increment3.sql @@ -0,0 +1,3 @@ +use all_mode; +insert into t1 (id, name) values (10, '10'), (20, '20'); +insert into t1 (id, name) values (30, '30'); diff --git a/tests/all_mode/data/db2.increment3.sql b/tests/all_mode/data/db2.increment3.sql new file mode 100644 index 0000000000..1d579a06d8 --- /dev/null +++ b/tests/all_mode/data/db2.increment3.sql @@ -0,0 +1,3 @@ +use all_mode; +insert into t2 (id, name) values (10, '10'), (20, '20'); +insert into t2 (id, name) values (30, '30'); diff --git a/tests/all_mode/run.sh b/tests/all_mode/run.sh index 8827314873..82d4133334 100755 --- a/tests/all_mode/run.sh +++ b/tests/all_mode/run.sh @@ -52,11 +52,54 @@ function test_session_config(){ echo "[$(date)] <<<<<< finish test_session_config >>>>>>" } -function run() { +function test_fail_job_between_event() { + run_sql_file $cur/data/db1.prepare.sql $MYSQL_HOST1 $MYSQL_PORT1 $MYSQL_PASSWORD1 + check_contains 'Query OK, 2 rows affected' + run_sql_file $cur/data/db2.prepare.sql $MYSQL_HOST2 $MYSQL_PORT2 $MYSQL_PASSWORD2 + check_contains 'Query OK, 3 rows affected' + + # start DM worker and master + run_dm_master $WORK_DIR/master $MASTER_PORT $cur/conf/dm-master.toml + check_rpc_alive $cur/../bin/check_master_online 127.0.0.1:$MASTER_PORT + + inject_points=( + "github.com/pingcap/dm/dm/worker/TaskCheckInterval=return(\"500ms\")" + "github.com/pingcap/dm/syncer/countJobFromOneEvent=return()" + "github.com/pingcap/dm/syncer/flushFirstJobOfEvent=return()" + "github.com/pingcap/dm/syncer/failSecondJobOfEvent=return()" + ) + export GO_FAILPOINTS="$(join_string \; ${inject_points[@]})" + run_dm_worker $WORK_DIR/worker1 $WORKER1_PORT $cur/conf/dm-worker1.toml + check_rpc_alive $cur/../bin/check_worker_online 127.0.0.1:$WORKER1_PORT + run_dm_worker $WORK_DIR/worker2 $WORKER2_PORT $cur/conf/dm-worker2.toml + check_rpc_alive $cur/../bin/check_worker_online 127.0.0.1:$WORKER2_PORT + + cp $cur/conf/dm-task.yaml $WORK_DIR/dm-task.yaml + sed -i 's/sql_mode: ".*"/sql_mode: "NO_AUTO_VALUE_ON_ZERO"/g' $WORK_DIR/dm-task.yaml + dmctl_start_task "$WORK_DIR/dm-task.yaml" + + run_sql_file $cur/data/db1.increment3.sql $MYSQL_HOST1 $MYSQL_PORT1 $MYSQL_PASSWORD1 + run_sql_file $cur/data/db2.increment3.sql $MYSQL_HOST2 $MYSQL_PORT2 $MYSQL_PASSWORD2 + sleep 2 + check_log_contains $WORK_DIR/worker1/log/dm-worker.log "failSecondJobOfEvent" + check_log_contains $WORK_DIR/worker2/log/dm-worker.log "failSecondJobOfEvent" + run_dm_ctl $WORK_DIR "127.0.0.1:$MASTER_PORT" \ + "query-status test" \ + "\"result\": true" 3 + check_sync_diff $WORK_DIR $cur/conf/diff_config.toml + + cleanup_data all_mode + cleanup_process $* + + export GO_FAILPOINTS='' +} +function run() { run_sql "SET @@GLOBAL.SQL_MODE='ANSI_QUOTES,NO_AUTO_VALUE_ON_ZERO'" $MYSQL_PORT1 $MYSQL_PASSWORD1 run_sql "SET @@GLOBAL.SQL_MODE='ANSI_QUOTES,NO_AUTO_VALUE_ON_ZERO'" $MYSQL_PORT2 $MYSQL_PASSWORD2 + test_fail_job_between_event + test_session_config export GO_FAILPOINTS="github.com/pingcap/dm/dm/worker/TaskCheckInterval=return(\"500ms\")"