From 11fb5a8b63200efa2d6bbb2fa3bcb36d90e4f575 Mon Sep 17 00:00:00 2001 From: glorv Date: Tue, 27 Jul 2021 17:28:16 +0800 Subject: [PATCH] fix safe mode (#1924) --- syncer/mode.go | 13 ++++++------- syncer/syncer.go | 30 ++++++++++++++++++------------ tests/safe_mode/conf/source1.yaml | 2 ++ tests/safe_mode/conf/source2.yaml | 2 ++ 4 files changed, 28 insertions(+), 19 deletions(-) diff --git a/syncer/mode.go b/syncer/mode.go index a5fa192ff8..ba0f075ffe 100644 --- a/syncer/mode.go +++ b/syncer/mode.go @@ -21,27 +21,26 @@ import ( "github.com/pingcap/dm/dm/unit" tcontext "github.com/pingcap/dm/pkg/context" - sm "github.com/pingcap/dm/syncer/safe-mode" ) -func (s *Syncer) enableSafeModeInitializationPhase(tctx *tcontext.Context, safeMode *sm.SafeMode) { - safeMode.Reset(tctx) // in initialization phase, reset first +func (s *Syncer) enableSafeModeInitializationPhase(tctx *tcontext.Context) { + s.safeMode.Reset(tctx) // in initialization phase, reset first if s.cfg.SafeMode { //nolint:errcheck - safeMode.Add(tctx, 1) // add 1 but has no corresponding -1, so keeps enabled + s.safeMode.Add(tctx, 1) // add 1 but has no corresponding -1, so keeps enabled s.tctx.L().Info("enable safe-mode by config") } if s.checkpoint.SafeModeExitPoint() != nil { //nolint:errcheck - safeMode.Add(tctx, 1) // enable and will revert after pass SafeModeExitLoc + s.safeMode.Add(tctx, 1) // enable and will revert after pass SafeModeExitLoc s.tctx.L().Info("enable safe-mode for safe mode exit point, will exit at", zap.Stringer("location", *s.checkpoint.SafeModeExitPoint())) } else { //nolint:errcheck - safeMode.Add(tctx, 1) // enable and will revert after 2 * CheckpointFlushInterval + s.safeMode.Add(tctx, 1) // enable and will revert after 2 * CheckpointFlushInterval go func() { defer func() { - err := safeMode.Add(tctx, -1) + err := s.safeMode.Add(tctx, -1) if err != nil { // send error to the fatal chan to interrupt the process s.runFatalChan <- unit.NewProcessError(err) diff --git a/syncer/syncer.go b/syncer/syncer.go index cee5baed7b..4704955966 100644 --- a/syncer/syncer.go +++ b/syncer/syncer.go @@ -154,6 +154,10 @@ type Syncer struct { sync.RWMutex t time.Time } + // safeMode is used to track if we need to generate dml with safe-mode + // For each binlog event, we will set the current value into eventContext because + // the status of this track may change over time. + safeMode *sm.SafeMode timezone *time.Location @@ -1578,8 +1582,8 @@ func (s *Syncer) Run(ctx context.Context) (err error) { // but there are no ways to make `update` idempotent, // if we start syncer at an early position, database must bear a period of inconsistent state, // it's eventual consistency. - safeMode := sm.NewSafeMode() - s.enableSafeModeInitializationPhase(tctx, safeMode) + s.safeMode = sm.NewSafeMode() + s.enableSafeModeInitializationPhase(tctx) closeShardingResync := func() error { if shardingReSync == nil { @@ -1787,7 +1791,7 @@ func (s *Syncer) Run(ctx context.Context) (err error) { if err = s.checkpoint.FlushSafeModeExitPoint(s.tctx); err != nil { return err } - if err = safeMode.Add(tctx, -1); err != nil { + if err = s.safeMode.Add(tctx, -1); err != nil { return err } } @@ -1802,7 +1806,7 @@ func (s *Syncer) Run(ctx context.Context) (err error) { shardingReSync: shardingReSync, closeShardingResync: closeShardingResync, traceSource: traceSource, - safeMode: safeMode, + safeMode: s.safeMode.Enable(), tryReSync: tryReSync, startTime: startTime, shardingReSyncCh: &shardingReSyncCh, @@ -1880,10 +1884,12 @@ type eventContext struct { shardingReSync *ShardingReSync closeShardingResync func() error traceSource string - safeMode *sm.SafeMode - tryReSync bool - startTime time.Time - shardingReSyncCh *chan *ShardingReSync + // safeMode is the value of syncer.safeMode when process this event + // syncer.safeMode's value may change on the fly, e.g. after event by pass the safeModeExitPoint + safeMode bool + tryReSync bool + startTime time.Time + shardingReSyncCh *chan *ShardingReSync } // TODO: Further split into smaller functions and group common arguments into a context struct. @@ -2049,7 +2055,7 @@ func (s *Syncer) handleRowsEvent(ev *replication.RowsEvent, ec eventContext) err return err2 } - param.safeMode = ec.safeMode.Enable() + param.safeMode = ec.safeMode sqls, keys, args, err = s.genInsertSQLs(param, exprFilter) if err != nil { return terror.Annotatef(err, "gen insert sqls failed, schema: %s, table: %s", schemaName, tableName) @@ -2063,7 +2069,7 @@ func (s *Syncer) handleRowsEvent(ev *replication.RowsEvent, ec eventContext) err return err2 } - param.safeMode = ec.safeMode.Enable() + param.safeMode = ec.safeMode sqls, keys, args, err = s.genUpdateSQLs(param, oldExprFilter, newExprFilter) if err != nil { return terror.Annotatef(err, "gen update sqls failed, schema: %s, table: %s", schemaName, tableName) @@ -2425,7 +2431,7 @@ func (s *Syncer) handleQueryEvent(ev *replication.QueryEvent, ec eventContext, o if needShardingHandle { target, _ := utils.GenTableID(ddlInfo.tableNames[1][0].Schema, ddlInfo.tableNames[1][0].Name) metrics.UnsyncedTableGauge.WithLabelValues(s.cfg.Name, target, s.cfg.SourceID).Set(float64(remain)) - err = ec.safeMode.IncrForTable(ec.tctx, ddlInfo.tableNames[1][0].Schema, ddlInfo.tableNames[1][0].Name) // try enable safe-mode when starting syncing for sharding group + err = s.safeMode.IncrForTable(ec.tctx, ddlInfo.tableNames[1][0].Schema, ddlInfo.tableNames[1][0].Name) // try enable safe-mode when starting syncing for sharding group if err != nil { return err } @@ -2441,7 +2447,7 @@ func (s *Syncer) handleQueryEvent(ev *replication.QueryEvent, ec eventContext, o } ec.tctx.L().Info("source shard group is synced", zap.String("event", "query"), zap.String("source", source), zap.Stringer("start location", startLocation), log.WrapStringerField("end location", ec.currentLocation)) - err = ec.safeMode.DescForTable(ec.tctx, ddlInfo.tableNames[1][0].Schema, ddlInfo.tableNames[1][0].Name) // try disable safe-mode after sharding group synced + err = s.safeMode.DescForTable(ec.tctx, ddlInfo.tableNames[1][0].Schema, ddlInfo.tableNames[1][0].Name) // try disable safe-mode after sharding group synced if err != nil { return err } diff --git a/tests/safe_mode/conf/source1.yaml b/tests/safe_mode/conf/source1.yaml index fae9f5bd7e..78e96409ee 100644 --- a/tests/safe_mode/conf/source1.yaml +++ b/tests/safe_mode/conf/source1.yaml @@ -9,3 +9,5 @@ from: user: root password: /Q7B9DizNLLTTfiZHv9WoEAKamfpIUs= port: 3306 +checker: + check-enable: false diff --git a/tests/safe_mode/conf/source2.yaml b/tests/safe_mode/conf/source2.yaml index cc24bfedbc..dd27fe6335 100644 --- a/tests/safe_mode/conf/source2.yaml +++ b/tests/safe_mode/conf/source2.yaml @@ -9,3 +9,5 @@ from: user: root password: /Q7B9DizNLLTTfiZHv9WoEAKamfpIUs= port: 3307 +checker: + check-enable: false