Skip to content
This repository has been archived by the owner on Nov 24, 2023. It is now read-only.

Commit

Permalink
syncer: don't skip jobs from same event when comparing table checkpoi…
Browse files Browse the repository at this point in the history
…nt (#1752)
  • Loading branch information
lance6716 authored Jun 17, 2021
1 parent 6df7f62 commit 944c5aa
Show file tree
Hide file tree
Showing 6 changed files with 134 additions and 40 deletions.
32 changes: 15 additions & 17 deletions syncer/checkpoint.go
Original file line number Diff line number Diff line change
Expand Up @@ -198,8 +198,8 @@ type CheckPoint interface {
// DeleteSchemaPoint deletes checkpoint for specified schema
DeleteSchemaPoint(tctx *tcontext.Context, sourceSchema string) error

// IsNewerTablePoint checks whether job's checkpoint is newer than previous saved checkpoint
IsNewerTablePoint(sourceSchema, sourceTable string, point binlog.Location, gte bool) bool
// IsOlderThanTablePoint checks whether job's checkpoint is older than previous saved checkpoint
IsOlderThanTablePoint(sourceSchema, sourceTable string, point binlog.Location, useLE bool) bool

// SaveGlobalPoint saves the global binlog stream's checkpoint
// corresponding to Meta.Save
Expand Down Expand Up @@ -443,34 +443,32 @@ func (cp *RemoteCheckPoint) DeleteSchemaPoint(tctx *tcontext.Context, sourceSche
return nil
}

// IsNewerTablePoint implements CheckPoint.IsNewerTablePoint.
// gte means greater than or equal, gte should judge by EnableGTID and the event type
// - when enable GTID and binlog is DML, go-mysql will only update GTID set in a XID event after the rows event, for example, the binlog events are:
// - Query event, location is gset1
// - Rows event, location is gset1
// IsOlderThanTablePoint implements CheckPoint.IsOlderThanTablePoint.
// For GTID replication, go-mysql will only update GTID set in a XID event after the rows event, for example, the binlog events are:
// - Query event e1, location is gset1
// - Rows event e2, location is gset1
// - XID event, location is gset2
// after syncer handle query event, will save table point with gset1, and when handle rows event, will compare the rows's location with table checkpoint's location in `IsNewerTablePoint`, and these two location have same gset, so we should use `>=` to compare location in this case.
// - when enable GTID and binlog is DDL, different DDL have different GTID set, so if GTID set is euqal, it is a old table point, should use `>` to compare location in this case.
// - when not enable GTID, just compare the position, and only when grater than the old point is newer table point, should use `>` to compare location is this case.
func (cp *RemoteCheckPoint) IsNewerTablePoint(sourceSchema, sourceTable string, location binlog.Location, gte bool) bool {
// We should note that e1 is not older than e2
// For binlog position replication, currently DM will split rows changes of an event to jobs, so some job may has save position.
// if useLE is true, we use less than or equal.
func (cp *RemoteCheckPoint) IsOlderThanTablePoint(sourceSchema, sourceTable string, location binlog.Location, useLE bool) bool {
cp.RLock()
defer cp.RUnlock()
mSchema, ok := cp.points[sourceSchema]
if !ok {
return true
return false
}
point, ok := mSchema[sourceTable]
if !ok {
return true
return false
}
oldLocation := point.MySQLLocation()
cp.logCtx.L().Debug("compare table location whether is newer", zap.Stringer("location", location), zap.Stringer("old location", oldLocation))

if gte {
return binlog.CompareLocation(location, oldLocation, cp.cfg.EnableGTID) >= 0
if useLE {
return binlog.CompareLocation(location, oldLocation, cp.cfg.EnableGTID) <= 0
}

return binlog.CompareLocation(location, oldLocation, cp.cfg.EnableGTID) > 0
return binlog.CompareLocation(location, oldLocation, cp.cfg.EnableGTID) < 0
}

// SaveGlobalPoint implements CheckPoint.SaveGlobalPoint.
Expand Down
32 changes: 16 additions & 16 deletions syncer/checkpoint_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -340,23 +340,23 @@ func (s *testCheckpointSuite) testTableCheckPoint(c *C, cp CheckPoint) {
)

// not exist
newer := cp.IsNewerTablePoint(schema, table, binlog.Location{Position: pos1}, false)
c.Assert(newer, IsTrue)
older := cp.IsOlderThanTablePoint(schema, table, binlog.Location{Position: pos1}, false)
c.Assert(older, IsFalse)

// save
cp.SaveTablePoint(schema, table, binlog.Location{Position: pos2}, nil)
newer = cp.IsNewerTablePoint(schema, table, binlog.Location{Position: pos1}, false)
c.Assert(newer, IsFalse)
older = cp.IsOlderThanTablePoint(schema, table, binlog.Location{Position: pos1}, false)
c.Assert(older, IsTrue)

// rollback, to min
cp.Rollback(s.tracker)
newer = cp.IsNewerTablePoint(schema, table, binlog.Location{Position: pos1}, false)
c.Assert(newer, IsTrue)
older = cp.IsOlderThanTablePoint(schema, table, binlog.Location{Position: pos1}, false)
c.Assert(older, IsFalse)

// save again
cp.SaveTablePoint(schema, table, binlog.Location{Position: pos2}, nil)
newer = cp.IsNewerTablePoint(schema, table, binlog.Location{Position: pos1}, false)
c.Assert(newer, IsFalse)
older = cp.IsOlderThanTablePoint(schema, table, binlog.Location{Position: pos1}, false)
c.Assert(older, IsTrue)

// flush + rollback
s.mock.ExpectBegin()
Expand All @@ -365,13 +365,13 @@ func (s *testCheckpointSuite) testTableCheckPoint(c *C, cp CheckPoint) {
err = cp.FlushPointsExcept(tctx, nil, nil, nil)
c.Assert(err, IsNil)
cp.Rollback(s.tracker)
newer = cp.IsNewerTablePoint(schema, table, binlog.Location{Position: pos1}, false)
c.Assert(newer, IsFalse)
older = cp.IsOlderThanTablePoint(schema, table, binlog.Location{Position: pos1}, false)
c.Assert(older, IsTrue)

// save
cp.SaveTablePoint(schema, table, binlog.Location{Position: pos2}, nil)
newer = cp.IsNewerTablePoint(schema, table, binlog.Location{Position: pos1}, false)
c.Assert(newer, IsFalse)
older = cp.IsOlderThanTablePoint(schema, table, binlog.Location{Position: pos1}, false)
c.Assert(older, IsTrue)

// delete
s.mock.ExpectBegin()
Expand Down Expand Up @@ -431,8 +431,8 @@ func (s *testCheckpointSuite) testTableCheckPoint(c *C, cp CheckPoint) {
s.mock.ExpectCommit()
err = cp.Clear(tctx)
c.Assert(err, IsNil)
newer = cp.IsNewerTablePoint(schema, table, binlog.Location{Position: pos1}, false)
c.Assert(newer, IsTrue)
older = cp.IsOlderThanTablePoint(schema, table, binlog.Location{Position: pos1}, false)
c.Assert(older, IsFalse)

// test save table point less than global point
func() {
Expand All @@ -452,8 +452,8 @@ func (s *testCheckpointSuite) testTableCheckPoint(c *C, cp CheckPoint) {
err = cp.FlushPointsExcept(tctx, [][]string{{schema, table}}, nil, nil)
c.Assert(err, IsNil)
cp.Rollback(s.tracker)
newer = cp.IsNewerTablePoint(schema, table, binlog.Location{Position: pos1}, false)
c.Assert(newer, IsTrue)
older = cp.IsOlderThanTablePoint(schema, table, binlog.Location{Position: pos1}, false)
c.Assert(older, IsFalse)

s.mock.ExpectBegin()
s.mock.ExpectExec(clearCheckPointSQL).WithArgs(cpid).WillReturnResult(sqlmock.NewResult(0, 1))
Expand Down
53 changes: 47 additions & 6 deletions syncer/syncer.go
Original file line number Diff line number Diff line change
Expand Up @@ -797,7 +797,34 @@ func (s *Syncer) saveTablePoint(db, table string, location binlog.Location) {
s.checkpoint.SaveTablePoint(db, table, location, ti)
}

// only used in tests.
var (
lastPos mysql.Position
lastPosNum int
waitJobsDone bool
failExecuteSQL bool
failOnce atomic.Bool
)

func (s *Syncer) addJob(job *job) error {
failpoint.Inject("countJobFromOneEvent", func() {
if job.currentLocation.Position.Compare(lastPos) == 0 {
lastPosNum++
} else {
lastPos = job.currentLocation.Position
lastPosNum = 1
}
// trigger a flush after see one job
if lastPosNum == 1 {
waitJobsDone = true
s.tctx.L().Info("meet the first job of an event", zap.Any("binlog position", lastPos))
}
// mock a execution error after see two jobs.
if lastPosNum == 2 {
failExecuteSQL = true
s.tctx.L().Info("meet the second job of an event", zap.Any("binlog position", lastPos))
}
})
var queueBucket int
switch job.tp {
case xid:
Expand Down Expand Up @@ -838,6 +865,13 @@ func (s *Syncer) addJob(job *job) error {

// nolint:ifshort
wait := s.checkWait(job)
failpoint.Inject("flushFirstJobOfEvent", func() {
if waitJobsDone {
s.tctx.L().Info("trigger flushFirstJobOfEvent")
waitJobsDone = false
wait = true
}
})
if wait {
s.jobWg.Wait()
s.c.reset()
Expand Down Expand Up @@ -1122,6 +1156,13 @@ func (s *Syncer) syncDML(tctx *tcontext.Context, queueBucket string, db *DBConn,
return 0, nil
}

failpoint.Inject("failSecondJobOfEvent", func() {
if failExecuteSQL && failOnce.CAS(false, true) {
s.tctx.L().Info("trigger failSecondJobOfEvent")
failpoint.Return(0, errors.New("failSecondJobOfEvent"))
}
})

select {
case <-tctx.Ctx.Done():
// do not execute queries anymore, because they should be failed with a done context.
Expand Down Expand Up @@ -1210,13 +1251,13 @@ func (s *Syncer) Run(ctx context.Context) (err error) {

var (
flushCheckpoint bool
cleanDumpFile = s.cfg.CleanDumpFile
cleanDumpFile = s.cfg.CleanDumpFile && fresh
)
flushCheckpoint, err = s.adjustGlobalPointGTID(tctx)
if err != nil {
return err
}
if s.cfg.Mode == config.ModeAll {
if s.cfg.Mode == config.ModeAll && fresh {
flushCheckpoint = true
err = s.loadTableStructureFromDump(ctx)
if err != nil {
Expand Down Expand Up @@ -1742,7 +1783,7 @@ func (s *Syncer) handleRowsEvent(ev *replication.RowsEvent, ec eventContext) err
}

// DML position before table checkpoint, ignore it
if !s.checkpoint.IsNewerTablePoint(originSchema, originTable, *ec.currentLocation, s.cfg.EnableGTID) {
if s.checkpoint.IsOlderThanTablePoint(originSchema, originTable, *ec.currentLocation, false) {
ec.tctx.L().Debug("ignore obsolete event that is old than table checkpoint", zap.String("event", "row"), log.WrapStringerField("location", ec.currentLocation), zap.String("origin schema", originSchema), zap.String("origin table", originTable))
return nil
}
Expand Down Expand Up @@ -1983,9 +2024,9 @@ func (s *Syncer) handleQueryEvent(ev *replication.QueryEvent, ec eventContext, o
continue
}

// for DDL, we wait it to be executed, so we can check if event is newer in this syncer's main process goroutine
// ignore obsolete DDL here can avoid to try-sync again for already synced DDLs
if !s.checkpoint.IsNewerTablePoint(tableNames[0][0].Schema, tableNames[0][0].Name, *ec.currentLocation, false) {
// DDL is sequentially synchronized in this syncer's main process goroutine
// ignore DDL that is older or same as table checkpoint, to try-sync again for already synced DDLs
if s.checkpoint.IsOlderThanTablePoint(tableNames[0][0].Schema, tableNames[0][0].Name, *ec.currentLocation, true) {
ec.tctx.L().Info("ignore obsolete DDL", zap.String("event", "query"), zap.String("statement", sql), log.WrapStringerField("location", ec.currentLocation))
continue
}
Expand Down
3 changes: 3 additions & 0 deletions tests/all_mode/data/db1.increment3.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
use all_mode;
insert into t1 (id, name) values (10, '10'), (20, '20');
insert into t1 (id, name) values (30, '30');
3 changes: 3 additions & 0 deletions tests/all_mode/data/db2.increment3.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
use all_mode;
insert into t2 (id, name) values (10, '10'), (20, '20');
insert into t2 (id, name) values (30, '30');
51 changes: 50 additions & 1 deletion tests/all_mode/run.sh
Original file line number Diff line number Diff line change
Expand Up @@ -271,11 +271,60 @@ function test_stop_task_before_checkpoint() {
export GO_FAILPOINTS=''
}

function test_fail_job_between_event() {
run_sql_file $cur/data/db1.prepare.sql $MYSQL_HOST1 $MYSQL_PORT1 $MYSQL_PASSWORD1
check_contains 'Query OK, 2 rows affected'
run_sql_file $cur/data/db2.prepare.sql $MYSQL_HOST2 $MYSQL_PORT2 $MYSQL_PASSWORD2
check_contains 'Query OK, 3 rows affected'

# start DM worker and master
run_dm_master $WORK_DIR/master $MASTER_PORT $cur/conf/dm-master.toml
check_rpc_alive $cur/../bin/check_master_online 127.0.0.1:$MASTER_PORT
check_metric $MASTER_PORT 'start_leader_counter' 3 0 2

inject_points=(
"github.com/pingcap/dm/dm/worker/TaskCheckInterval=return(\"500ms\")"
"github.com/pingcap/dm/syncer/countJobFromOneEvent=return()"
"github.com/pingcap/dm/syncer/flushFirstJobOfEvent=return()"
"github.com/pingcap/dm/syncer/failSecondJobOfEvent=return()"
)
export GO_FAILPOINTS="$(join_string \; ${inject_points[@]})"
run_dm_worker $WORK_DIR/worker1 $WORKER1_PORT $cur/conf/dm-worker1.toml
check_rpc_alive $cur/../bin/check_worker_online 127.0.0.1:$WORKER1_PORT
run_dm_worker $WORK_DIR/worker2 $WORKER2_PORT $cur/conf/dm-worker2.toml
check_rpc_alive $cur/../bin/check_worker_online 127.0.0.1:$WORKER2_PORT

# operate mysql config to worker
cp $cur/conf/source1.yaml $WORK_DIR/source1.yaml
cp $cur/conf/source2.yaml $WORK_DIR/source2.yaml
sed -i "/relay-binlog-name/i\relay-dir: $WORK_DIR/worker1/relay_log" $WORK_DIR/source1.yaml
sed -i "s/enable-gtid: true/enable-gtid: false/g" $WORK_DIR/source1.yaml
sed -i "/relay-binlog-name/i\relay-dir: $WORK_DIR/worker2/relay_log" $WORK_DIR/source2.yaml
dmctl_operate_source create $WORK_DIR/source1.yaml $SOURCE_ID1
dmctl_operate_source create $WORK_DIR/source2.yaml $SOURCE_ID2

dmctl_start_task "$cur/conf/dm-task.yaml" "--remove-meta"

run_sql_file $cur/data/db1.increment3.sql $MYSQL_HOST1 $MYSQL_PORT1 $MYSQL_PASSWORD1
run_sql_file $cur/data/db2.increment3.sql $MYSQL_HOST2 $MYSQL_PORT2 $MYSQL_PASSWORD2
check_log_contain_with_retry "failSecondJobOfEvent" $WORK_DIR/worker1/log/dm-worker.log
check_log_contain_with_retry "failSecondJobOfEvent" $WORK_DIR/worker2/log/dm-worker.log
run_dm_ctl_with_retry $WORK_DIR "127.0.0.1:$MASTER_PORT" \
"query-status test" \
"\"result\": true" 3
check_sync_diff $WORK_DIR $cur/conf/diff_config.toml

cleanup_data all_mode
cleanup_process $*

export GO_FAILPOINTS=''
}

function run() {
run_sql_both_source "SET @@GLOBAL.SQL_MODE='ANSI_QUOTES,NO_AUTO_VALUE_ON_ZERO'"
run_sql_source1 "SET @@global.time_zone = '+01:00';"
run_sql_source2 "SET @@global.time_zone = '+02:00';"

test_fail_job_between_event
test_syncer_metrics
test_session_config
test_query_timeout
Expand Down

0 comments on commit 944c5aa

Please sign in to comment.