Skip to content
This repository has been archived by the owner on Nov 24, 2023. It is now read-only.

syncer: don't skip jobs from same event when comparing table checkpoint (#1752) #1783

Merged
merged 1 commit into from
Jun 21, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
20 changes: 13 additions & 7 deletions syncer/checkpoint.go
Original file line number Diff line number Diff line change
Expand Up @@ -144,8 +144,8 @@ type CheckPoint interface {
// DeleteTablePoint deletes checkpoint for specified table in memory and storage
DeleteTablePoint(tctx *tcontext.Context, sourceSchema, sourceTable string) error

// IsNewerTablePoint checks whether job's checkpoint is newer than previous saved checkpoint
IsNewerTablePoint(sourceSchema, sourceTable string, pos mysql.Position) bool
// IsOlderThanTablePoint checks whether the job's checkpoint is older than the previously saved table checkpoint (or, when useLE is true, older than or equal to it)
IsOlderThanTablePoint(sourceSchema, sourceTable string, pos mysql.Position, useLE bool) bool

// SaveGlobalPoint saves the global binlog stream's checkpoint
// corresponding to Meta.Save
Expand Down Expand Up @@ -325,20 +325,26 @@ func (cp *RemoteCheckPoint) DeleteTablePoint(tctx *tcontext.Context, sourceSchem
return nil
}

// IsNewerTablePoint implements CheckPoint.IsNewerTablePoint
func (cp *RemoteCheckPoint) IsNewerTablePoint(sourceSchema, sourceTable string, pos mysql.Position) bool {
// IsOlderThanTablePoint implements CheckPoint.IsOlderThanTablePoint.
// For binlog position replication, DM currently splits the row changes of one event into multiple jobs, so some jobs may have the same position.
// If useLE is true, the comparison is "less than or equal"; otherwise it is strictly "less than".
func (cp *RemoteCheckPoint) IsOlderThanTablePoint(sourceSchema, sourceTable string, pos mysql.Position, useLE bool) bool {
cp.RLock()
defer cp.RUnlock()
mSchema, ok := cp.points[sourceSchema]
if !ok {
return true
return false
}
point, ok := mSchema[sourceTable]
if !ok {
return true
return false
}
oldPos := point.MySQLPos()
return pos.Compare(oldPos) > 0
cp.logCtx.L().Debug("compare table position whether is newer", zap.Stringer("position", pos), zap.Stringer("old position", oldPos))
if useLE {
return pos.Compare(oldPos) <= 0
}
return pos.Compare(oldPos) < 0
}

// SaveGlobalPoint implements CheckPoint.SaveGlobalPoint
Expand Down
32 changes: 16 additions & 16 deletions syncer/checkpoint_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -255,23 +255,23 @@ func (s *testCheckpointSuite) testTableCheckPoint(c *C, cp CheckPoint) {
)

// not exist
newer := cp.IsNewerTablePoint(schema, table, pos1)
c.Assert(newer, IsTrue)
older := cp.IsOlderThanTablePoint(schema, table, pos1, false)
c.Assert(older, IsFalse)

// save
cp.SaveTablePoint(schema, table, pos2)
newer = cp.IsNewerTablePoint(schema, table, pos1)
c.Assert(newer, IsFalse)
older = cp.IsOlderThanTablePoint(schema, table, pos1, false)
c.Assert(older, IsTrue)

// rollback, to min
cp.Rollback()
newer = cp.IsNewerTablePoint(schema, table, pos1)
c.Assert(newer, IsTrue)
older = cp.IsOlderThanTablePoint(schema, table, pos1, false)
c.Assert(older, IsFalse)

// save again
cp.SaveTablePoint(schema, table, pos2)
newer = cp.IsNewerTablePoint(schema, table, pos1)
c.Assert(newer, IsFalse)
older = cp.IsOlderThanTablePoint(schema, table, pos1, false)
c.Assert(older, IsTrue)

// flush + rollback
s.mock.ExpectBegin()
Expand All @@ -280,22 +280,22 @@ func (s *testCheckpointSuite) testTableCheckPoint(c *C, cp CheckPoint) {
err = cp.FlushPointsExcept(tctx, nil, nil, nil)
c.Assert(err, IsNil)
cp.Rollback()
newer = cp.IsNewerTablePoint(schema, table, pos1)
c.Assert(newer, IsFalse)
older = cp.IsOlderThanTablePoint(schema, table, pos1, false)
c.Assert(older, IsTrue)

// clear, to min
s.mock.ExpectBegin()
s.mock.ExpectExec(clearCheckPointSQL).WillReturnResult(sqlmock.NewResult(0, 1))
s.mock.ExpectCommit()
err = cp.Clear(tctx)
c.Assert(err, IsNil)
newer = cp.IsNewerTablePoint(schema, table, pos1)
c.Assert(newer, IsTrue)
older = cp.IsOlderThanTablePoint(schema, table, pos1, false)
c.Assert(older, IsFalse)

// save
cp.SaveTablePoint(schema, table, pos2)
newer = cp.IsNewerTablePoint(schema, table, pos1)
c.Assert(newer, IsFalse)
older = cp.IsOlderThanTablePoint(schema, table, pos1, false)
c.Assert(older, IsTrue)

// test save table point less than global point
func() {
Expand All @@ -316,6 +316,6 @@ func (s *testCheckpointSuite) testTableCheckPoint(c *C, cp CheckPoint) {
err = cp.FlushPointsExcept(tctx, [][]string{{schema, table}}, nil, nil)
c.Assert(err, IsNil)
cp.Rollback()
newer = cp.IsNewerTablePoint(schema, table, pos1)
c.Assert(newer, IsTrue)
older = cp.IsOlderThanTablePoint(schema, table, pos1, false)
c.Assert(older, IsFalse)
}
46 changes: 44 additions & 2 deletions syncer/syncer.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ package syncer

import (
"context"
"errors"
"fmt"
"math"
"reflect"
Expand Down Expand Up @@ -737,7 +738,34 @@ func (s *Syncer) checkWait(job *job) bool {
return false
}

// only used in tests.
// These variables are shared state for the failpoints countJobFromOneEvent,
// flushFirstJobOfEvent and failSecondJobOfEvent.
// NOTE(review): the plain (non-atomic) variables are written in addJob and
// failExecuteSQL is read in sync; presumably acceptable for test-only code,
// confirm that no harmful data race is possible.
var (
// lastPos is the position of the job most recently seen by countJobFromOneEvent.
lastPos mysql.Position
// lastPosNum counts the consecutive jobs seen so far whose position equals lastPos.
lastPosNum int
// waitJobsDone is set when the first job of an event is seen; flushFirstJobOfEvent
// resets it and forces addJob to wait for pending jobs.
waitJobsDone bool
// failExecuteSQL is set when the second job of an event is seen; failSecondJobOfEvent
// reads it to decide whether to return a mocked execution error.
failExecuteSQL bool
// failOnce guards failSecondJobOfEvent with CompareAndSwap(0, 1) so that the
// mocked error is returned only once.
failOnce sync2.AtomicInt64
)

func (s *Syncer) addJob(job *job) error {
failpoint.Inject("countJobFromOneEvent", func() {
if job.currentPos.Compare(lastPos) == 0 {
lastPosNum++
} else {
lastPos = job.currentPos
lastPosNum = 1
}
// trigger a flush after seeing the first job of an event
if lastPosNum == 1 {
waitJobsDone = true
s.tctx.L().Info("meet the first job of an event", zap.Any("binlog position", lastPos))
}
// mock an execution error after seeing the second job of an event.
if lastPosNum == 2 {
failExecuteSQL = true
s.tctx.L().Info("meet the second job of an event", zap.Any("binlog position", lastPos))
}
})
var (
queueBucket int
execDDLReq *pb.ExecDDLRequest
Expand Down Expand Up @@ -788,6 +816,13 @@ func (s *Syncer) addJob(job *job) error {
}

wait := s.checkWait(job)
failpoint.Inject("flushFirstJobOfEvent", func() {
if waitJobsDone {
s.tctx.L().Info("trigger flushFirstJobOfEvent")
waitJobsDone = false
wait = true
}
})
if wait {
s.jobWg.Wait()
s.c.reset()
Expand Down Expand Up @@ -987,6 +1022,13 @@ func (s *Syncer) sync(tctx *tcontext.Context, queueBucket string, db *DBConn, jo
if len(jobs) == 0 {
return nil
}

failpoint.Inject("failSecondJobOfEvent", func() {
if failExecuteSQL && failOnce.CompareAndSwap(0, 1) {
s.tctx.L().Info("trigger failSecondJobOfEvent")
failpoint.Return(errors.New("failSecondJobOfEvent"))
}
})
queries := make([]string, 0, len(jobs))
args := make([][]interface{}, 0, len(jobs))
for _, j := range jobs {
Expand Down Expand Up @@ -1440,7 +1482,7 @@ func (s *Syncer) handleRowsEvent(ev *replication.RowsEvent, ec eventContext) err
}

// DML position before table checkpoint, ignore it
if !s.checkpoint.IsNewerTablePoint(originSchema, originTable, *ec.currentPos) {
if s.checkpoint.IsOlderThanTablePoint(originSchema, originTable, *ec.currentPos, false) {
s.tctx.L().Debug("ignore obsolete event that is old than table checkpoint", zap.String("event", "row"), log.WrapStringerField("position", ec.currentPos), zap.String("origin schema", originSchema), zap.String("origin table", originTable))
return nil
}
Expand Down Expand Up @@ -1670,7 +1712,7 @@ func (s *Syncer) handleQueryEvent(ev *replication.QueryEvent, ec eventContext) e

// for DDL, we wait for it to be executed, so we can check whether the event is newer in this syncer's main process goroutine
// ignoring obsolete DDLs here avoids trying to sync already-synced DDLs again
if !s.checkpoint.IsNewerTablePoint(tableNames[0][0].Schema, tableNames[0][0].Name, *ec.currentPos) {
if s.checkpoint.IsOlderThanTablePoint(tableNames[0][0].Schema, tableNames[0][0].Name, *ec.currentPos, true) {
s.tctx.L().Info("ignore obsolete DDL", zap.String("event", "query"), zap.String("statement", sql), log.WrapStringerField("position", ec.currentPos))
continue
}
Expand Down
3 changes: 3 additions & 0 deletions tests/all_mode/data/db1.increment3.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
-- Incremental data for source 1, loaded by test_fail_job_between_event in tests/all_mode/run.sh.
use all_mode;
-- Multi-row insert: presumably replicated as one rows event carrying two rows,
-- which the syncer splits into two jobs sharing one binlog position (TODO confirm
-- against the upstream binlog format).
insert into t1 (id, name) values (10, '10'), (20, '20');
-- Single-row insert that follows the multi-row statement.
insert into t1 (id, name) values (30, '30');
3 changes: 3 additions & 0 deletions tests/all_mode/data/db2.increment3.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
-- Incremental data for source 2, loaded by test_fail_job_between_event in tests/all_mode/run.sh.
use all_mode;
-- Multi-row insert: presumably replicated as one rows event carrying two rows,
-- which the syncer splits into two jobs sharing one binlog position (TODO confirm
-- against the upstream binlog format).
insert into t2 (id, name) values (10, '10'), (20, '20');
-- Single-row insert that follows the multi-row statement.
insert into t2 (id, name) values (30, '30');
44 changes: 44 additions & 0 deletions tests/all_mode/run.sh
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,51 @@ function test_session_config(){
echo "[$(date)] <<<<<< finish test_session_config >>>>>>"
}

# test_fail_job_between_event exercises a failure that happens between two jobs
# generated from the same binlog event. Failpoints force a flush after the first
# job of an event and make one execution fail after the second job is seen; the
# test then requires query-status to report "result": true and check_sync_diff
# to pass.
function test_fail_job_between_event() {
# load the prepared data into both upstream instances and check the client output
run_sql_file $cur/data/db1.prepare.sql $MYSQL_HOST1 $MYSQL_PORT1 $MYSQL_PASSWORD1
check_contains 'Query OK, 2 rows affected'
run_sql_file $cur/data/db2.prepare.sql $MYSQL_HOST2 $MYSQL_PORT2 $MYSQL_PASSWORD2
check_contains 'Query OK, 3 rows affected'

# start DM worker and master
run_dm_master $WORK_DIR/master $MASTER_PORT $cur/conf/dm-master.toml
check_rpc_alive $cur/../bin/check_master_online 127.0.0.1:$MASTER_PORT

# failpoints enabled for the workers:
# - TaskCheckInterval: returns "500ms" (presumably shortens the worker's task
#   check interval so a failed task is retried quickly; verify in dm/worker)
# - countJobFromOneEvent: counts consecutive jobs that share one binlog position
# - flushFirstJobOfEvent: forces a wait for pending jobs after the first job of an event
# - failSecondJobOfEvent: returns a mocked error once after a second job of an event is seen
inject_points=(
"github.com/pingcap/dm/dm/worker/TaskCheckInterval=return(\"500ms\")"
"github.com/pingcap/dm/syncer/countJobFromOneEvent=return()"
"github.com/pingcap/dm/syncer/flushFirstJobOfEvent=return()"
"github.com/pingcap/dm/syncer/failSecondJobOfEvent=return()"
)
# GO_FAILPOINTS must be exported before the workers are started so they inherit it
export GO_FAILPOINTS="$(join_string \; ${inject_points[@]})"
run_dm_worker $WORK_DIR/worker1 $WORKER1_PORT $cur/conf/dm-worker1.toml
check_rpc_alive $cur/../bin/check_worker_online 127.0.0.1:$WORKER1_PORT
run_dm_worker $WORK_DIR/worker2 $WORKER2_PORT $cur/conf/dm-worker2.toml
check_rpc_alive $cur/../bin/check_worker_online 127.0.0.1:$WORKER2_PORT

# start the task from a copy of the task config whose sql_mode is overridden
cp $cur/conf/dm-task.yaml $WORK_DIR/dm-task.yaml
sed -i 's/sql_mode: ".*"/sql_mode: "NO_AUTO_VALUE_ON_ZERO"/g' $WORK_DIR/dm-task.yaml
dmctl_start_task "$WORK_DIR/dm-task.yaml"

# apply the incremental data (a multi-row insert followed by a single-row insert)
run_sql_file $cur/data/db1.increment3.sql $MYSQL_HOST1 $MYSQL_PORT1 $MYSQL_PASSWORD1
run_sql_file $cur/data/db2.increment3.sql $MYSQL_HOST2 $MYSQL_PORT2 $MYSQL_PASSWORD2
# presumably gives the workers time to replicate the data and hit the failpoint
sleep 2
# both workers must have logged the injected failure
check_log_contains $WORK_DIR/worker1/log/dm-worker.log "failSecondJobOfEvent"
check_log_contains $WORK_DIR/worker2/log/dm-worker.log "failSecondJobOfEvent"
# query-status output must contain "result": true three times
run_dm_ctl $WORK_DIR "127.0.0.1:$MASTER_PORT" \
"query-status test" \
"\"result\": true" 3
check_sync_diff $WORK_DIR $cur/conf/diff_config.toml

cleanup_data all_mode
cleanup_process $*

# clear the failpoints so that later tests start their processes without them
export GO_FAILPOINTS=''
}

function run() {
test_fail_job_between_event

test_session_config

export GO_FAILPOINTS="github.com/pingcap/dm/dm/worker/TaskCheckInterval=return(\"500ms\")"
Expand Down