Skip to content

Commit

Permalink
syncer(dm): use an early location to reset binlog and open safemode (#…
Browse files Browse the repository at this point in the history
  • Loading branch information
lance6716 authored Dec 22, 2021
1 parent 8b8b0fa commit 167963a
Show file tree
Hide file tree
Showing 4 changed files with 21 additions and 49 deletions.
21 changes: 19 additions & 2 deletions dm/syncer/syncer.go
Original file line number Diff line number Diff line change
Expand Up @@ -1550,6 +1550,9 @@ func (s *Syncer) Run(ctx context.Context) (err error) {
return nil
}

inFinerRetry := false
// in release branch, we only use eventIndex to test a bug
eventIndex := 0
for {
if s.execError.Load() != nil {
return nil
Expand Down Expand Up @@ -1591,6 +1594,14 @@ func (s *Syncer) Run(ctx context.Context) (err error) {
err = errors.New("connect: connection refused")
}
})
failpoint.Inject("GetEventErrorInTxn", func(val failpoint.Value) {
if intVal, ok := val.(int); ok && intVal == eventIndex {
err = errors.New("failpoint triggered")
s.tctx.L().Warn("failed to get event", zap.Int("event_index", eventIndex),
zap.Any("cur_pos", currentLocation), zap.Any("las_pos", lastLocation),
zap.Any("pos", e.Header.LogPos), log.ShortError(err))
}
})
switch {
case err == context.Canceled:
tctx.L().Info("binlog replication main routine quit(context canceled)!", zap.Stringer("last location", lastLocation))
Expand All @@ -1617,11 +1628,12 @@ func (s *Syncer) Run(ctx context.Context) (err error) {

if s.streamerController.CanRetry(err) {
// lastLocation is the last finished GTID
err = s.streamerController.ResetReplicationSyncer(tctx, lastLocation)
err = s.streamerController.ResetReplicationSyncer(tctx, s.checkpoint.GlobalPoint())
if err != nil {
return err
}
log.L().Info("reset replication binlog puller", zap.Any("pos", s.checkpoint.GlobalPoint()))
inFinerRetry = true
continue
}

Expand Down Expand Up @@ -1760,7 +1772,7 @@ func (s *Syncer) Run(ctx context.Context) (err error) {
shardingReSync: shardingReSync,
closeShardingResync: closeShardingResync,
traceSource: traceSource,
safeMode: s.safeMode.Enable(),
safeMode: s.safeMode.Enable() || inFinerRetry,
tryReSync: tryReSync,
startTime: startTime,
shardingReSyncCh: &shardingReSyncCh,
Expand All @@ -1773,12 +1785,17 @@ func (s *Syncer) Run(ctx context.Context) (err error) {
case *replication.RotateEvent:
err2 = s.handleRotateEvent(ev, ec)
case *replication.RowsEvent:
eventIndex++
metrics.BinlogEventRowHistogram.WithLabelValues(s.cfg.WorkerName, s.cfg.Name, s.cfg.SourceID).Observe(float64(len(ev.Rows)))
err2 = s.handleRowsEvent(ev, ec)
case *replication.QueryEvent:
originSQL = strings.TrimSpace(string(ev.Query))
err2 = s.handleQueryEvent(ev, ec, originSQL)
case *replication.XIDEvent:
eventIndex = 0
if inFinerRetry {
inFinerRetry = false
}
if shardingReSync != nil {
shardingReSync.currLocation.Position.Pos = e.Header.LogPos
shardingReSync.currLocation.Suffix = currentLocation.Suffix
Expand Down
2 changes: 1 addition & 1 deletion dm/tests/_utils/check_sync_diff
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ cd $workdir
i=0
while [ $i -lt $check_time ]; do
rm -rf $OUTPUT_DIR
$binary --config=$conf >>$LOG 2>&1
$binary --config=$conf >$LOG 2>&1
ret=$?
if [ "$ret" == 0 ]; then
echo "check diff successfully"
Expand Down
46 changes: 0 additions & 46 deletions dm/tests/duplicate_event/run.sh
Original file line number Diff line number Diff line change
Expand Up @@ -34,53 +34,7 @@ function run() {
run_sql_file $cur/data/db1.increment.sql $MYSQL_HOST1 $MYSQL_PORT1 $MYSQL_PASSWORD1

check_log_contain_with_retry "reset replication binlog puller" $WORK_DIR/worker1/log/dm-worker.log
check_log_contain_with_retry "discard event already consumed" $WORK_DIR/worker1/log/dm-worker.log
check_sync_diff $WORK_DIR $cur/conf/diff_config.toml

# 2. test relay log retry relay with GTID

# with a 5 rows insert txn: 1 * FormatDesc + 1 * PreviousGTID + 1 * GTID + 1 * BEGIN + 5 * (Table_map + Write_rows) + 1 * XID
# here we fail at the third write rows event, sync should retry and auto recover without any duplicate event
export GO_FAILPOINTS="github.com/pingcap/tiflow/dm/relay/RelayGetEventFailed=15*return(3);github.com/pingcap/tiflow/dm/relay/retry/RelayAllowRetry=return"

run_dm_worker $WORK_DIR/worker2 $WORKER2_PORT $cur/conf/dm-worker2.toml
check_rpc_alive $cur/../bin/check_worker_online 127.0.0.1:$WORKER2_PORT

cp $cur/conf/source2.yaml $WORK_DIR/source2.yaml
dmctl_operate_source create $WORK_DIR/source2.yaml $SOURCE_ID2

run_sql_file $cur/data/db1.prepare.sql $MYSQL_HOST2 $MYSQL_PORT2 $MYSQL_PASSWORD2
check_contains 'Query OK, 2 rows affected'

run_dm_ctl $WORK_DIR "127.0.0.1:$MASTER_PORT" \
"start-task $cur/conf/dm-task-relay.yaml --remove-meta"
check_metric $WORKER2_PORT "dm_worker_task_state{source_id=\"mysql-replica-02\",task=\"test_relay\",worker=\"worker2\"}" 10 1 3

check_sync_diff $WORK_DIR $cur/conf/diff_relay_config.toml

run_sql_source2 "flush logs;"
run_dm_ctl_with_retry $WORK_DIR "127.0.0.1:$MASTER_PORT" \
"start-relay -s $SOURCE_ID2 worker2" \
"\"result\": true" 1
run_dm_ctl_with_retry $WORK_DIR "127.0.0.1:$MASTER_PORT" \
"query-status -s $SOURCE_ID2" \
"\"relayCatchUpMaster\": true" 1

run_sql_file $cur/data/db1.increment.sql $MYSQL_HOST2 $MYSQL_PORT2 $MYSQL_PASSWORD2

check_log_contain_with_retry "retrying to read binlog" $WORK_DIR/worker2/log/dm-worker.log
check_log_contain_with_retry "discard duplicate event" $WORK_DIR/worker2/log/dm-worker.log

check_sync_diff $WORK_DIR $cur/conf/diff_relay_config.toml

# check relay log binlog file size is the same as master size
run_sql_source2 "show master status;"
binlog_file=$(grep "File" $TEST_DIR/sql_res.$TEST_NAME.txt | awk -F: '{print $2}' | xargs)
binlog_pos=$(grep "Position" $TEST_DIR/sql_res.$TEST_NAME.txt | awk -F: '{print $2}' | xargs)

server_uuid=$(tail -n 1 $WORK_DIR/worker2/relay-dir/server-uuid.index)
relay_log_size=$(ls -al $WORK_DIR/worker2/relay-dir/$server_uuid/$binlog_file | awk '{print $5}')
[ "$binlog_pos" -eq "$relay_log_size" ]
}

# also cleanup dm processes in case of last run failed
Expand Down
1 change: 1 addition & 0 deletions dm/tests/others_integration_2.txt
Original file line number Diff line number Diff line change
Expand Up @@ -7,4 +7,5 @@ case_sensitive
sql_mode
http_proxies
openapi
duplicate_event
tracker_ignored_ddl

0 comments on commit 167963a

Please sign in to comment.