From 620adbb9094378b8345cc4b8245d540b40a5e69d Mon Sep 17 00:00:00 2001
From: lance6716
Date: Fri, 21 Aug 2020 17:25:18 +0800
Subject: [PATCH 01/10] *: use dump unit exit location to leave safe mode

---
 dm/config/task.go    |  3 +++
 dumpling/dumpling.go | 29 +++++++++++++++++++++++++++++
 syncer/mode.go       |  8 ++++++--
 syncer/syncer.go     |  8 ++++++++
 4 files changed, 46 insertions(+), 2 deletions(-)

diff --git a/dm/config/task.go b/dm/config/task.go
index d4f9086419..b9801e8ded 100644
--- a/dm/config/task.go
+++ b/dm/config/task.go
@@ -21,6 +21,7 @@ import (
 	"strings"
 	"time"
+	"github.com/pingcap/dm/pkg/binlog"
 	"github.com/pingcap/dm/pkg/log"
 	"github.com/pingcap/dm/pkg/terror"
@@ -239,6 +240,8 @@ type SyncerConfig struct {
 	EnableGTID       bool `yaml:"enable-gtid" toml:"enable-gtid" json:"enable-gtid"`
 	DisableCausality bool `yaml:"disable-detect" toml:"disable-detect" json:"disable-detect"`
 	SafeMode         bool `yaml:"safe-mode" toml:"safe-mode" json:"safe-mode"`
+	// when dump unit can't run consistent dump, enable safe mode until pass exit location of dumping
+	DumpExitLocation *binlog.Location `yaml:"-" toml:"-" json:"-"`
 	// deprecated, use `ansi-quotes` in top level config instead
 	EnableANSIQuotes bool `yaml:"enable-ansi-quotes" toml:"enable-ansi-quotes" json:"enable-ansi-quotes"`
 }
diff --git a/dumpling/dumpling.go b/dumpling/dumpling.go
index fd007d58e4..c75d8a73a3 100644
--- a/dumpling/dumpling.go
+++ b/dumpling/dumpling.go
@@ -15,6 +15,7 @@ package dumpling
 import (
 	"context"
+	"database/sql"
 	"os"
 	"strings"
 	"time"
@@ -22,6 +23,7 @@ import (
 	"github.com/pingcap/dm/dm/config"
 	"github.com/pingcap/dm/dm/pb"
 	"github.com/pingcap/dm/dm/unit"
+	"github.com/pingcap/dm/pkg/binlog"
 	"github.com/pingcap/dm/pkg/log"
 	"github.com/pingcap/dm/pkg/terror"
 	"github.com/pingcap/dm/pkg/utils"
@@ -103,6 +105,15 @@ func (m *Dumpling) Process(ctx context.Context, pr chan pb.ProcessResult) {
 	err = export.Dump(newCtx, m.dumpConfig)
 	cancel()
+	if m.dumpConfig.Consistency == "none" {
+		if location, err := m.recordExitLocation(); err != nil {
+			m.logger.Error("failed to record exit location", zap.Error(err))
+		} else {
+			m.cfg.SyncerConfig.DumpExitLocation = location
+			m.logger.Info("successful record exit location", zap.Stringer("location", location))
+		}
+	}
+
 	if err != nil {
 		if utils.IsContextCanceledError(err) {
 			m.logger.Info("filter out error caused by user cancel")
@@ -267,3 +278,21 @@ func (m *Dumpling) constructArgs() (*export.Config, error) {
 	return dumpConfig, nil
 }
+
+// recordExitLocation writes exit location to config, which could be used in sync unit.
+// when using inconsistent dump, sync unit should enable safe mode between start location and this location.
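+// it opens a connection with the dump unit's DSN and reads the upstream's current binlog position and GTID set via utils.GetMasterStatus.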
+func (m *Dumpling) recordExitLocation() (*binlog.Location, error) { + var location binlog.Location + db, err := sql.Open("mysql", m.dumpConfig.GetDSN("")) + if err != nil { + return nil, err + } + defer db.Close() + pos, gset, err := utils.GetMasterStatus(db, m.cfg.Flavor) + if err != nil { + return nil, err + } + location.Position = pos + location.GTIDSet = gset + return &location, nil +} diff --git a/syncer/mode.go b/syncer/mode.go index 551061bd26..27a4db0937 100644 --- a/syncer/mode.go +++ b/syncer/mode.go @@ -26,12 +26,16 @@ import ( func (s *Syncer) enableSafeModeInitializationPhase(tctx *tcontext.Context, safeMode *sm.SafeMode) { safeMode.Reset(tctx) // in initialization phase, reset first - safeMode.Add(tctx, 1) // try to enable + safeMode.Add(tctx, 1) // enable and will revert 5 minutes later if s.cfg.SafeMode { - safeMode.Add(tctx, 1) // add 1 but should no corresponding -1 + safeMode.Add(tctx, 1) // add 1 but should no corresponding -1, so keeps enabled s.tctx.L().Info("enable safe-mode by config") } + if s.cfg.DumpExitLocation != nil { + safeMode.Add(tctx, 1) // enable and will revert after pass DumpExitLocation + s.tctx.L().Info("enable safe-mode because of inconsistent dump, will exit at", zap.Stringer("location", *s.cfg.DumpExitLocation)) + } go func() { defer func() { diff --git a/syncer/syncer.go b/syncer/syncer.go index ea9c2b2622..92d2c1ae44 100644 --- a/syncer/syncer.go +++ b/syncer/syncer.go @@ -1391,6 +1391,14 @@ func (s *Syncer) Run(ctx context.Context) (err error) { default: } + // check pass DumpExitLocation and try disable safe mode, but not in sharding or replacing error + if s.cfg.DumpExitLocation != nil && !s.isReplacingErr && shardingReSync == nil { + if binlog.CompareLocation(currentLocation, *s.cfg.DumpExitLocation, s.cfg.EnableGTID) >= 0 { + s.cfg.DumpExitLocation = nil + safeMode.Add(tctx, -1) + } + } + ec := eventContext{ tctx: tctx, header: e.Header, From d615be5a41d9dd7ac7c8e353c238587d77f447e8 Mon Sep 17 00:00:00 2001 From: lance6716 Date: Mon, 24 Aug 2020 12:06:18 +0800 Subject: [PATCH 02/10] add test --- syncer/syncer_test.go | 177 +++++++++++++++++++++++++++++++++++++++--- 1 file changed, 167 insertions(+), 10 deletions(-) diff --git a/syncer/syncer_test.go b/syncer/syncer_test.go index 6361d1b658..abd000503e 100644 --- a/syncer/syncer_test.go +++ b/syncer/syncer_test.go @@ -23,6 +23,8 @@ import ( "time" sqlmock "github.com/DATA-DOG/go-sqlmock" + "github.com/pingcap/failpoint" + "github.com/pingcap/dm/dm/config" "github.com/pingcap/dm/dm/pb" "github.com/pingcap/dm/pkg/binlog" @@ -1136,16 +1138,7 @@ func (s *testSyncerSuite) TestRun(c *C) { c.Assert(err, IsNil) syncer.genRouter() - checkPointMock.ExpectBegin() - checkPointMock.ExpectExec(fmt.Sprintf("CREATE SCHEMA IF NOT EXISTS `%s`", s.cfg.MetaSchema)).WillReturnResult(sqlmock.NewResult(1, 1)) - checkPointMock.ExpectCommit() - checkPointMock.ExpectBegin() - checkPointMock.ExpectExec(fmt.Sprintf("CREATE TABLE IF NOT EXISTS `%s`.`%s`", s.cfg.MetaSchema, cputil.SyncerCheckpoint(s.cfg.Name))).WillReturnResult(sqlmock.NewResult(1, 1)) - checkPointMock.ExpectCommit() - - // mock syncer.checkpoint.Init() function - syncer.checkpoint.(*RemoteCheckPoint).dbConn = &DBConn{cfg: s.cfg, baseConn: conn.NewBaseConn(checkPointDBConn, &retry.FiniteRetryStrategy{})} - syncer.checkpoint.(*RemoteCheckPoint).prepare(tcontext.Background()) + syncer.setupCheckpoint(checkPointDBConn, checkPointMock) syncer.reset() events1 := mockBinlogEvents{ @@ -1324,6 +1317,157 @@ func (s *testSyncerSuite) TestRun(c *C) { } } +func (s 
*testSyncerSuite) TestExitSafeModeByConfig(c *C) { + db, mock, err := sqlmock.New() + c.Assert(err, IsNil) + dbConn, err := db.Conn(context.Background()) + c.Assert(err, IsNil) + checkPointDB, checkPointMock, err := sqlmock.New() + checkPointDBConn, err := checkPointDB.Conn(context.Background()) + c.Assert(err, IsNil) + + testJobs.jobs = testJobs.jobs[:0] + + s.cfg.BAList = &filter.Rules{ + DoDBs: []string{"test_1"}, + DoTables: []*filter.Table{ + {Schema: "test_1", Name: "t_1"}, + }, + } + + syncer := NewSyncer(s.cfg, nil) + syncer.fromDB = &UpStreamConn{BaseDB: conn.NewBaseDB(db, func() {})} + syncer.toDBConns = []*DBConn{{cfg: s.cfg, baseConn: conn.NewBaseConn(dbConn, &retry.FiniteRetryStrategy{})}, + {cfg: s.cfg, baseConn: conn.NewBaseConn(dbConn, &retry.FiniteRetryStrategy{})}} + syncer.ddlDBConn = &DBConn{cfg: s.cfg, baseConn: conn.NewBaseConn(dbConn, &retry.FiniteRetryStrategy{})} + c.Assert(syncer.Type(), Equals, pb.UnitType_Sync) + + syncer.genRouter() + + syncer.setupCheckpoint(checkPointDBConn, checkPointMock) + + syncer.reset() + + events1 := mockBinlogEvents{ + mockBinlogEvent{typ: DBCreate, args: []interface{}{"test_1"}}, + mockBinlogEvent{typ: TableCreate, args: []interface{}{"test_1", "create table test_1.t_1(id int primary key, name varchar(24))"}}, + + mockBinlogEvent{typ: Write, args: []interface{}{uint64(8), "test_1", "t_1", []byte{mysql.MYSQL_TYPE_LONG, mysql.MYSQL_TYPE_STRING}, [][]interface{}{{int32(1), "a"}}}}, + mockBinlogEvent{typ: Delete, args: []interface{}{uint64(8), "test_1", "t_1", []byte{mysql.MYSQL_TYPE_LONG, mysql.MYSQL_TYPE_STRING}, [][]interface{}{{int32(1), "a"}}}}, + mockBinlogEvent{typ: Update, args: []interface{}{uint64(8), "test_1", "t_1", []byte{mysql.MYSQL_TYPE_LONG, mysql.MYSQL_TYPE_STRING}, [][]interface{}{{int32(2), "b"}, {int32(1), "b"}}}}, + } + + generatedEvents1 := s.generateEvents(events1, c) + // make sure [18] is last event, and use [18]'s position as dumpExitLocation + c.Assert(len(generatedEvents1), Equals, 19) + dumpExitLocation := binlog.NewLocation("") + dumpExitLocation.Position.Pos = generatedEvents1[18].Header.LogPos + syncer.cfg.DumpExitLocation = &dumpExitLocation + + // check after dumpExitLocation, safe mode is turned off + events2 := mockBinlogEvents{ + mockBinlogEvent{typ: Write, args: []interface{}{uint64(8), "test_1", "t_1", []byte{mysql.MYSQL_TYPE_LONG, mysql.MYSQL_TYPE_STRING}, [][]interface{}{{int32(1), "a"}}}}, + mockBinlogEvent{typ: Delete, args: []interface{}{uint64(8), "test_1", "t_1", []byte{mysql.MYSQL_TYPE_LONG, mysql.MYSQL_TYPE_STRING}, [][]interface{}{{int32(1), "a"}}}}, + mockBinlogEvent{typ: Update, args: []interface{}{uint64(8), "test_1", "t_1", []byte{mysql.MYSQL_TYPE_LONG, mysql.MYSQL_TYPE_STRING}, [][]interface{}{{int32(2), "b"}, {int32(1), "b"}}}}, + } + generatedEvents2 := s.generateEvents(events2, c) + + generatedEvents := append(generatedEvents1, generatedEvents2...) + + mockStreamerProducer := &MockStreamProducer{generatedEvents} + mockStreamer, err := mockStreamerProducer.generateStreamer(binlog.NewLocation("")) + c.Assert(err, IsNil) + syncer.streamerController = &StreamerController{ + streamerProducer: mockStreamerProducer, + streamer: mockStreamer, + } + + syncer.addJobFunc = syncer.addJobToMemory + + ctx, cancel := context.WithCancel(context.Background()) + resultCh := make(chan pb.ProcessResult) + + // mock get parser + mock.ExpectQuery("SHOW GLOBAL VARIABLES LIKE"). + WillReturnRows(sqlmock.NewRows([]string{"Variable_name", "Value"}). 
+ AddRow("sql_mode", "ONLY_FULL_GROUP_BY,STRICT_TRANS_TABLES,NO_ZERO_IN_DATE,NO_ZERO_DATE,ERROR_FOR_DIVISION_BY_ZERO,NO_AUTO_CREATE_USER,NO_ENGINE_SUBSTITUTION")) + + // disable 5-minute safe mode + c.Assert(failpoint.Enable("github.com/pingcap/dm/syncer/SafeModeInitPhaseSeconds", "return(0)"), IsNil) + go syncer.Process(ctx, resultCh) + + expectJobs := []*expectJob{ + // now every ddl job will start with a flush job + { + flush, + "", + nil, + }, { + ddl, + "CREATE DATABASE IF NOT EXISTS `test_1`", + nil, + }, { + flush, + "", + nil, + }, { + ddl, + "CREATE TABLE IF NOT EXISTS `test_1`.`t_1` (`id` INT PRIMARY KEY,`name` VARCHAR(24))", + nil, + }, { + insert, + "REPLACE INTO `test_1`.`t_1` (`id`,`name`) VALUES (?,?)", + []interface{}{int32(1), "a"}, + }, { + del, + "DELETE FROM `test_1`.`t_1` WHERE `id` = ? LIMIT 1", + []interface{}{int32(1)}, + }, { + update, + "DELETE FROM `test_1`.`t_1` WHERE `id` = ? LIMIT 1", + []interface{}{int32(2)}, + }, { + update, + "REPLACE INTO `test_1`.`t_1` (`id`,`name`) VALUES (?,?)", + []interface{}{int32(1), "b"}, + }, { + // start from this event, location passes dumpExitLocation and safe mode should exit + insert, + "INSERT INTO `test_1`.`t_1` (`id`,`name`) VALUES (?,?)", + []interface{}{int32(1), "a"}, + }, { + del, + "DELETE FROM `test_1`.`t_1` WHERE `id` = ? LIMIT 1", + []interface{}{int32(1)}, + }, { + update, + "UPDATE `test_1`.`t_1` SET `id` = ?, `name` = ? WHERE `id` = ? LIMIT 1", + []interface{}{int32(1), "b", int32(2)}, + }, + } + + executeSQLAndWait(len(expectJobs)) + c.Assert(syncer.Status().(*pb.SyncStatus).TotalEvents, Equals, int64(0)) + syncer.mockFinishJob(expectJobs) + + testJobs.Lock() + checkJobs(c, testJobs.jobs, expectJobs) + testJobs.jobs = testJobs.jobs[:0] + testJobs.Unlock() + + cancel() + syncer.Close() + c.Assert(syncer.isClosed(), IsTrue) + + if err := mock.ExpectationsWereMet(); err != nil { + c.Errorf("db unfulfilled expectations: %s", err) + } + + if err := checkPointMock.ExpectationsWereMet(); err != nil { + c.Errorf("checkpointDB unfulfilled expectations: %s", err) + } +} + func executeSQLAndWait(expectJobNum int) { for i := 0; i < 10; i++ { time.Sleep(time.Second) @@ -1386,3 +1530,16 @@ func (s *Syncer) addJobToMemory(job *job) error { return nil } + +func (s *Syncer) setupCheckpoint(checkPointDBConn *sql.Conn, checkPointMock sqlmock.Sqlmock) { + checkPointMock.ExpectBegin() + checkPointMock.ExpectExec(fmt.Sprintf("CREATE SCHEMA IF NOT EXISTS `%s`", s.cfg.MetaSchema)).WillReturnResult(sqlmock.NewResult(1, 1)) + checkPointMock.ExpectCommit() + checkPointMock.ExpectBegin() + checkPointMock.ExpectExec(fmt.Sprintf("CREATE TABLE IF NOT EXISTS `%s`.`%s`", s.cfg.MetaSchema, cputil.SyncerCheckpoint(s.cfg.Name))).WillReturnResult(sqlmock.NewResult(1, 1)) + checkPointMock.ExpectCommit() + + // mock syncer.checkpoint.Init() function + s.checkpoint.(*RemoteCheckPoint).dbConn = &DBConn{cfg: s.cfg, baseConn: conn.NewBaseConn(checkPointDBConn, &retry.FiniteRetryStrategy{})} + s.checkpoint.(*RemoteCheckPoint).prepare(tcontext.Background()) +} From 186e8c30a7fff51f156083d3de5efc934bc74d14 Mon Sep 17 00:00:00 2001 From: lance6716 Date: Mon, 24 Aug 2020 12:10:41 +0800 Subject: [PATCH 03/10] rename --- syncer/syncer_test.go | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/syncer/syncer_test.go b/syncer/syncer_test.go index abd000503e..c8f02f96d5 100644 --- a/syncer/syncer_test.go +++ b/syncer/syncer_test.go @@ -1138,7 +1138,7 @@ func (s *testSyncerSuite) TestRun(c *C) { c.Assert(err, IsNil) syncer.genRouter() - 
syncer.setupCheckpoint(checkPointDBConn, checkPointMock) + syncer.setupMockCheckpoint(checkPointDBConn, checkPointMock) syncer.reset() events1 := mockBinlogEvents{ @@ -1344,7 +1344,7 @@ func (s *testSyncerSuite) TestExitSafeModeByConfig(c *C) { syncer.genRouter() - syncer.setupCheckpoint(checkPointDBConn, checkPointMock) + syncer.setupMockCheckpoint(checkPointDBConn, checkPointMock) syncer.reset() @@ -1531,7 +1531,7 @@ func (s *Syncer) addJobToMemory(job *job) error { return nil } -func (s *Syncer) setupCheckpoint(checkPointDBConn *sql.Conn, checkPointMock sqlmock.Sqlmock) { +func (s *Syncer) setupMockCheckpoint(checkPointDBConn *sql.Conn, checkPointMock sqlmock.Sqlmock) { checkPointMock.ExpectBegin() checkPointMock.ExpectExec(fmt.Sprintf("CREATE SCHEMA IF NOT EXISTS `%s`", s.cfg.MetaSchema)).WillReturnResult(sqlmock.NewResult(1, 1)) checkPointMock.ExpectCommit() From 5b4c848c48937890717e15a1a2234e489fbde570 Mon Sep 17 00:00:00 2001 From: lance6716 Date: Mon, 24 Aug 2020 17:02:07 +0800 Subject: [PATCH 04/10] fix test --- syncer/syncer_test.go | 1 + 1 file changed, 1 insertion(+) diff --git a/syncer/syncer_test.go b/syncer/syncer_test.go index c8f02f96d5..798e1f2c94 100644 --- a/syncer/syncer_test.go +++ b/syncer/syncer_test.go @@ -1466,6 +1466,7 @@ func (s *testSyncerSuite) TestExitSafeModeByConfig(c *C) { if err := checkPointMock.ExpectationsWereMet(); err != nil { c.Errorf("checkpointDB unfulfilled expectations: %s", err) } + c.Assert(failpoint.Disable("github.com/pingcap/dm/syncer/SafeModeInitPhaseSeconds"), IsNil) } func executeSQLAndWait(expectJobNum int) { From 0aeef4c0ab11253acd198fbd335edbe113045322 Mon Sep 17 00:00:00 2001 From: lance6716 Date: Mon, 31 Aug 2020 11:55:02 +0800 Subject: [PATCH 05/10] refine parseMetadata --- dumpling/dumpling.go | 33 +--- go.mod | 2 +- go.sum | 4 +- loader/loader.go | 5 +- pkg/dumpling/utils.go | 168 ++++++++++++++++++ .../utils_test.go} | 20 ++- pkg/utils/mydumper.go | 119 ------------- syncer/checkpoint.go | 33 ++-- 8 files changed, 205 insertions(+), 179 deletions(-) create mode 100644 pkg/dumpling/utils.go rename pkg/{utils/mydumper_test.go => dumpling/utils_test.go} (90%) delete mode 100644 pkg/utils/mydumper.go diff --git a/dumpling/dumpling.go b/dumpling/dumpling.go index 0a1cd8de51..6bcd5cee7a 100644 --- a/dumpling/dumpling.go +++ b/dumpling/dumpling.go @@ -23,7 +23,6 @@ import ( "github.com/pingcap/dm/dm/config" "github.com/pingcap/dm/dm/pb" "github.com/pingcap/dm/dm/unit" - "github.com/pingcap/dm/pkg/binlog" "github.com/pingcap/dm/pkg/log" "github.com/pingcap/dm/pkg/terror" "github.com/pingcap/dm/pkg/utils" @@ -106,15 +105,6 @@ func (m *Dumpling) Process(ctx context.Context, pr chan pb.ProcessResult) { err = export.Dump(newCtx, m.dumpConfig) cancel() - if m.dumpConfig.Consistency == "none" { - if location, err := m.recordExitLocation(); err != nil { - m.logger.Error("failed to record exit location", zap.Error(err)) - } else { - m.cfg.SyncerConfig.DumpExitLocation = location - m.logger.Info("successful record exit location", zap.Stringer("location", location)) - } - } - if err != nil { if utils.IsContextCanceledError(err) { m.logger.Info("filter out error caused by user cancel") @@ -267,6 +257,11 @@ func (m *Dumpling) constructArgs() (*export.Config, error) { } } + // record exit position when consistency is none, to support scenarios like Aurora upstream + if dumpConfig.Consistency == "none" { + dumpConfig.PosAfterConnect = true + } + m.logger.Info("create dumpling", zap.Stringer("config", dumpConfig)) if len(ret) > 0 { 
m.logger.Warn("meeting some unsupported arguments", zap.Strings("argument", ret)) @@ -279,24 +274,6 @@ func (m *Dumpling) constructArgs() (*export.Config, error) { return dumpConfig, nil } -// recordExitLocation writes exit location to config, which could be used in sync unit. -// when using inconsistent dump, sync unit should enable safe mode between start location and this location. -func (m *Dumpling) recordExitLocation() (*binlog.Location, error) { - var location binlog.Location - db, err := sql.Open("mysql", m.dumpConfig.GetDSN("")) - if err != nil { - return nil, err - } - defer db.Close() - pos, gset, err := utils.GetMasterStatus(db, m.cfg.Flavor) - if err != nil { - return nil, err - } - location.Position = pos - location.GTIDSet = gset - return &location, nil -} - // detectAnsiQuotes tries to detect ANSI_QUOTES from upstream. If success, change EnableANSIQuotes in subtask config func (m *Dumpling) detectAnsiQuotes() { db, err := sql.Open("mysql", m.dumpConfig.GetDSN("")) diff --git a/go.mod b/go.mod index 8caa91ec4e..668343fac8 100644 --- a/go.mod +++ b/go.mod @@ -20,7 +20,7 @@ require ( github.com/mattn/go-colorable v0.1.7 // indirect github.com/mattn/go-runewidth v0.0.9 // indirect github.com/pingcap/check v0.0.0-20200212061837-5e12011dc712 - github.com/pingcap/dumpling v0.0.0-20200825093428-8e00b826f68d + github.com/pingcap/dumpling v0.0.0-20200829142316-42ac4de0db6b github.com/pingcap/errcode v0.3.0 // indirect github.com/pingcap/errors v0.11.5-0.20200820035142-66eb5bf1d1cd github.com/pingcap/failpoint v0.0.0-20200702092429-9f69995143ce diff --git a/go.sum b/go.sum index b489b08469..66dbe5c501 100644 --- a/go.sum +++ b/go.sum @@ -511,8 +511,8 @@ github.com/pingcap/check v0.0.0-20191216031241-8a5a85928f12 h1:rfD9v3+ppLPzoQBgZ github.com/pingcap/check v0.0.0-20191216031241-8a5a85928f12/go.mod h1:PYMCGwN0JHjoqGr3HrZoD+b8Tgx8bKnArhSq8YVzUMc= github.com/pingcap/check v0.0.0-20200212061837-5e12011dc712 h1:R8gStypOBmpnHEx1qi//SaqxJVI4inOqljg/Aj5/390= github.com/pingcap/check v0.0.0-20200212061837-5e12011dc712/go.mod h1:PYMCGwN0JHjoqGr3HrZoD+b8Tgx8bKnArhSq8YVzUMc= -github.com/pingcap/dumpling v0.0.0-20200825093428-8e00b826f68d h1:g98WMoYbEf6dCFJ//l4XyyJw4cA8fICfP3F0Q6BF4EM= -github.com/pingcap/dumpling v0.0.0-20200825093428-8e00b826f68d/go.mod h1:1Su9KgYl5/KKDfvWTPtPpT6yoD/7jSZ7whsf0U7XxyY= +github.com/pingcap/dumpling v0.0.0-20200829142316-42ac4de0db6b h1:9z6SWg93iqdKirDr2vhXgD5wh7JjN1SlOEIiKTOtx3I= +github.com/pingcap/dumpling v0.0.0-20200829142316-42ac4de0db6b/go.mod h1:1Su9KgYl5/KKDfvWTPtPpT6yoD/7jSZ7whsf0U7XxyY= github.com/pingcap/errcode v0.0.0-20180921232412-a1a7271709d9/go.mod h1:4b2X8xSqxIroj/IZ9MX/VGZhAwc11wB9wRIzHvz6SeM= github.com/pingcap/errcode v0.3.0 h1:IF6LC/4+b1KNwrMlr2rBTUrojFPMexXBcDWZSpNwxjg= github.com/pingcap/errcode v0.3.0/go.mod h1:4b2X8xSqxIroj/IZ9MX/VGZhAwc11wB9wRIzHvz6SeM= diff --git a/loader/loader.go b/loader/loader.go index 58c7cc6cbe..ac46ed4ecb 100644 --- a/loader/loader.go +++ b/loader/loader.go @@ -30,6 +30,7 @@ import ( "github.com/pingcap/dm/dm/unit" "github.com/pingcap/dm/pkg/conn" tcontext "github.com/pingcap/dm/pkg/context" + "github.com/pingcap/dm/pkg/dumpling" fr "github.com/pingcap/dm/pkg/func-rollback" "github.com/pingcap/dm/pkg/log" "github.com/pingcap/dm/pkg/terror" @@ -1239,13 +1240,13 @@ func (l *Loader) checkpointID() string { func (l *Loader) getMydumpMetadata() error { metafile := filepath.Join(l.cfg.LoaderConfig.Dir, "metadata") - pos, _, err := utils.ParseMetaData(metafile, l.cfg.Flavor) + loc, _, err := 
dumpling.ParseMetaData(metafile, l.cfg.Flavor) if err != nil { l.logCtx.L().Error("fail to parse dump metadata", log.ShortError(err)) return err } - l.metaBinlog.Set(pos.String()) + l.metaBinlog.Set(loc.Position.String()) return nil } diff --git a/pkg/dumpling/utils.go b/pkg/dumpling/utils.go new file mode 100644 index 0000000000..a2c3acc450 --- /dev/null +++ b/pkg/dumpling/utils.go @@ -0,0 +1,168 @@ +// Copyright 2019 PingCAP, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// See the License for the specific language governing permissions and +// limitations under the License. + +package dumpling + +import ( + "bufio" + "fmt" + "io" + "os" + "strconv" + "strings" + + "github.com/siddontang/go-mysql/mysql" + + "github.com/pingcap/dm/pkg/binlog" + "github.com/pingcap/dm/pkg/gtid" + "github.com/pingcap/dm/pkg/terror" +) + +// ParseMetaData parses mydumper's output meta file and returns binlog location. +// since v2.0.0, dumpling maybe configured to output master status after connection pool is established, +// we return this location as well. +func ParseMetaData(filename, flavor string) (*binlog.Location, *binlog.Location, error) { + fd, err := os.Open(filename) + if err != nil { + return nil, nil, terror.ErrParseMydumperMeta.Generate(err) + } + defer fd.Close() + + pos := mysql.Position{} + gtidStr := "" + pos2 := mysql.Position{} + gtidStr2 := "" + + br := bufio.NewReader(fd) + + parsePosAndGTID := func(pos *mysql.Position, gtid *string) error { + for { + line, err := br.ReadString('\n') + if err != nil { + return err + } + parts := strings.SplitN(line, ":", 2) + if len(parts) != 2 { + continue + } + key := strings.TrimSpace(parts[0]) + value := strings.TrimSpace(parts[1]) + switch key { + case "Log": + pos.Name = value + case "Pos": + pos64, err := strconv.ParseUint(value, 10, 32) + if err != nil { + return err + } + pos.Pos = uint32(pos64) + case "GTID": + // multiple GTID sets may cross multiple lines, continue to read them. 
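+				// readFollowingGTIDs keeps appending the following lines to the GTID string, stopping at EOF, an empty line, or a line that does not parse as a GTID set.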
+ following, err := readFollowingGTIDs(br, flavor) + if err != nil { + return err + } + *gtid = value + following + return nil + } + } + } + + for { + line, err := br.ReadString('\n') + if err == io.EOF { + break + } else if err != nil { + return nil, nil, terror.ErrParseMydumperMeta.Generate(err) + } + line = strings.TrimSpace(line) + if len(line) == 0 { + continue + } + + if line == "SHOW MASTER STATUS:" { + if err := parsePosAndGTID(&pos, >idStr); err != nil { + return nil, nil, terror.ErrParseMydumperMeta.Generate(err) + } + // ref: https://github.com/maxbube/mydumper/blob/master/mydumper.c#L434 + } else if line == "SHOW SLAVE STATUS:" { + for { + line, err := br.ReadString('\n') + if err != nil { + return nil, nil, terror.ErrParseMydumperMeta.Generate(err) + } + line = strings.TrimSpace(line) + if len(line) == 0 { + break + } + } + } else if line == "SHOW MASTER STATUS: /* AFTER CONNECTION POOL ESTABLISHED */" { + if err := parsePosAndGTID(&pos2, >idStr2); err != nil { + return nil, nil, terror.ErrParseMydumperMeta.Generate(err) + } + } + } + + if len(pos.Name) == 0 || pos.Pos == uint32(0) || len(pos2.Name) == 0 || pos2.Pos == uint32(0) { + return nil, nil, terror.ErrParseMydumperMeta.Generate(fmt.Sprintf("file %s invalid format", filename)) + } + + gset, err := gtid.ParserGTID(flavor, gtidStr) + if err != nil { + return nil, nil, terror.ErrParseMydumperMeta.Generate(fmt.Sprintf("file %s invalid format", filename)) + } + loc := &binlog.Location{ + Position: pos, + GTIDSet: gset, + } + gset2, err := gtid.ParserGTID(flavor, gtidStr2) + if err != nil { + return nil, nil, terror.ErrParseMydumperMeta.Generate(fmt.Sprintf("file %s invalid format", filename)) + } + loc2 := &binlog.Location{ + Position: pos2, + GTIDSet: gset2, + } + + return loc, loc2, nil +} + +func readFollowingGTIDs(br *bufio.Reader, flavor string) (string, error) { + var following strings.Builder + for { + line, err := br.ReadString('\n') + if err == io.EOF { + return following.String(), nil // return the previous, not including the last line. + } else if err != nil { + return "", err + } + + line = strings.TrimSpace(line) + if len(line) == 0 { + return following.String(), nil // end with empty line. + } + + end := len(line) + if strings.HasSuffix(line, ",") { + end = len(line) - 1 + } + + // try parse to verify it + _, err = gtid.ParserGTID(flavor, line[:end]) + if err != nil { + return following.String(), nil // return the previous, not including this non-GTID line. + } + + following.WriteString(line) + } +} diff --git a/pkg/utils/mydumper_test.go b/pkg/dumpling/utils_test.go similarity index 90% rename from pkg/utils/mydumper_test.go rename to pkg/dumpling/utils_test.go index adcd2390dc..e8b81d03ba 100644 --- a/pkg/utils/mydumper_test.go +++ b/pkg/dumpling/utils_test.go @@ -11,17 +11,27 @@ // See the License for the specific language governing permissions and // limitations under the License. -package utils +package dumpling import ( "io/ioutil" "os" + "testing" . 
"github.com/pingcap/check" "github.com/siddontang/go-mysql/mysql" ) -func (t *testUtilsSuite) TestParseMetaData(c *C) { +var _ = Suite(&testSuite{}) + +func TestSuite(t *testing.T) { + TestingT(t) +} + +type testSuite struct { +} + +func (t *testSuite) TestParseMetaData(c *C) { f, err := ioutil.TempFile("", "metadata") c.Assert(err, IsNil) defer os.Remove(f.Name()) @@ -109,9 +119,9 @@ Finished dump at: 2020-05-21 18:02:44`, for _, tc := range testCases { err := ioutil.WriteFile(f.Name(), []byte(tc.source), 0644) c.Assert(err, IsNil) - pos, gsetStr, err := ParseMetaData(f.Name(), "mysql") + loc, _, err := ParseMetaData(f.Name(), "mysql") c.Assert(err, IsNil) - c.Assert(pos, DeepEquals, tc.pos) - c.Assert(gsetStr, Equals, tc.gsetStr) + c.Assert(loc.Position, DeepEquals, tc.pos) + c.Assert(loc.GTIDSet, Equals, tc.gsetStr) } } diff --git a/pkg/utils/mydumper.go b/pkg/utils/mydumper.go deleted file mode 100644 index 56615acf4a..0000000000 --- a/pkg/utils/mydumper.go +++ /dev/null @@ -1,119 +0,0 @@ -// Copyright 2019 PingCAP, Inc. -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// See the License for the specific language governing permissions and -// limitations under the License. - -package utils - -import ( - "bufio" - "fmt" - "io" - "os" - "strconv" - "strings" - - "github.com/siddontang/go-mysql/mysql" - - "github.com/pingcap/dm/pkg/gtid" - "github.com/pingcap/dm/pkg/terror" -) - -// ParseMetaData parses mydumper's output meta file and returns binlog position and GTID -func ParseMetaData(filename, flavor string) (mysql.Position, string, error) { - fd, err := os.Open(filename) - if err != nil { - return mysql.Position{}, "", terror.ErrParseMydumperMeta.Generate(err) - } - defer fd.Close() - - pos := mysql.Position{} - gtid := "" - - br := bufio.NewReader(fd) - for { - line, err := br.ReadString('\n') - if err == io.EOF { - break - } else if err != nil { - return mysql.Position{}, "", terror.ErrParseMydumperMeta.Generate(err) - } - line = strings.TrimSpace(line) - if len(line) == 0 { - continue - } - // ref: https://github.com/maxbube/mydumper/blob/master/mydumper.c#L434 - if strings.Contains(line, "SHOW SLAVE STATUS") { - // now, we only parse log / pos for `SHOW MASTER STATUS` - break - } - parts := strings.SplitN(line, ":", 2) - if len(parts) != 2 { - continue - } - key := strings.TrimSpace(parts[0]) - value := strings.TrimSpace(parts[1]) - - switch key { - case "Log": - pos.Name = value - case "Pos": - pos64, err := strconv.ParseUint(value, 10, 32) - if err != nil { - return mysql.Position{}, "", terror.ErrParseMydumperMeta.Generate(err) - } - pos.Pos = uint32(pos64) - case "GTID": - // multiple GTID sets may cross multiple lines, continue to read them. 
- following, err := readFollowingGTIDs(br, flavor) - if err != nil { - return mysql.Position{}, "", terror.ErrParseMydumperMeta.Generate(err) - } - gtid = value + following - } - } - - if len(pos.Name) == 0 || pos.Pos == uint32(0) { - return mysql.Position{}, "", terror.ErrParseMydumperMeta.Generate(fmt.Sprintf("file %s invalid format", filename)) - } - - return pos, gtid, nil -} - -func readFollowingGTIDs(br *bufio.Reader, flavor string) (string, error) { - var following strings.Builder - for { - line, err := br.ReadString('\n') - if err == io.EOF { - return following.String(), nil // return the previous, not including the last line. - } else if err != nil { - return "", err - } - - line = strings.TrimSpace(line) - if len(line) == 0 { - return following.String(), nil // end with empty line. - } - - end := len(line) - if strings.HasSuffix(line, ",") { - end = len(line) - 1 - } - - // try parse to verify it - _, err = gtid.ParserGTID(flavor, line[:end]) - if err != nil { - return following.String(), nil // return the previous, not including this non-GTID line. - } - - following.WriteString(line) - } -} diff --git a/syncer/checkpoint.go b/syncer/checkpoint.go index bc8de2b775..8179e91463 100644 --- a/syncer/checkpoint.go +++ b/syncer/checkpoint.go @@ -22,23 +22,23 @@ import ( "sync" "time" + "github.com/pingcap/failpoint" + "github.com/pingcap/parser/model" + tmysql "github.com/pingcap/parser/mysql" + "github.com/pingcap/tidb-tools/pkg/dbutil" + "github.com/siddontang/go-mysql/mysql" + "go.uber.org/zap" + "github.com/pingcap/dm/dm/config" - binlog "github.com/pingcap/dm/pkg/binlog" + "github.com/pingcap/dm/pkg/binlog" "github.com/pingcap/dm/pkg/conn" tcontext "github.com/pingcap/dm/pkg/context" "github.com/pingcap/dm/pkg/cputil" + "github.com/pingcap/dm/pkg/dumpling" "github.com/pingcap/dm/pkg/gtid" "github.com/pingcap/dm/pkg/log" "github.com/pingcap/dm/pkg/schema" "github.com/pingcap/dm/pkg/terror" - "github.com/pingcap/dm/pkg/utils" - "github.com/pingcap/tidb-tools/pkg/dbutil" - - "github.com/pingcap/failpoint" - "github.com/pingcap/parser/model" - tmysql "github.com/pingcap/parser/mysql" - "github.com/siddontang/go-mysql/mysql" - "go.uber.org/zap" ) /* @@ -809,18 +809,7 @@ func (cp *RemoteCheckPoint) parseMetaData() (*binlog.Location, error) { // `metadata` is mydumper's output meta file name filename := path.Join(cp.cfg.Dir, "metadata") cp.logCtx.L().Info("parsing metadata from file", zap.String("file", filename)) - pos, gsetStr, err := utils.ParseMetaData(filename, cp.cfg.Flavor) - if err != nil { - return nil, err - } - - gset, err := gtid.ParserGTID(cp.cfg.Flavor, gsetStr) - if err != nil { - return nil, err - } + loc, _, err := dumpling.ParseMetaData(filename, cp.cfg.Flavor) - return &binlog.Location{ - Position: pos, - GTIDSet: gset, - }, nil + return loc, err } From 8ce7cf9ec73855d92beda3f4e82e2d3ed3a13cbb Mon Sep 17 00:00:00 2001 From: lance6716 Date: Mon, 31 Aug 2020 12:08:27 +0800 Subject: [PATCH 06/10] fix test --- pkg/dumpling/utils.go | 39 +++++++++++++++++++++++++------------- pkg/dumpling/utils_test.go | 5 ++++- 2 files changed, 30 insertions(+), 14 deletions(-) diff --git a/pkg/dumpling/utils.go b/pkg/dumpling/utils.go index a2c3acc450..8c927256b5 100644 --- a/pkg/dumpling/utils.go +++ b/pkg/dumpling/utils.go @@ -38,10 +38,16 @@ func ParseMetaData(filename, flavor string) (*binlog.Location, *binlog.Location, } defer fd.Close() - pos := mysql.Position{} - gtidStr := "" - pos2 := mysql.Position{} - gtidStr2 := "" + var ( + pos mysql.Position + gtidStr string + useLocation2 = 
false + pos2 mysql.Position + gtidStr2 string + + loc *binlog.Location + loc2 *binlog.Location + ) br := bufio.NewReader(fd) @@ -107,13 +113,14 @@ func ParseMetaData(filename, flavor string) (*binlog.Location, *binlog.Location, } } } else if line == "SHOW MASTER STATUS: /* AFTER CONNECTION POOL ESTABLISHED */" { + useLocation2 = true if err := parsePosAndGTID(&pos2, >idStr2); err != nil { return nil, nil, terror.ErrParseMydumperMeta.Generate(err) } } } - if len(pos.Name) == 0 || pos.Pos == uint32(0) || len(pos2.Name) == 0 || pos2.Pos == uint32(0) { + if len(pos.Name) == 0 || pos.Pos == uint32(0) { return nil, nil, terror.ErrParseMydumperMeta.Generate(fmt.Sprintf("file %s invalid format", filename)) } @@ -121,17 +128,23 @@ func ParseMetaData(filename, flavor string) (*binlog.Location, *binlog.Location, if err != nil { return nil, nil, terror.ErrParseMydumperMeta.Generate(fmt.Sprintf("file %s invalid format", filename)) } - loc := &binlog.Location{ + loc = &binlog.Location{ Position: pos, GTIDSet: gset, } - gset2, err := gtid.ParserGTID(flavor, gtidStr2) - if err != nil { - return nil, nil, terror.ErrParseMydumperMeta.Generate(fmt.Sprintf("file %s invalid format", filename)) - } - loc2 := &binlog.Location{ - Position: pos2, - GTIDSet: gset2, + + if useLocation2 { + if len(pos2.Name) == 0 || pos2.Pos == uint32(0) { + return nil, nil, terror.ErrParseMydumperMeta.Generate(fmt.Sprintf("file %s invalid format", filename)) + } + gset2, err := gtid.ParserGTID(flavor, gtidStr2) + if err != nil { + return nil, nil, terror.ErrParseMydumperMeta.Generate(fmt.Sprintf("file %s invalid format", filename)) + } + loc2 = &binlog.Location{ + Position: pos2, + GTIDSet: gset2, + } } return loc, loc2, nil diff --git a/pkg/dumpling/utils_test.go b/pkg/dumpling/utils_test.go index e8b81d03ba..921ecadd75 100644 --- a/pkg/dumpling/utils_test.go +++ b/pkg/dumpling/utils_test.go @@ -20,6 +20,8 @@ import ( . 
"github.com/pingcap/check" "github.com/siddontang/go-mysql/mysql" + + "github.com/pingcap/dm/pkg/gtid" ) var _ = Suite(&testSuite{}) @@ -122,6 +124,7 @@ Finished dump at: 2020-05-21 18:02:44`, loc, _, err := ParseMetaData(f.Name(), "mysql") c.Assert(err, IsNil) c.Assert(loc.Position, DeepEquals, tc.pos) - c.Assert(loc.GTIDSet, Equals, tc.gsetStr) + gs, _ := gtid.ParserGTID("mysql", tc.gsetStr) + c.Assert(loc.GTIDSet, DeepEquals, gs) } } From 618cdef0e19b58c6215b0b26eb615daee961f328 Mon Sep 17 00:00:00 2001 From: lance6716 Date: Mon, 31 Aug 2020 14:38:37 +0800 Subject: [PATCH 07/10] add test --- pkg/dumpling/utils.go | 9 ++-- pkg/dumpling/utils_test.go | 104 +++++++++++++++++++++++++++++++++++-- syncer/checkpoint.go | 17 +++--- 3 files changed, 117 insertions(+), 13 deletions(-) diff --git a/pkg/dumpling/utils.go b/pkg/dumpling/utils.go index 8c927256b5..1a93df3bd6 100644 --- a/pkg/dumpling/utils.go +++ b/pkg/dumpling/utils.go @@ -96,12 +96,13 @@ func ParseMetaData(filename, flavor string) (*binlog.Location, *binlog.Location, continue } - if line == "SHOW MASTER STATUS:" { + switch line { + case "SHOW MASTER STATUS:": if err := parsePosAndGTID(&pos, >idStr); err != nil { return nil, nil, terror.ErrParseMydumperMeta.Generate(err) } + case "SHOW SLAVE STATUS:": // ref: https://github.com/maxbube/mydumper/blob/master/mydumper.c#L434 - } else if line == "SHOW SLAVE STATUS:" { for { line, err := br.ReadString('\n') if err != nil { @@ -112,11 +113,13 @@ func ParseMetaData(filename, flavor string) (*binlog.Location, *binlog.Location, break } } - } else if line == "SHOW MASTER STATUS: /* AFTER CONNECTION POOL ESTABLISHED */" { + case "SHOW MASTER STATUS: /* AFTER CONNECTION POOL ESTABLISHED */": useLocation2 = true if err := parsePosAndGTID(&pos2, >idStr2); err != nil { return nil, nil, terror.ErrParseMydumperMeta.Generate(err) } + default: + // do nothing for Started dump, Finished dump... 
} } diff --git a/pkg/dumpling/utils_test.go b/pkg/dumpling/utils_test.go index 921ecadd75..245f96266f 100644 --- a/pkg/dumpling/utils_test.go +++ b/pkg/dumpling/utils_test.go @@ -39,9 +39,12 @@ func (t *testSuite) TestParseMetaData(c *C) { defer os.Remove(f.Name()) testCases := []struct { - source string - pos mysql.Position - gsetStr string + source string + pos mysql.Position + gsetStr string + loc2 bool + pos2 mysql.Position + gsetStr2 string }{ { `Started dump at: 2018-12-28 07:20:49 @@ -56,6 +59,9 @@ Finished dump at: 2018-12-28 07:20:51`, Pos: 2479, }, "97b5142f-e19c-11e8-808c-0242ac110005:1-13", + false, + mysql.Position{}, + "", }, { `Started dump at: 2018-12-27 19:51:22 @@ -76,6 +82,9 @@ Finished dump at: 2018-12-27 19:51:22`, Pos: 3295817, }, "", + false, + mysql.Position{}, + "", }, { // with empty line after multiple GTID sets `Started dump at: 2020-05-21 18:14:49 @@ -100,6 +109,9 @@ Finished dump at: 2020-05-21 18:14:49`, Pos: 1274, }, "5b5a8e4e-9b43-11ea-900d-0242ac170002:1-10,5b642cb6-9b43-11ea-8914-0242ac170003:1-7,97b5142f-e19c-11e8-808c-0242ac110005:1-13", + false, + mysql.Position{}, + "", }, { // without empty line after mutlple GTID sets `Started dump at: 2020-05-21 18:02:33 @@ -115,16 +127,100 @@ Finished dump at: 2020-05-21 18:02:44`, Pos: 1274, }, "5b5a8e4e-9b43-11ea-900d-0242ac170002:1-10,5b642cb6-9b43-11ea-8914-0242ac170003:1-7,97b5142f-e19c-11e8-808c-0242ac110005:1-13", + false, + mysql.Position{}, + "", + }, + { // with empty line after multiple GTID sets + `Started dump at: 2020-05-21 18:14:49 +SHOW MASTER STATUS: + Log: mysql-bin.000003 + Pos: 1274 + GTID:5b5a8e4e-9b43-11ea-900d-0242ac170002:1-10, +5b642cb6-9b43-11ea-8914-0242ac170003:1-7, +97b5142f-e19c-11e8-808c-0242ac110005:1-13 + +SHOW SLAVE STATUS: + Host: 192.168.100.100 + Log: mysql-bin.000003 + Pos: 700 + GTID:5b5a8e4e-9b43-11ea-900d-0242ac170002:1-10, +5b642cb6-9b43-11ea-8914-0242ac170003:1-7, +97b5142f-e19c-11e8-808c-0242ac110005:1-13 + +SHOW MASTER STATUS: /* AFTER CONNECTION POOL ESTABLISHED */ + Log: mysql-bin.000003 + Pos: 1280 + GTID:5b5a8e4e-9b43-11ea-900d-0242ac170002:1-10, +5b642cb6-9b43-11ea-8914-0242ac170003:1-7, +97b5142f-e19c-11e8-808c-0242ac110005:1-14 + +Finished dump at: 2020-05-21 18:14:49`, + mysql.Position{ + Name: "mysql-bin.000003", + Pos: 1274, + }, + "5b5a8e4e-9b43-11ea-900d-0242ac170002:1-10,5b642cb6-9b43-11ea-8914-0242ac170003:1-7,97b5142f-e19c-11e8-808c-0242ac110005:1-13", + true, + mysql.Position{ + Name: "mysql-bin.000003", + Pos: 1280, + }, + "5b5a8e4e-9b43-11ea-900d-0242ac170002:1-10,5b642cb6-9b43-11ea-8914-0242ac170003:1-7,97b5142f-e19c-11e8-808c-0242ac110005:1-14", + }, + { // with empty line after multiple GTID sets + `Started dump at: 2020-05-21 18:14:49 +SHOW MASTER STATUS: + Log: mysql-bin.000003 + Pos: 1274 + GTID:5b5a8e4e-9b43-11ea-900d-0242ac170002:1-10, +5b642cb6-9b43-11ea-8914-0242ac170003:1-7, +97b5142f-e19c-11e8-808c-0242ac110005:1-13 + +SHOW SLAVE STATUS: + Host: 192.168.100.100 + Log: mysql-bin.000003 + Pos: 700 + GTID:5b5a8e4e-9b43-11ea-900d-0242ac170002:1-10, +5b642cb6-9b43-11ea-8914-0242ac170003:1-7, +97b5142f-e19c-11e8-808c-0242ac110005:1-13 + +SHOW MASTER STATUS: /* AFTER CONNECTION POOL ESTABLISHED */ + Log: mysql-bin.000004 + Pos: 4 + GTID:5b5a8e4e-9b43-11ea-900d-0242ac170002:1-10, +5b642cb6-9b43-11ea-8914-0242ac170003:1-9, +97b5142f-e19c-11e8-808c-0242ac110005:1-13 + +Finished dump at: 2020-05-21 18:14:49`, + mysql.Position{ + Name: "mysql-bin.000003", + Pos: 1274, + }, + 
"5b5a8e4e-9b43-11ea-900d-0242ac170002:1-10,5b642cb6-9b43-11ea-8914-0242ac170003:1-7,97b5142f-e19c-11e8-808c-0242ac110005:1-13", + true, + mysql.Position{ + Name: "mysql-bin.000004", + Pos: 4, + }, + "5b5a8e4e-9b43-11ea-900d-0242ac170002:1-10,5b642cb6-9b43-11ea-8914-0242ac170003:1-9,97b5142f-e19c-11e8-808c-0242ac110005:1-13", }, } for _, tc := range testCases { err := ioutil.WriteFile(f.Name(), []byte(tc.source), 0644) c.Assert(err, IsNil) - loc, _, err := ParseMetaData(f.Name(), "mysql") + loc, loc2, err := ParseMetaData(f.Name(), "mysql") c.Assert(err, IsNil) c.Assert(loc.Position, DeepEquals, tc.pos) gs, _ := gtid.ParserGTID("mysql", tc.gsetStr) c.Assert(loc.GTIDSet, DeepEquals, gs) + if tc.loc2 { + c.Assert(loc2.Position, DeepEquals, tc.pos2) + gs2, _ := gtid.ParserGTID("mysql", tc.gsetStr2) + c.Assert(loc2.GTIDSet, DeepEquals, gs2) + } else { + c.Assert(loc2, IsNil) + } } } diff --git a/syncer/checkpoint.go b/syncer/checkpoint.go index 8179e91463..c37ce45e9f 100644 --- a/syncer/checkpoint.go +++ b/syncer/checkpoint.go @@ -733,14 +733,15 @@ func (cp *RemoteCheckPoint) LoadMeta() error { defer cp.Unlock() var ( - location *binlog.Location - err error + location *binlog.Location + dumpExitLoc *binlog.Location + err error ) switch cp.cfg.Mode { case config.ModeAll: // NOTE: syncer must continue the syncing follow loader's tail, so we parse mydumper's output // refine when master / slave switching added and checkpoint mechanism refactored - location, err = cp.parseMetaData() + location, dumpExitLoc, err = cp.parseMetaData() if err != nil { return err } @@ -773,6 +774,10 @@ func (cp *RemoteCheckPoint) LoadMeta() error { cp.globalPoint = newBinlogPoint(location.Clone(), location.Clone(), nil, nil, cp.cfg.EnableGTID) cp.logCtx.L().Info("loaded checkpoints from meta", log.WrapStringerField("global checkpoint", cp.globalPoint)) } + if dumpExitLoc != nil { + cp.cfg.DumpExitLocation = dumpExitLoc + cp.logCtx.L().Info("set DumpExitLocation from meta", zap.Stringer("DumpExitLocation", dumpExitLoc)) + } return nil } @@ -805,11 +810,11 @@ func (cp *RemoteCheckPoint) genUpdateSQL(cpSchema, cpTable string, location binl return sql2, args } -func (cp *RemoteCheckPoint) parseMetaData() (*binlog.Location, error) { +func (cp *RemoteCheckPoint) parseMetaData() (*binlog.Location, *binlog.Location, error) { // `metadata` is mydumper's output meta file name filename := path.Join(cp.cfg.Dir, "metadata") cp.logCtx.L().Info("parsing metadata from file", zap.String("file", filename)) - loc, _, err := dumpling.ParseMetaData(filename, cp.cfg.Flavor) + loc, dumpExitLoc, err := dumpling.ParseMetaData(filename, cp.cfg.Flavor) - return loc, err + return loc, dumpExitLoc, err } From c13db8a11ce805072f524f20fa2b9b8c3e178e33 Mon Sep 17 00:00:00 2001 From: lance6716 Date: Mon, 31 Aug 2020 15:45:31 +0800 Subject: [PATCH 08/10] add it --- syncer/syncer.go | 2 +- tests/safe_mode/run.sh | 52 ++++++++++++++++++++++++++++++++++++++++++ 2 files changed, 53 insertions(+), 1 deletion(-) diff --git a/syncer/syncer.go b/syncer/syncer.go index 92d2c1ae44..536156fe6e 100644 --- a/syncer/syncer.go +++ b/syncer/syncer.go @@ -1094,7 +1094,7 @@ func (s *Syncer) Run(ctx context.Context) (err error) { if s.cfg.Mode == config.ModeAll { if err = s.flushCheckPoints(); err != nil { s.tctx.L().Warn("fail to flush checkpoints when starting task", zap.Error(err)) - } else { + } else if s.cfg.CleanDumpFile { s.tctx.L().Info("try to remove loaded files") metadataFile := path.Join(s.cfg.Dir, "metadata") if err = os.Remove(metadataFile); err != nil { 
diff --git a/tests/safe_mode/run.sh b/tests/safe_mode/run.sh index e332c4db9c..68b3ba22ed 100755 --- a/tests/safe_mode/run.sh +++ b/tests/safe_mode/run.sh @@ -6,7 +6,59 @@ cur=$( cd "$( dirname "${BASH_SOURCE[0]}" )" && pwd ) source $cur/../_utils/test_prepare WORK_DIR=$TEST_DIR/$TEST_NAME +function consistency_none() { + run_sql_file $cur/data/db1.prepare.sql $MYSQL_HOST1 $MYSQL_PORT1 $MYSQL_PASSWORD1 + check_contains 'Query OK, 2 rows affected' + run_sql_file $cur/data/db2.prepare.sql $MYSQL_HOST2 $MYSQL_PORT2 $MYSQL_PASSWORD2 + check_contains 'Query OK, 3 rows affected' + + run_dm_master $WORK_DIR/master $MASTER_PORT $cur/conf/dm-master.toml + check_rpc_alive $cur/../bin/check_master_online 127.0.0.1:$MASTER_PORT + run_dm_worker $WORK_DIR/worker1 $WORKER1_PORT $cur/conf/dm-worker1.toml + check_rpc_alive $cur/../bin/check_worker_online 127.0.0.1:$WORKER1_PORT + run_dm_worker $WORK_DIR/worker2 $WORKER2_PORT $cur/conf/dm-worker2.toml + check_rpc_alive $cur/../bin/check_worker_online 127.0.0.1:$WORKER2_PORT + cp $cur/conf/source1.yaml $WORK_DIR/source1.yaml + cp $cur/conf/source2.yaml $WORK_DIR/source2.yaml + sed -i "/relay-binlog-name/i\relay-dir: $WORK_DIR/worker1/relay_log" $WORK_DIR/source1.yaml + sed -i "/relay-binlog-name/i\relay-dir: $WORK_DIR/worker2/relay_log" $WORK_DIR/source2.yaml + dmctl_operate_source create $WORK_DIR/source1.yaml $SOURCE_ID1 + dmctl_operate_source create $WORK_DIR/source2.yaml $SOURCE_ID2 + + cp $cur/conf/dm-task.yaml $WORK_DIR/dm-task.yaml + sed -i "/timezone/i\clean-dump-file: false" $WORK_DIR/dm-task.yaml + sed -i "s/extra-args: \"\"/extra-args: \"--consistency none\"/g" $WORK_DIR/dm-task.yaml + dmctl_start_task "$WORK_DIR/dm-task.yaml" "--remove-meta" + check_sync_diff $WORK_DIR $cur/conf/diff_config.toml + + # make sure dumpling's metadata added empty line after two SHOW MASTER STATUS + empty_line=`grep -cvE '\S' $WORK_DIR/worker1/dumped_data.test/metadata` + if [ $empty_line -ne 2 ]; then + echo "wrong number of empty line in dumpling's metadata" + exit 1 + fi + empty_line=`grep -cvE '\S' $WORK_DIR/worker2/dumped_data.test/metadata` + if [ $empty_line -ne 2 ]; then + echo "wrong number of empty line in dumpling's metadata" + exit 1 + fi + + name1=$(grep "Log: " $WORK_DIR/worker1/dumped_data.test/metadata|tail -1|awk -F: '{print $2}'|tr -d ' ') + pos1=$(grep "Pos: " $WORK_DIR/worker1/dumped_data.test/metadata|tail -1|awk -F: '{print $2}'|tr -d ' ') + gtid1=$(grep "GTID:" $WORK_DIR/worker1/dumped_data.test/metadata|tail -1|awk -F: '{print $2,":",$3}'|tr -d ' ') + check_log_contains $WORK_DIR/worker1/log/dm-worker.log "\[\"enable safe-mode because of inconsistent dump, will exit at\"\] \[task=test\] \[unit=\"binlog replication\"\] \[location=\"position: ($name1, $pos1), gtid-set: $gtid1\"\]" + name2=$(grep "Log: " $WORK_DIR/worker2/dumped_data.test/metadata|tail -1|awk -F: '{print $2}'|tr -d ' ') + pos2=$(grep "Pos: " $WORK_DIR/worker2/dumped_data.test/metadata|tail -1|awk -F: '{print $2}'|tr -d ' ') + gtid2=$(grep "GTID:" $WORK_DIR/worker2/dumped_data.test/metadata|tail -1|awk -F: '{print $2,":",$3}'|tr -d ' ') + check_log_contains $WORK_DIR/worker2/log/dm-worker.log "\[\"enable safe-mode because of inconsistent dump, will exit at\"\] \[task=test\] \[unit=\"binlog replication\"\] \[location=\"position: ($name2, $pos2), gtid-set: $gtid2\"\]" + + cleanup_data safe_mode_target + cleanup_process $* +} + function run() { + consistency_none + run_sql_file $cur/data/db1.prepare.sql $MYSQL_HOST1 $MYSQL_PORT1 $MYSQL_PASSWORD1 check_contains 'Query OK, 2 rows 
affected' run_sql_file $cur/data/db2.prepare.sql $MYSQL_HOST2 $MYSQL_PORT2 $MYSQL_PASSWORD2 From 3aa0d9b77babdb5f7be9404a47f1179573e592fb Mon Sep 17 00:00:00 2001 From: lance6716 Date: Wed, 2 Sep 2020 14:39:27 +0800 Subject: [PATCH 09/10] fix shadow --- pkg/dumpling/utils.go | 40 ++++++++++++++++++++-------------------- 1 file changed, 20 insertions(+), 20 deletions(-) diff --git a/pkg/dumpling/utils.go b/pkg/dumpling/utils.go index 1a93df3bd6..efe8f7e25c 100644 --- a/pkg/dumpling/utils.go +++ b/pkg/dumpling/utils.go @@ -53,9 +53,9 @@ func ParseMetaData(filename, flavor string) (*binlog.Location, *binlog.Location, parsePosAndGTID := func(pos *mysql.Position, gtid *string) error { for { - line, err := br.ReadString('\n') - if err != nil { - return err + line, err2 := br.ReadString('\n') + if err2 != nil { + return err2 } parts := strings.SplitN(line, ":", 2) if len(parts) != 2 { @@ -67,16 +67,16 @@ func ParseMetaData(filename, flavor string) (*binlog.Location, *binlog.Location, case "Log": pos.Name = value case "Pos": - pos64, err := strconv.ParseUint(value, 10, 32) - if err != nil { - return err + pos64, err3 := strconv.ParseUint(value, 10, 32) + if err3 != nil { + return err3 } pos.Pos = uint32(pos64) case "GTID": // multiple GTID sets may cross multiple lines, continue to read them. - following, err := readFollowingGTIDs(br, flavor) - if err != nil { - return err + following, err3 := readFollowingGTIDs(br, flavor) + if err3 != nil { + return err3 } *gtid = value + following return nil @@ -85,11 +85,11 @@ func ParseMetaData(filename, flavor string) (*binlog.Location, *binlog.Location, } for { - line, err := br.ReadString('\n') - if err == io.EOF { + line, err2 := br.ReadString('\n') + if err2 == io.EOF { break - } else if err != nil { - return nil, nil, terror.ErrParseMydumperMeta.Generate(err) + } else if err2 != nil { + return nil, nil, terror.ErrParseMydumperMeta.Generate(err2) } line = strings.TrimSpace(line) if len(line) == 0 { @@ -98,15 +98,15 @@ func ParseMetaData(filename, flavor string) (*binlog.Location, *binlog.Location, switch line { case "SHOW MASTER STATUS:": - if err := parsePosAndGTID(&pos, >idStr); err != nil { - return nil, nil, terror.ErrParseMydumperMeta.Generate(err) + if err3 := parsePosAndGTID(&pos, >idStr); err3 != nil { + return nil, nil, terror.ErrParseMydumperMeta.Generate(err3) } case "SHOW SLAVE STATUS:": // ref: https://github.com/maxbube/mydumper/blob/master/mydumper.c#L434 for { - line, err := br.ReadString('\n') - if err != nil { - return nil, nil, terror.ErrParseMydumperMeta.Generate(err) + line, err3 := br.ReadString('\n') + if err3 != nil { + return nil, nil, terror.ErrParseMydumperMeta.Generate(err3) } line = strings.TrimSpace(line) if len(line) == 0 { @@ -115,8 +115,8 @@ func ParseMetaData(filename, flavor string) (*binlog.Location, *binlog.Location, } case "SHOW MASTER STATUS: /* AFTER CONNECTION POOL ESTABLISHED */": useLocation2 = true - if err := parsePosAndGTID(&pos2, >idStr2); err != nil { - return nil, nil, terror.ErrParseMydumperMeta.Generate(err) + if err3 := parsePosAndGTID(&pos2, >idStr2); err3 != nil { + return nil, nil, terror.ErrParseMydumperMeta.Generate(err3) } default: // do nothing for Started dump, Finished dump... 
From 16270b591ca3ff2cb491a68759203d19b24a1d97 Mon Sep 17 00:00:00 2001 From: lance6716 Date: Wed, 2 Sep 2020 14:45:03 +0800 Subject: [PATCH 10/10] address comments --- dm/config/task.go | 2 +- syncer/checkpoint.go | 17 ++++++++--------- syncer/mode.go | 6 +++--- syncer/syncer.go | 8 ++++---- syncer/syncer_test.go | 12 ++++++------ 5 files changed, 22 insertions(+), 23 deletions(-) diff --git a/dm/config/task.go b/dm/config/task.go index 8503ccd8b5..b488e53c88 100644 --- a/dm/config/task.go +++ b/dm/config/task.go @@ -241,7 +241,7 @@ type SyncerConfig struct { DisableCausality bool `yaml:"disable-detect" toml:"disable-detect" json:"disable-detect"` SafeMode bool `yaml:"safe-mode" toml:"safe-mode" json:"safe-mode"` // when dump unit can't run consistent dump, enable safe mode until pass exit location of dumping - DumpExitLocation *binlog.Location `yaml:"-" toml:"-" json:"-"` + SafeModeExitLoc *binlog.Location `yaml:"-" toml:"-" json:"-"` // deprecated, use `ansi-quotes` in top level config instead EnableANSIQuotes bool `yaml:"enable-ansi-quotes" toml:"enable-ansi-quotes" json:"enable-ansi-quotes"` } diff --git a/syncer/checkpoint.go b/syncer/checkpoint.go index 9fb362c664..ed83a25790 100644 --- a/syncer/checkpoint.go +++ b/syncer/checkpoint.go @@ -733,15 +733,15 @@ func (cp *RemoteCheckPoint) LoadMeta() error { defer cp.Unlock() var ( - location *binlog.Location - dumpExitLoc *binlog.Location - err error + location *binlog.Location + safeModeExitLoc *binlog.Location + err error ) switch cp.cfg.Mode { case config.ModeAll: // NOTE: syncer must continue the syncing follow loader's tail, so we parse mydumper's output // refine when master / slave switching added and checkpoint mechanism refactored - location, dumpExitLoc, err = cp.parseMetaData() + location, safeModeExitLoc, err = cp.parseMetaData() if err != nil { return err } @@ -774,9 +774,9 @@ func (cp *RemoteCheckPoint) LoadMeta() error { cp.globalPoint = newBinlogPoint(location.Clone(), location.Clone(), nil, nil, cp.cfg.EnableGTID) cp.logCtx.L().Info("loaded checkpoints from meta", log.WrapStringerField("global checkpoint", cp.globalPoint)) } - if dumpExitLoc != nil { - cp.cfg.DumpExitLocation = dumpExitLoc - cp.logCtx.L().Info("set DumpExitLocation from meta", zap.Stringer("DumpExitLocation", dumpExitLoc)) + if safeModeExitLoc != nil { + cp.cfg.SafeModeExitLoc = safeModeExitLoc + cp.logCtx.L().Info("set SafeModeExitLoc from meta", zap.Stringer("SafeModeExitLoc", safeModeExitLoc)) } return nil @@ -814,7 +814,6 @@ func (cp *RemoteCheckPoint) parseMetaData() (*binlog.Location, *binlog.Location, // `metadata` is mydumper's output meta file name filename := path.Join(cp.cfg.Dir, "metadata") cp.logCtx.L().Info("parsing metadata from file", zap.String("file", filename)) - loc, dumpExitLoc, err := dumpling.ParseMetaData(filename, cp.cfg.Flavor) - return loc, dumpExitLoc, err + return dumpling.ParseMetaData(filename, cp.cfg.Flavor) } diff --git a/syncer/mode.go b/syncer/mode.go index 27a4db0937..d3fd5e144c 100644 --- a/syncer/mode.go +++ b/syncer/mode.go @@ -32,9 +32,9 @@ func (s *Syncer) enableSafeModeInitializationPhase(tctx *tcontext.Context, safeM safeMode.Add(tctx, 1) // add 1 but should no corresponding -1, so keeps enabled s.tctx.L().Info("enable safe-mode by config") } - if s.cfg.DumpExitLocation != nil { - safeMode.Add(tctx, 1) // enable and will revert after pass DumpExitLocation - s.tctx.L().Info("enable safe-mode because of inconsistent dump, will exit at", zap.Stringer("location", *s.cfg.DumpExitLocation)) + if s.cfg.SafeModeExitLoc 
!= nil { + safeMode.Add(tctx, 1) // enable and will revert after pass SafeModeExitLoc + s.tctx.L().Info("enable safe-mode because of inconsistent dump, will exit at", zap.Stringer("location", *s.cfg.SafeModeExitLoc)) } go func() { diff --git a/syncer/syncer.go b/syncer/syncer.go index 536156fe6e..fbc6d188ee 100644 --- a/syncer/syncer.go +++ b/syncer/syncer.go @@ -1391,10 +1391,10 @@ func (s *Syncer) Run(ctx context.Context) (err error) { default: } - // check pass DumpExitLocation and try disable safe mode, but not in sharding or replacing error - if s.cfg.DumpExitLocation != nil && !s.isReplacingErr && shardingReSync == nil { - if binlog.CompareLocation(currentLocation, *s.cfg.DumpExitLocation, s.cfg.EnableGTID) >= 0 { - s.cfg.DumpExitLocation = nil + // check pass SafeModeExitLoc and try disable safe mode, but not in sharding or replacing error + if s.cfg.SafeModeExitLoc != nil && !s.isReplacingErr && shardingReSync == nil { + if binlog.CompareLocation(currentLocation, *s.cfg.SafeModeExitLoc, s.cfg.EnableGTID) >= 0 { + s.cfg.SafeModeExitLoc = nil safeMode.Add(tctx, -1) } } diff --git a/syncer/syncer_test.go b/syncer/syncer_test.go index 9442b52278..ef8c42c41f 100644 --- a/syncer/syncer_test.go +++ b/syncer/syncer_test.go @@ -1360,13 +1360,13 @@ func (s *testSyncerSuite) TestExitSafeModeByConfig(c *C) { } generatedEvents1 := s.generateEvents(events1, c) - // make sure [18] is last event, and use [18]'s position as dumpExitLocation + // make sure [18] is last event, and use [18]'s position as safeModeExitLocation c.Assert(len(generatedEvents1), Equals, 19) - dumpExitLocation := binlog.NewLocation("") - dumpExitLocation.Position.Pos = generatedEvents1[18].Header.LogPos - syncer.cfg.DumpExitLocation = &dumpExitLocation + safeModeExitLocation := binlog.NewLocation("") + safeModeExitLocation.Position.Pos = generatedEvents1[18].Header.LogPos + syncer.cfg.SafeModeExitLoc = &safeModeExitLocation - // check after dumpExitLocation, safe mode is turned off + // check after safeModeExitLocation, safe mode is turned off events2 := mockBinlogEvents{ mockBinlogEvent{typ: Write, args: []interface{}{uint64(8), "test_1", "t_1", []byte{mysql.MYSQL_TYPE_LONG, mysql.MYSQL_TYPE_STRING}, [][]interface{}{{int32(1), "a"}}}}, mockBinlogEvent{typ: Delete, args: []interface{}{uint64(8), "test_1", "t_1", []byte{mysql.MYSQL_TYPE_LONG, mysql.MYSQL_TYPE_STRING}, [][]interface{}{{int32(1), "a"}}}}, @@ -1433,7 +1433,7 @@ func (s *testSyncerSuite) TestExitSafeModeByConfig(c *C) { "REPLACE INTO `test_1`.`t_1` (`id`,`name`) VALUES (?,?)", []interface{}{int32(1), "b"}, }, { - // start from this event, location passes dumpExitLocation and safe mode should exit + // start from this event, location passes safeModeExitLocation and safe mode should exit insert, "INSERT INTO `test_1`.`t_1` (`id`,`name`) VALUES (?,?)", []interface{}{int32(1), "a"},