Skip to content
This repository has been archived by the owner on Nov 24, 2023. It is now read-only.

Commit

Permalink
fix safe mode (#1924)
Browse files Browse the repository at this point in the history
  • Loading branch information
glorv authored Jul 27, 2021
1 parent cafaf9e commit 11fb5a8
Show file tree
Hide file tree
Showing 4 changed files with 28 additions and 19 deletions.
13 changes: 6 additions & 7 deletions syncer/mode.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,27 +21,26 @@ import (

"github.com/pingcap/dm/dm/unit"
tcontext "github.com/pingcap/dm/pkg/context"
sm "github.com/pingcap/dm/syncer/safe-mode"
)

func (s *Syncer) enableSafeModeInitializationPhase(tctx *tcontext.Context, safeMode *sm.SafeMode) {
safeMode.Reset(tctx) // in initialization phase, reset first
func (s *Syncer) enableSafeModeInitializationPhase(tctx *tcontext.Context) {
s.safeMode.Reset(tctx) // in initialization phase, reset first

if s.cfg.SafeMode {
//nolint:errcheck
safeMode.Add(tctx, 1) // add 1 but has no corresponding -1, so keeps enabled
s.safeMode.Add(tctx, 1) // add 1 but has no corresponding -1, so keeps enabled
s.tctx.L().Info("enable safe-mode by config")
}
if s.checkpoint.SafeModeExitPoint() != nil {
//nolint:errcheck
safeMode.Add(tctx, 1) // enable and will revert after pass SafeModeExitLoc
s.safeMode.Add(tctx, 1) // enable and will revert after pass SafeModeExitLoc
s.tctx.L().Info("enable safe-mode for safe mode exit point, will exit at", zap.Stringer("location", *s.checkpoint.SafeModeExitPoint()))
} else {
//nolint:errcheck
safeMode.Add(tctx, 1) // enable and will revert after 2 * CheckpointFlushInterval
s.safeMode.Add(tctx, 1) // enable and will revert after 2 * CheckpointFlushInterval
go func() {
defer func() {
err := safeMode.Add(tctx, -1)
err := s.safeMode.Add(tctx, -1)
if err != nil {
// send error to the fatal chan to interrupt the process
s.runFatalChan <- unit.NewProcessError(err)
Expand Down
30 changes: 18 additions & 12 deletions syncer/syncer.go
Original file line number Diff line number Diff line change
Expand Up @@ -154,6 +154,10 @@ type Syncer struct {
sync.RWMutex
t time.Time
}
// safeMode tracks whether we need to generate DML in safe-mode.
// For each binlog event, we copy its current value into eventContext because
// the tracked status may change over time.
safeMode *sm.SafeMode

timezone *time.Location

Expand Down Expand Up @@ -1578,8 +1582,8 @@ func (s *Syncer) Run(ctx context.Context) (err error) {
// but there are no ways to make `update` idempotent,
// if we start syncer at an early position, database must bear a period of inconsistent state,
// it's eventual consistency.
safeMode := sm.NewSafeMode()
s.enableSafeModeInitializationPhase(tctx, safeMode)
s.safeMode = sm.NewSafeMode()
s.enableSafeModeInitializationPhase(tctx)

closeShardingResync := func() error {
if shardingReSync == nil {
Expand Down Expand Up @@ -1787,7 +1791,7 @@ func (s *Syncer) Run(ctx context.Context) (err error) {
if err = s.checkpoint.FlushSafeModeExitPoint(s.tctx); err != nil {
return err
}
if err = safeMode.Add(tctx, -1); err != nil {
if err = s.safeMode.Add(tctx, -1); err != nil {
return err
}
}
Expand All @@ -1802,7 +1806,7 @@ func (s *Syncer) Run(ctx context.Context) (err error) {
shardingReSync: shardingReSync,
closeShardingResync: closeShardingResync,
traceSource: traceSource,
safeMode: safeMode,
safeMode: s.safeMode.Enable(),
tryReSync: tryReSync,
startTime: startTime,
shardingReSyncCh: &shardingReSyncCh,
Expand Down Expand Up @@ -1880,10 +1884,12 @@ type eventContext struct {
shardingReSync *ShardingReSync
closeShardingResync func() error
traceSource string
safeMode *sm.SafeMode
tryReSync bool
startTime time.Time
shardingReSyncCh *chan *ShardingReSync
// safeMode is the value of syncer.safeMode at the time this event is processed.
// syncer.safeMode's value may change on the fly, e.g. after an event passes the safeModeExitPoint.
safeMode bool
tryReSync bool
startTime time.Time
shardingReSyncCh *chan *ShardingReSync
}

// TODO: Further split into smaller functions and group common arguments into a context struct.
Expand Down Expand Up @@ -2049,7 +2055,7 @@ func (s *Syncer) handleRowsEvent(ev *replication.RowsEvent, ec eventContext) err
return err2
}

param.safeMode = ec.safeMode.Enable()
param.safeMode = ec.safeMode
sqls, keys, args, err = s.genInsertSQLs(param, exprFilter)
if err != nil {
return terror.Annotatef(err, "gen insert sqls failed, schema: %s, table: %s", schemaName, tableName)
Expand All @@ -2063,7 +2069,7 @@ func (s *Syncer) handleRowsEvent(ev *replication.RowsEvent, ec eventContext) err
return err2
}

param.safeMode = ec.safeMode.Enable()
param.safeMode = ec.safeMode
sqls, keys, args, err = s.genUpdateSQLs(param, oldExprFilter, newExprFilter)
if err != nil {
return terror.Annotatef(err, "gen update sqls failed, schema: %s, table: %s", schemaName, tableName)
Expand Down Expand Up @@ -2425,7 +2431,7 @@ func (s *Syncer) handleQueryEvent(ev *replication.QueryEvent, ec eventContext, o
if needShardingHandle {
target, _ := utils.GenTableID(ddlInfo.tableNames[1][0].Schema, ddlInfo.tableNames[1][0].Name)
metrics.UnsyncedTableGauge.WithLabelValues(s.cfg.Name, target, s.cfg.SourceID).Set(float64(remain))
err = ec.safeMode.IncrForTable(ec.tctx, ddlInfo.tableNames[1][0].Schema, ddlInfo.tableNames[1][0].Name) // try enable safe-mode when starting syncing for sharding group
err = s.safeMode.IncrForTable(ec.tctx, ddlInfo.tableNames[1][0].Schema, ddlInfo.tableNames[1][0].Name) // try enable safe-mode when starting syncing for sharding group
if err != nil {
return err
}
Expand All @@ -2441,7 +2447,7 @@ func (s *Syncer) handleQueryEvent(ev *replication.QueryEvent, ec eventContext, o
}

ec.tctx.L().Info("source shard group is synced", zap.String("event", "query"), zap.String("source", source), zap.Stringer("start location", startLocation), log.WrapStringerField("end location", ec.currentLocation))
err = ec.safeMode.DescForTable(ec.tctx, ddlInfo.tableNames[1][0].Schema, ddlInfo.tableNames[1][0].Name) // try disable safe-mode after sharding group synced
err = s.safeMode.DescForTable(ec.tctx, ddlInfo.tableNames[1][0].Schema, ddlInfo.tableNames[1][0].Name) // try disable safe-mode after sharding group synced
if err != nil {
return err
}
Expand Down
2 changes: 2 additions & 0 deletions tests/safe_mode/conf/source1.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -9,3 +9,5 @@ from:
user: root
password: /Q7B9DizNLLTTfiZHv9WoEAKamfpIUs=
port: 3306
checker:
check-enable: false
2 changes: 2 additions & 0 deletions tests/safe_mode/conf/source2.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -9,3 +9,5 @@ from:
user: root
password: /Q7B9DizNLLTTfiZHv9WoEAKamfpIUs=
port: 3307
checker:
check-enable: false

0 comments on commit 11fb5a8

Please sign in to comment.