From b16f201a2955c136e307b147a98c55476eb149e9 Mon Sep 17 00:00:00 2001 From: lance6716 Date: Thu, 3 Sep 2020 10:38:01 +0800 Subject: [PATCH] *: use dumpling's finish building connection location to leave safe mode (#915) --- dm/config/task.go | 3 + dumpling/dumpling.go | 5 + go.mod | 2 +- go.sum | 4 +- loader/loader.go | 5 +- pkg/dumpling/utils.go | 184 ++++++++++++++++++++++++++++++ pkg/dumpling/utils_test.go | 226 +++++++++++++++++++++++++++++++++++++ pkg/utils/mydumper.go | 119 ------------------- pkg/utils/mydumper_test.go | 117 ------------------- syncer/checkpoint.go | 31 ++--- syncer/mode.go | 8 +- syncer/syncer.go | 10 +- syncer/syncer_test.go | 178 +++++++++++++++++++++++++++-- tests/safe_mode/run.sh | 52 +++++++++ 14 files changed, 671 insertions(+), 273 deletions(-) create mode 100644 pkg/dumpling/utils.go create mode 100644 pkg/dumpling/utils_test.go delete mode 100644 pkg/utils/mydumper.go delete mode 100644 pkg/utils/mydumper_test.go diff --git a/dm/config/task.go b/dm/config/task.go index bd2eedc4b8..b488e53c88 100644 --- a/dm/config/task.go +++ b/dm/config/task.go @@ -21,6 +21,7 @@ import ( "strings" "time" + "github.com/pingcap/dm/pkg/binlog" "github.com/pingcap/dm/pkg/log" "github.com/pingcap/dm/pkg/terror" @@ -239,6 +240,8 @@ type SyncerConfig struct { EnableGTID bool `yaml:"enable-gtid" toml:"enable-gtid" json:"enable-gtid"` DisableCausality bool `yaml:"disable-detect" toml:"disable-detect" json:"disable-detect"` SafeMode bool `yaml:"safe-mode" toml:"safe-mode" json:"safe-mode"` + // when dump unit can't run consistent dump, enable safe mode until pass exit location of dumping + SafeModeExitLoc *binlog.Location `yaml:"-" toml:"-" json:"-"` // deprecated, use `ansi-quotes` in top level config instead EnableANSIQuotes bool `yaml:"enable-ansi-quotes" toml:"enable-ansi-quotes" json:"enable-ansi-quotes"` } diff --git a/dumpling/dumpling.go b/dumpling/dumpling.go index e835f974d8..6bcd5cee7a 100644 --- a/dumpling/dumpling.go +++ b/dumpling/dumpling.go @@ -257,6 +257,11 @@ func (m *Dumpling) constructArgs() (*export.Config, error) { } } + // record exit position when consistency is none, to support scenarios like Aurora upstream + if dumpConfig.Consistency == "none" { + dumpConfig.PosAfterConnect = true + } + m.logger.Info("create dumpling", zap.Stringer("config", dumpConfig)) if len(ret) > 0 { m.logger.Warn("meeting some unsupported arguments", zap.Strings("argument", ret)) diff --git a/go.mod b/go.mod index 8caa91ec4e..668343fac8 100644 --- a/go.mod +++ b/go.mod @@ -20,7 +20,7 @@ require ( github.com/mattn/go-colorable v0.1.7 // indirect github.com/mattn/go-runewidth v0.0.9 // indirect github.com/pingcap/check v0.0.0-20200212061837-5e12011dc712 - github.com/pingcap/dumpling v0.0.0-20200825093428-8e00b826f68d + github.com/pingcap/dumpling v0.0.0-20200829142316-42ac4de0db6b github.com/pingcap/errcode v0.3.0 // indirect github.com/pingcap/errors v0.11.5-0.20200820035142-66eb5bf1d1cd github.com/pingcap/failpoint v0.0.0-20200702092429-9f69995143ce diff --git a/go.sum b/go.sum index b489b08469..66dbe5c501 100644 --- a/go.sum +++ b/go.sum @@ -511,8 +511,8 @@ github.com/pingcap/check v0.0.0-20191216031241-8a5a85928f12 h1:rfD9v3+ppLPzoQBgZ github.com/pingcap/check v0.0.0-20191216031241-8a5a85928f12/go.mod h1:PYMCGwN0JHjoqGr3HrZoD+b8Tgx8bKnArhSq8YVzUMc= github.com/pingcap/check v0.0.0-20200212061837-5e12011dc712 h1:R8gStypOBmpnHEx1qi//SaqxJVI4inOqljg/Aj5/390= github.com/pingcap/check v0.0.0-20200212061837-5e12011dc712/go.mod h1:PYMCGwN0JHjoqGr3HrZoD+b8Tgx8bKnArhSq8YVzUMc= -github.com/pingcap/dumpling v0.0.0-20200825093428-8e00b826f68d h1:g98WMoYbEf6dCFJ//l4XyyJw4cA8fICfP3F0Q6BF4EM= -github.com/pingcap/dumpling v0.0.0-20200825093428-8e00b826f68d/go.mod h1:1Su9KgYl5/KKDfvWTPtPpT6yoD/7jSZ7whsf0U7XxyY= +github.com/pingcap/dumpling v0.0.0-20200829142316-42ac4de0db6b h1:9z6SWg93iqdKirDr2vhXgD5wh7JjN1SlOEIiKTOtx3I= +github.com/pingcap/dumpling v0.0.0-20200829142316-42ac4de0db6b/go.mod h1:1Su9KgYl5/KKDfvWTPtPpT6yoD/7jSZ7whsf0U7XxyY= github.com/pingcap/errcode v0.0.0-20180921232412-a1a7271709d9/go.mod h1:4b2X8xSqxIroj/IZ9MX/VGZhAwc11wB9wRIzHvz6SeM= github.com/pingcap/errcode v0.3.0 h1:IF6LC/4+b1KNwrMlr2rBTUrojFPMexXBcDWZSpNwxjg= github.com/pingcap/errcode v0.3.0/go.mod h1:4b2X8xSqxIroj/IZ9MX/VGZhAwc11wB9wRIzHvz6SeM= diff --git a/loader/loader.go b/loader/loader.go index 58c7cc6cbe..ac46ed4ecb 100644 --- a/loader/loader.go +++ b/loader/loader.go @@ -30,6 +30,7 @@ import ( "github.com/pingcap/dm/dm/unit" "github.com/pingcap/dm/pkg/conn" tcontext "github.com/pingcap/dm/pkg/context" + "github.com/pingcap/dm/pkg/dumpling" fr "github.com/pingcap/dm/pkg/func-rollback" "github.com/pingcap/dm/pkg/log" "github.com/pingcap/dm/pkg/terror" @@ -1239,13 +1240,13 @@ func (l *Loader) checkpointID() string { func (l *Loader) getMydumpMetadata() error { metafile := filepath.Join(l.cfg.LoaderConfig.Dir, "metadata") - pos, _, err := utils.ParseMetaData(metafile, l.cfg.Flavor) + loc, _, err := dumpling.ParseMetaData(metafile, l.cfg.Flavor) if err != nil { l.logCtx.L().Error("fail to parse dump metadata", log.ShortError(err)) return err } - l.metaBinlog.Set(pos.String()) + l.metaBinlog.Set(loc.Position.String()) return nil } diff --git a/pkg/dumpling/utils.go b/pkg/dumpling/utils.go new file mode 100644 index 0000000000..efe8f7e25c --- /dev/null +++ b/pkg/dumpling/utils.go @@ -0,0 +1,184 @@ +// Copyright 2019 PingCAP, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// See the License for the specific language governing permissions and +// limitations under the License. + +package dumpling + +import ( + "bufio" + "fmt" + "io" + "os" + "strconv" + "strings" + + "github.com/siddontang/go-mysql/mysql" + + "github.com/pingcap/dm/pkg/binlog" + "github.com/pingcap/dm/pkg/gtid" + "github.com/pingcap/dm/pkg/terror" +) + +// ParseMetaData parses mydumper's output meta file and returns binlog location. +// since v2.0.0, dumpling maybe configured to output master status after connection pool is established, +// we return this location as well. +func ParseMetaData(filename, flavor string) (*binlog.Location, *binlog.Location, error) { + fd, err := os.Open(filename) + if err != nil { + return nil, nil, terror.ErrParseMydumperMeta.Generate(err) + } + defer fd.Close() + + var ( + pos mysql.Position + gtidStr string + useLocation2 = false + pos2 mysql.Position + gtidStr2 string + + loc *binlog.Location + loc2 *binlog.Location + ) + + br := bufio.NewReader(fd) + + parsePosAndGTID := func(pos *mysql.Position, gtid *string) error { + for { + line, err2 := br.ReadString('\n') + if err2 != nil { + return err2 + } + parts := strings.SplitN(line, ":", 2) + if len(parts) != 2 { + continue + } + key := strings.TrimSpace(parts[0]) + value := strings.TrimSpace(parts[1]) + switch key { + case "Log": + pos.Name = value + case "Pos": + pos64, err3 := strconv.ParseUint(value, 10, 32) + if err3 != nil { + return err3 + } + pos.Pos = uint32(pos64) + case "GTID": + // multiple GTID sets may cross multiple lines, continue to read them. + following, err3 := readFollowingGTIDs(br, flavor) + if err3 != nil { + return err3 + } + *gtid = value + following + return nil + } + } + } + + for { + line, err2 := br.ReadString('\n') + if err2 == io.EOF { + break + } else if err2 != nil { + return nil, nil, terror.ErrParseMydumperMeta.Generate(err2) + } + line = strings.TrimSpace(line) + if len(line) == 0 { + continue + } + + switch line { + case "SHOW MASTER STATUS:": + if err3 := parsePosAndGTID(&pos, >idStr); err3 != nil { + return nil, nil, terror.ErrParseMydumperMeta.Generate(err3) + } + case "SHOW SLAVE STATUS:": + // ref: https://github.com/maxbube/mydumper/blob/master/mydumper.c#L434 + for { + line, err3 := br.ReadString('\n') + if err3 != nil { + return nil, nil, terror.ErrParseMydumperMeta.Generate(err3) + } + line = strings.TrimSpace(line) + if len(line) == 0 { + break + } + } + case "SHOW MASTER STATUS: /* AFTER CONNECTION POOL ESTABLISHED */": + useLocation2 = true + if err3 := parsePosAndGTID(&pos2, >idStr2); err3 != nil { + return nil, nil, terror.ErrParseMydumperMeta.Generate(err3) + } + default: + // do nothing for Started dump, Finished dump... + } + } + + if len(pos.Name) == 0 || pos.Pos == uint32(0) { + return nil, nil, terror.ErrParseMydumperMeta.Generate(fmt.Sprintf("file %s invalid format", filename)) + } + + gset, err := gtid.ParserGTID(flavor, gtidStr) + if err != nil { + return nil, nil, terror.ErrParseMydumperMeta.Generate(fmt.Sprintf("file %s invalid format", filename)) + } + loc = &binlog.Location{ + Position: pos, + GTIDSet: gset, + } + + if useLocation2 { + if len(pos2.Name) == 0 || pos2.Pos == uint32(0) { + return nil, nil, terror.ErrParseMydumperMeta.Generate(fmt.Sprintf("file %s invalid format", filename)) + } + gset2, err := gtid.ParserGTID(flavor, gtidStr2) + if err != nil { + return nil, nil, terror.ErrParseMydumperMeta.Generate(fmt.Sprintf("file %s invalid format", filename)) + } + loc2 = &binlog.Location{ + Position: pos2, + GTIDSet: gset2, + } + } + + return loc, loc2, nil +} + +func readFollowingGTIDs(br *bufio.Reader, flavor string) (string, error) { + var following strings.Builder + for { + line, err := br.ReadString('\n') + if err == io.EOF { + return following.String(), nil // return the previous, not including the last line. + } else if err != nil { + return "", err + } + + line = strings.TrimSpace(line) + if len(line) == 0 { + return following.String(), nil // end with empty line. + } + + end := len(line) + if strings.HasSuffix(line, ",") { + end = len(line) - 1 + } + + // try parse to verify it + _, err = gtid.ParserGTID(flavor, line[:end]) + if err != nil { + return following.String(), nil // return the previous, not including this non-GTID line. + } + + following.WriteString(line) + } +} diff --git a/pkg/dumpling/utils_test.go b/pkg/dumpling/utils_test.go new file mode 100644 index 0000000000..245f96266f --- /dev/null +++ b/pkg/dumpling/utils_test.go @@ -0,0 +1,226 @@ +// Copyright 2019 PingCAP, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// See the License for the specific language governing permissions and +// limitations under the License. + +package dumpling + +import ( + "io/ioutil" + "os" + "testing" + + . "github.com/pingcap/check" + "github.com/siddontang/go-mysql/mysql" + + "github.com/pingcap/dm/pkg/gtid" +) + +var _ = Suite(&testSuite{}) + +func TestSuite(t *testing.T) { + TestingT(t) +} + +type testSuite struct { +} + +func (t *testSuite) TestParseMetaData(c *C) { + f, err := ioutil.TempFile("", "metadata") + c.Assert(err, IsNil) + defer os.Remove(f.Name()) + + testCases := []struct { + source string + pos mysql.Position + gsetStr string + loc2 bool + pos2 mysql.Position + gsetStr2 string + }{ + { + `Started dump at: 2018-12-28 07:20:49 +SHOW MASTER STATUS: + Log: bin.000001 + Pos: 2479 + GTID:97b5142f-e19c-11e8-808c-0242ac110005:1-13 + +Finished dump at: 2018-12-28 07:20:51`, + mysql.Position{ + Name: "bin.000001", + Pos: 2479, + }, + "97b5142f-e19c-11e8-808c-0242ac110005:1-13", + false, + mysql.Position{}, + "", + }, + { + `Started dump at: 2018-12-27 19:51:22 +SHOW MASTER STATUS: + Log: mysql-bin.000003 + Pos: 3295817 + GTID: + +SHOW SLAVE STATUS: + Host: 10.128.27.98 + Log: mysql-bin.000003 + Pos: 329635 + GTID: + +Finished dump at: 2018-12-27 19:51:22`, + mysql.Position{ + Name: "mysql-bin.000003", + Pos: 3295817, + }, + "", + false, + mysql.Position{}, + "", + }, + { // with empty line after multiple GTID sets + `Started dump at: 2020-05-21 18:14:49 +SHOW MASTER STATUS: + Log: mysql-bin.000003 + Pos: 1274 + GTID:5b5a8e4e-9b43-11ea-900d-0242ac170002:1-10, +5b642cb6-9b43-11ea-8914-0242ac170003:1-7, +97b5142f-e19c-11e8-808c-0242ac110005:1-13 + +SHOW SLAVE STATUS: + Host: 192.168.100.100 + Log: mysql-bin.000003 + Pos: 700 + GTID:5b5a8e4e-9b43-11ea-900d-0242ac170002:1-10, +5b642cb6-9b43-11ea-8914-0242ac170003:1-7, +97b5142f-e19c-11e8-808c-0242ac110005:1-13 + +Finished dump at: 2020-05-21 18:14:49`, + mysql.Position{ + Name: "mysql-bin.000003", + Pos: 1274, + }, + "5b5a8e4e-9b43-11ea-900d-0242ac170002:1-10,5b642cb6-9b43-11ea-8914-0242ac170003:1-7,97b5142f-e19c-11e8-808c-0242ac110005:1-13", + false, + mysql.Position{}, + "", + }, + { // without empty line after mutlple GTID sets + `Started dump at: 2020-05-21 18:02:33 +SHOW MASTER STATUS: + Log: mysql-bin.000003 + Pos: 1274 + GTID:5b5a8e4e-9b43-11ea-900d-0242ac170002:1-10, +5b642cb6-9b43-11ea-8914-0242ac170003:1-7, +97b5142f-e19c-11e8-808c-0242ac110005:1-13 +Finished dump at: 2020-05-21 18:02:44`, + mysql.Position{ + Name: "mysql-bin.000003", + Pos: 1274, + }, + "5b5a8e4e-9b43-11ea-900d-0242ac170002:1-10,5b642cb6-9b43-11ea-8914-0242ac170003:1-7,97b5142f-e19c-11e8-808c-0242ac110005:1-13", + false, + mysql.Position{}, + "", + }, + { // with empty line after multiple GTID sets + `Started dump at: 2020-05-21 18:14:49 +SHOW MASTER STATUS: + Log: mysql-bin.000003 + Pos: 1274 + GTID:5b5a8e4e-9b43-11ea-900d-0242ac170002:1-10, +5b642cb6-9b43-11ea-8914-0242ac170003:1-7, +97b5142f-e19c-11e8-808c-0242ac110005:1-13 + +SHOW SLAVE STATUS: + Host: 192.168.100.100 + Log: mysql-bin.000003 + Pos: 700 + GTID:5b5a8e4e-9b43-11ea-900d-0242ac170002:1-10, +5b642cb6-9b43-11ea-8914-0242ac170003:1-7, +97b5142f-e19c-11e8-808c-0242ac110005:1-13 + +SHOW MASTER STATUS: /* AFTER CONNECTION POOL ESTABLISHED */ + Log: mysql-bin.000003 + Pos: 1280 + GTID:5b5a8e4e-9b43-11ea-900d-0242ac170002:1-10, +5b642cb6-9b43-11ea-8914-0242ac170003:1-7, +97b5142f-e19c-11e8-808c-0242ac110005:1-14 + +Finished dump at: 2020-05-21 18:14:49`, + mysql.Position{ + Name: "mysql-bin.000003", + Pos: 1274, + }, + "5b5a8e4e-9b43-11ea-900d-0242ac170002:1-10,5b642cb6-9b43-11ea-8914-0242ac170003:1-7,97b5142f-e19c-11e8-808c-0242ac110005:1-13", + true, + mysql.Position{ + Name: "mysql-bin.000003", + Pos: 1280, + }, + "5b5a8e4e-9b43-11ea-900d-0242ac170002:1-10,5b642cb6-9b43-11ea-8914-0242ac170003:1-7,97b5142f-e19c-11e8-808c-0242ac110005:1-14", + }, + { // with empty line after multiple GTID sets + `Started dump at: 2020-05-21 18:14:49 +SHOW MASTER STATUS: + Log: mysql-bin.000003 + Pos: 1274 + GTID:5b5a8e4e-9b43-11ea-900d-0242ac170002:1-10, +5b642cb6-9b43-11ea-8914-0242ac170003:1-7, +97b5142f-e19c-11e8-808c-0242ac110005:1-13 + +SHOW SLAVE STATUS: + Host: 192.168.100.100 + Log: mysql-bin.000003 + Pos: 700 + GTID:5b5a8e4e-9b43-11ea-900d-0242ac170002:1-10, +5b642cb6-9b43-11ea-8914-0242ac170003:1-7, +97b5142f-e19c-11e8-808c-0242ac110005:1-13 + +SHOW MASTER STATUS: /* AFTER CONNECTION POOL ESTABLISHED */ + Log: mysql-bin.000004 + Pos: 4 + GTID:5b5a8e4e-9b43-11ea-900d-0242ac170002:1-10, +5b642cb6-9b43-11ea-8914-0242ac170003:1-9, +97b5142f-e19c-11e8-808c-0242ac110005:1-13 + +Finished dump at: 2020-05-21 18:14:49`, + mysql.Position{ + Name: "mysql-bin.000003", + Pos: 1274, + }, + "5b5a8e4e-9b43-11ea-900d-0242ac170002:1-10,5b642cb6-9b43-11ea-8914-0242ac170003:1-7,97b5142f-e19c-11e8-808c-0242ac110005:1-13", + true, + mysql.Position{ + Name: "mysql-bin.000004", + Pos: 4, + }, + "5b5a8e4e-9b43-11ea-900d-0242ac170002:1-10,5b642cb6-9b43-11ea-8914-0242ac170003:1-9,97b5142f-e19c-11e8-808c-0242ac110005:1-13", + }, + } + + for _, tc := range testCases { + err := ioutil.WriteFile(f.Name(), []byte(tc.source), 0644) + c.Assert(err, IsNil) + loc, loc2, err := ParseMetaData(f.Name(), "mysql") + c.Assert(err, IsNil) + c.Assert(loc.Position, DeepEquals, tc.pos) + gs, _ := gtid.ParserGTID("mysql", tc.gsetStr) + c.Assert(loc.GTIDSet, DeepEquals, gs) + if tc.loc2 { + c.Assert(loc2.Position, DeepEquals, tc.pos2) + gs2, _ := gtid.ParserGTID("mysql", tc.gsetStr2) + c.Assert(loc2.GTIDSet, DeepEquals, gs2) + } else { + c.Assert(loc2, IsNil) + } + } +} diff --git a/pkg/utils/mydumper.go b/pkg/utils/mydumper.go deleted file mode 100644 index 56615acf4a..0000000000 --- a/pkg/utils/mydumper.go +++ /dev/null @@ -1,119 +0,0 @@ -// Copyright 2019 PingCAP, Inc. -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// See the License for the specific language governing permissions and -// limitations under the License. - -package utils - -import ( - "bufio" - "fmt" - "io" - "os" - "strconv" - "strings" - - "github.com/siddontang/go-mysql/mysql" - - "github.com/pingcap/dm/pkg/gtid" - "github.com/pingcap/dm/pkg/terror" -) - -// ParseMetaData parses mydumper's output meta file and returns binlog position and GTID -func ParseMetaData(filename, flavor string) (mysql.Position, string, error) { - fd, err := os.Open(filename) - if err != nil { - return mysql.Position{}, "", terror.ErrParseMydumperMeta.Generate(err) - } - defer fd.Close() - - pos := mysql.Position{} - gtid := "" - - br := bufio.NewReader(fd) - for { - line, err := br.ReadString('\n') - if err == io.EOF { - break - } else if err != nil { - return mysql.Position{}, "", terror.ErrParseMydumperMeta.Generate(err) - } - line = strings.TrimSpace(line) - if len(line) == 0 { - continue - } - // ref: https://github.com/maxbube/mydumper/blob/master/mydumper.c#L434 - if strings.Contains(line, "SHOW SLAVE STATUS") { - // now, we only parse log / pos for `SHOW MASTER STATUS` - break - } - parts := strings.SplitN(line, ":", 2) - if len(parts) != 2 { - continue - } - key := strings.TrimSpace(parts[0]) - value := strings.TrimSpace(parts[1]) - - switch key { - case "Log": - pos.Name = value - case "Pos": - pos64, err := strconv.ParseUint(value, 10, 32) - if err != nil { - return mysql.Position{}, "", terror.ErrParseMydumperMeta.Generate(err) - } - pos.Pos = uint32(pos64) - case "GTID": - // multiple GTID sets may cross multiple lines, continue to read them. - following, err := readFollowingGTIDs(br, flavor) - if err != nil { - return mysql.Position{}, "", terror.ErrParseMydumperMeta.Generate(err) - } - gtid = value + following - } - } - - if len(pos.Name) == 0 || pos.Pos == uint32(0) { - return mysql.Position{}, "", terror.ErrParseMydumperMeta.Generate(fmt.Sprintf("file %s invalid format", filename)) - } - - return pos, gtid, nil -} - -func readFollowingGTIDs(br *bufio.Reader, flavor string) (string, error) { - var following strings.Builder - for { - line, err := br.ReadString('\n') - if err == io.EOF { - return following.String(), nil // return the previous, not including the last line. - } else if err != nil { - return "", err - } - - line = strings.TrimSpace(line) - if len(line) == 0 { - return following.String(), nil // end with empty line. - } - - end := len(line) - if strings.HasSuffix(line, ",") { - end = len(line) - 1 - } - - // try parse to verify it - _, err = gtid.ParserGTID(flavor, line[:end]) - if err != nil { - return following.String(), nil // return the previous, not including this non-GTID line. - } - - following.WriteString(line) - } -} diff --git a/pkg/utils/mydumper_test.go b/pkg/utils/mydumper_test.go deleted file mode 100644 index adcd2390dc..0000000000 --- a/pkg/utils/mydumper_test.go +++ /dev/null @@ -1,117 +0,0 @@ -// Copyright 2019 PingCAP, Inc. -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// See the License for the specific language governing permissions and -// limitations under the License. - -package utils - -import ( - "io/ioutil" - "os" - - . "github.com/pingcap/check" - "github.com/siddontang/go-mysql/mysql" -) - -func (t *testUtilsSuite) TestParseMetaData(c *C) { - f, err := ioutil.TempFile("", "metadata") - c.Assert(err, IsNil) - defer os.Remove(f.Name()) - - testCases := []struct { - source string - pos mysql.Position - gsetStr string - }{ - { - `Started dump at: 2018-12-28 07:20:49 -SHOW MASTER STATUS: - Log: bin.000001 - Pos: 2479 - GTID:97b5142f-e19c-11e8-808c-0242ac110005:1-13 - -Finished dump at: 2018-12-28 07:20:51`, - mysql.Position{ - Name: "bin.000001", - Pos: 2479, - }, - "97b5142f-e19c-11e8-808c-0242ac110005:1-13", - }, - { - `Started dump at: 2018-12-27 19:51:22 -SHOW MASTER STATUS: - Log: mysql-bin.000003 - Pos: 3295817 - GTID: - -SHOW SLAVE STATUS: - Host: 10.128.27.98 - Log: mysql-bin.000003 - Pos: 329635 - GTID: - -Finished dump at: 2018-12-27 19:51:22`, - mysql.Position{ - Name: "mysql-bin.000003", - Pos: 3295817, - }, - "", - }, - { // with empty line after multiple GTID sets - `Started dump at: 2020-05-21 18:14:49 -SHOW MASTER STATUS: - Log: mysql-bin.000003 - Pos: 1274 - GTID:5b5a8e4e-9b43-11ea-900d-0242ac170002:1-10, -5b642cb6-9b43-11ea-8914-0242ac170003:1-7, -97b5142f-e19c-11e8-808c-0242ac110005:1-13 - -SHOW SLAVE STATUS: - Host: 192.168.100.100 - Log: mysql-bin.000003 - Pos: 700 - GTID:5b5a8e4e-9b43-11ea-900d-0242ac170002:1-10, -5b642cb6-9b43-11ea-8914-0242ac170003:1-7, -97b5142f-e19c-11e8-808c-0242ac110005:1-13 - -Finished dump at: 2020-05-21 18:14:49`, - mysql.Position{ - Name: "mysql-bin.000003", - Pos: 1274, - }, - "5b5a8e4e-9b43-11ea-900d-0242ac170002:1-10,5b642cb6-9b43-11ea-8914-0242ac170003:1-7,97b5142f-e19c-11e8-808c-0242ac110005:1-13", - }, - { // without empty line after mutlple GTID sets - `Started dump at: 2020-05-21 18:02:33 -SHOW MASTER STATUS: - Log: mysql-bin.000003 - Pos: 1274 - GTID:5b5a8e4e-9b43-11ea-900d-0242ac170002:1-10, -5b642cb6-9b43-11ea-8914-0242ac170003:1-7, -97b5142f-e19c-11e8-808c-0242ac110005:1-13 -Finished dump at: 2020-05-21 18:02:44`, - mysql.Position{ - Name: "mysql-bin.000003", - Pos: 1274, - }, - "5b5a8e4e-9b43-11ea-900d-0242ac170002:1-10,5b642cb6-9b43-11ea-8914-0242ac170003:1-7,97b5142f-e19c-11e8-808c-0242ac110005:1-13", - }, - } - - for _, tc := range testCases { - err := ioutil.WriteFile(f.Name(), []byte(tc.source), 0644) - c.Assert(err, IsNil) - pos, gsetStr, err := ParseMetaData(f.Name(), "mysql") - c.Assert(err, IsNil) - c.Assert(pos, DeepEquals, tc.pos) - c.Assert(gsetStr, Equals, tc.gsetStr) - } -} diff --git a/syncer/checkpoint.go b/syncer/checkpoint.go index 6a4eb9d89e..ed83a25790 100644 --- a/syncer/checkpoint.go +++ b/syncer/checkpoint.go @@ -23,15 +23,15 @@ import ( "time" "github.com/pingcap/dm/dm/config" - binlog "github.com/pingcap/dm/pkg/binlog" + "github.com/pingcap/dm/pkg/binlog" "github.com/pingcap/dm/pkg/conn" tcontext "github.com/pingcap/dm/pkg/context" "github.com/pingcap/dm/pkg/cputil" + "github.com/pingcap/dm/pkg/dumpling" "github.com/pingcap/dm/pkg/gtid" "github.com/pingcap/dm/pkg/log" "github.com/pingcap/dm/pkg/schema" "github.com/pingcap/dm/pkg/terror" - "github.com/pingcap/dm/pkg/utils" "github.com/pingcap/failpoint" "github.com/pingcap/parser/model" @@ -733,14 +733,15 @@ func (cp *RemoteCheckPoint) LoadMeta() error { defer cp.Unlock() var ( - location *binlog.Location - err error + location *binlog.Location + safeModeExitLoc *binlog.Location + err error ) switch cp.cfg.Mode { case config.ModeAll: // NOTE: syncer must continue the syncing follow loader's tail, so we parse mydumper's output // refine when master / slave switching added and checkpoint mechanism refactored - location, err = cp.parseMetaData() + location, safeModeExitLoc, err = cp.parseMetaData() if err != nil { return err } @@ -773,6 +774,10 @@ func (cp *RemoteCheckPoint) LoadMeta() error { cp.globalPoint = newBinlogPoint(location.Clone(), location.Clone(), nil, nil, cp.cfg.EnableGTID) cp.logCtx.L().Info("loaded checkpoints from meta", log.WrapStringerField("global checkpoint", cp.globalPoint)) } + if safeModeExitLoc != nil { + cp.cfg.SafeModeExitLoc = safeModeExitLoc + cp.logCtx.L().Info("set SafeModeExitLoc from meta", zap.Stringer("SafeModeExitLoc", safeModeExitLoc)) + } return nil } @@ -805,22 +810,10 @@ func (cp *RemoteCheckPoint) genUpdateSQL(cpSchema, cpTable string, location binl return sql2, args } -func (cp *RemoteCheckPoint) parseMetaData() (*binlog.Location, error) { +func (cp *RemoteCheckPoint) parseMetaData() (*binlog.Location, *binlog.Location, error) { // `metadata` is mydumper's output meta file name filename := path.Join(cp.cfg.Dir, "metadata") cp.logCtx.L().Info("parsing metadata from file", zap.String("file", filename)) - pos, gsetStr, err := utils.ParseMetaData(filename, cp.cfg.Flavor) - if err != nil { - return nil, err - } - - gset, err := gtid.ParserGTID(cp.cfg.Flavor, gsetStr) - if err != nil { - return nil, err - } - return &binlog.Location{ - Position: pos, - GTIDSet: gset, - }, nil + return dumpling.ParseMetaData(filename, cp.cfg.Flavor) } diff --git a/syncer/mode.go b/syncer/mode.go index 551061bd26..d3fd5e144c 100644 --- a/syncer/mode.go +++ b/syncer/mode.go @@ -26,12 +26,16 @@ import ( func (s *Syncer) enableSafeModeInitializationPhase(tctx *tcontext.Context, safeMode *sm.SafeMode) { safeMode.Reset(tctx) // in initialization phase, reset first - safeMode.Add(tctx, 1) // try to enable + safeMode.Add(tctx, 1) // enable and will revert 5 minutes later if s.cfg.SafeMode { - safeMode.Add(tctx, 1) // add 1 but should no corresponding -1 + safeMode.Add(tctx, 1) // add 1 but should no corresponding -1, so keeps enabled s.tctx.L().Info("enable safe-mode by config") } + if s.cfg.SafeModeExitLoc != nil { + safeMode.Add(tctx, 1) // enable and will revert after pass SafeModeExitLoc + s.tctx.L().Info("enable safe-mode because of inconsistent dump, will exit at", zap.Stringer("location", *s.cfg.SafeModeExitLoc)) + } go func() { defer func() { diff --git a/syncer/syncer.go b/syncer/syncer.go index ea9c2b2622..fbc6d188ee 100644 --- a/syncer/syncer.go +++ b/syncer/syncer.go @@ -1094,7 +1094,7 @@ func (s *Syncer) Run(ctx context.Context) (err error) { if s.cfg.Mode == config.ModeAll { if err = s.flushCheckPoints(); err != nil { s.tctx.L().Warn("fail to flush checkpoints when starting task", zap.Error(err)) - } else { + } else if s.cfg.CleanDumpFile { s.tctx.L().Info("try to remove loaded files") metadataFile := path.Join(s.cfg.Dir, "metadata") if err = os.Remove(metadataFile); err != nil { @@ -1391,6 +1391,14 @@ func (s *Syncer) Run(ctx context.Context) (err error) { default: } + // check pass SafeModeExitLoc and try disable safe mode, but not in sharding or replacing error + if s.cfg.SafeModeExitLoc != nil && !s.isReplacingErr && shardingReSync == nil { + if binlog.CompareLocation(currentLocation, *s.cfg.SafeModeExitLoc, s.cfg.EnableGTID) >= 0 { + s.cfg.SafeModeExitLoc = nil + safeMode.Add(tctx, -1) + } + } + ec := eventContext{ tctx: tctx, header: e.Header, diff --git a/syncer/syncer_test.go b/syncer/syncer_test.go index a2f8b4e536..ef8c42c41f 100644 --- a/syncer/syncer_test.go +++ b/syncer/syncer_test.go @@ -23,6 +23,8 @@ import ( "time" sqlmock "github.com/DATA-DOG/go-sqlmock" + "github.com/pingcap/failpoint" + "github.com/pingcap/dm/dm/config" "github.com/pingcap/dm/dm/pb" "github.com/pingcap/dm/pkg/binlog" @@ -1138,16 +1140,7 @@ func (s *testSyncerSuite) TestRun(c *C) { c.Assert(err, IsNil) syncer.genRouter() - checkPointMock.ExpectBegin() - checkPointMock.ExpectExec(fmt.Sprintf("CREATE SCHEMA IF NOT EXISTS `%s`", s.cfg.MetaSchema)).WillReturnResult(sqlmock.NewResult(1, 1)) - checkPointMock.ExpectCommit() - checkPointMock.ExpectBegin() - checkPointMock.ExpectExec(fmt.Sprintf("CREATE TABLE IF NOT EXISTS `%s`.`%s`", s.cfg.MetaSchema, cputil.SyncerCheckpoint(s.cfg.Name))).WillReturnResult(sqlmock.NewResult(1, 1)) - checkPointMock.ExpectCommit() - - // mock syncer.checkpoint.Init() function - syncer.checkpoint.(*RemoteCheckPoint).dbConn = &DBConn{cfg: s.cfg, baseConn: conn.NewBaseConn(checkPointDBConn, &retry.FiniteRetryStrategy{})} - syncer.checkpoint.(*RemoteCheckPoint).prepare(tcontext.Background()) + syncer.setupMockCheckpoint(checkPointDBConn, checkPointMock) syncer.reset() events1 := mockBinlogEvents{ @@ -1326,6 +1319,158 @@ func (s *testSyncerSuite) TestRun(c *C) { } } +func (s *testSyncerSuite) TestExitSafeModeByConfig(c *C) { + db, mock, err := sqlmock.New() + c.Assert(err, IsNil) + dbConn, err := db.Conn(context.Background()) + c.Assert(err, IsNil) + checkPointDB, checkPointMock, err := sqlmock.New() + checkPointDBConn, err := checkPointDB.Conn(context.Background()) + c.Assert(err, IsNil) + + testJobs.jobs = testJobs.jobs[:0] + + s.cfg.BAList = &filter.Rules{ + DoDBs: []string{"test_1"}, + DoTables: []*filter.Table{ + {Schema: "test_1", Name: "t_1"}, + }, + } + + syncer := NewSyncer(s.cfg, nil) + syncer.fromDB = &UpStreamConn{BaseDB: conn.NewBaseDB(db, func() {})} + syncer.toDBConns = []*DBConn{{cfg: s.cfg, baseConn: conn.NewBaseConn(dbConn, &retry.FiniteRetryStrategy{})}, + {cfg: s.cfg, baseConn: conn.NewBaseConn(dbConn, &retry.FiniteRetryStrategy{})}} + syncer.ddlDBConn = &DBConn{cfg: s.cfg, baseConn: conn.NewBaseConn(dbConn, &retry.FiniteRetryStrategy{})} + c.Assert(syncer.Type(), Equals, pb.UnitType_Sync) + + syncer.genRouter() + + syncer.setupMockCheckpoint(checkPointDBConn, checkPointMock) + + syncer.reset() + + events1 := mockBinlogEvents{ + mockBinlogEvent{typ: DBCreate, args: []interface{}{"test_1"}}, + mockBinlogEvent{typ: TableCreate, args: []interface{}{"test_1", "create table test_1.t_1(id int primary key, name varchar(24))"}}, + + mockBinlogEvent{typ: Write, args: []interface{}{uint64(8), "test_1", "t_1", []byte{mysql.MYSQL_TYPE_LONG, mysql.MYSQL_TYPE_STRING}, [][]interface{}{{int32(1), "a"}}}}, + mockBinlogEvent{typ: Delete, args: []interface{}{uint64(8), "test_1", "t_1", []byte{mysql.MYSQL_TYPE_LONG, mysql.MYSQL_TYPE_STRING}, [][]interface{}{{int32(1), "a"}}}}, + mockBinlogEvent{typ: Update, args: []interface{}{uint64(8), "test_1", "t_1", []byte{mysql.MYSQL_TYPE_LONG, mysql.MYSQL_TYPE_STRING}, [][]interface{}{{int32(2), "b"}, {int32(1), "b"}}}}, + } + + generatedEvents1 := s.generateEvents(events1, c) + // make sure [18] is last event, and use [18]'s position as safeModeExitLocation + c.Assert(len(generatedEvents1), Equals, 19) + safeModeExitLocation := binlog.NewLocation("") + safeModeExitLocation.Position.Pos = generatedEvents1[18].Header.LogPos + syncer.cfg.SafeModeExitLoc = &safeModeExitLocation + + // check after safeModeExitLocation, safe mode is turned off + events2 := mockBinlogEvents{ + mockBinlogEvent{typ: Write, args: []interface{}{uint64(8), "test_1", "t_1", []byte{mysql.MYSQL_TYPE_LONG, mysql.MYSQL_TYPE_STRING}, [][]interface{}{{int32(1), "a"}}}}, + mockBinlogEvent{typ: Delete, args: []interface{}{uint64(8), "test_1", "t_1", []byte{mysql.MYSQL_TYPE_LONG, mysql.MYSQL_TYPE_STRING}, [][]interface{}{{int32(1), "a"}}}}, + mockBinlogEvent{typ: Update, args: []interface{}{uint64(8), "test_1", "t_1", []byte{mysql.MYSQL_TYPE_LONG, mysql.MYSQL_TYPE_STRING}, [][]interface{}{{int32(2), "b"}, {int32(1), "b"}}}}, + } + generatedEvents2 := s.generateEvents(events2, c) + + generatedEvents := append(generatedEvents1, generatedEvents2...) + + mockStreamerProducer := &MockStreamProducer{generatedEvents} + mockStreamer, err := mockStreamerProducer.generateStreamer(binlog.NewLocation("")) + c.Assert(err, IsNil) + syncer.streamerController = &StreamerController{ + streamerProducer: mockStreamerProducer, + streamer: mockStreamer, + } + + syncer.addJobFunc = syncer.addJobToMemory + + ctx, cancel := context.WithCancel(context.Background()) + resultCh := make(chan pb.ProcessResult) + + // mock get parser + mock.ExpectQuery("SHOW GLOBAL VARIABLES LIKE"). + WillReturnRows(sqlmock.NewRows([]string{"Variable_name", "Value"}). + AddRow("sql_mode", "ONLY_FULL_GROUP_BY,STRICT_TRANS_TABLES,NO_ZERO_IN_DATE,NO_ZERO_DATE,ERROR_FOR_DIVISION_BY_ZERO,NO_AUTO_CREATE_USER,NO_ENGINE_SUBSTITUTION")) + + // disable 5-minute safe mode + c.Assert(failpoint.Enable("github.com/pingcap/dm/syncer/SafeModeInitPhaseSeconds", "return(0)"), IsNil) + go syncer.Process(ctx, resultCh) + + expectJobs := []*expectJob{ + // now every ddl job will start with a flush job + { + flush, + "", + nil, + }, { + ddl, + "CREATE DATABASE IF NOT EXISTS `test_1`", + nil, + }, { + flush, + "", + nil, + }, { + ddl, + "CREATE TABLE IF NOT EXISTS `test_1`.`t_1` (`id` INT PRIMARY KEY,`name` VARCHAR(24))", + nil, + }, { + insert, + "REPLACE INTO `test_1`.`t_1` (`id`,`name`) VALUES (?,?)", + []interface{}{int32(1), "a"}, + }, { + del, + "DELETE FROM `test_1`.`t_1` WHERE `id` = ? LIMIT 1", + []interface{}{int32(1)}, + }, { + update, + "DELETE FROM `test_1`.`t_1` WHERE `id` = ? LIMIT 1", + []interface{}{int32(2)}, + }, { + update, + "REPLACE INTO `test_1`.`t_1` (`id`,`name`) VALUES (?,?)", + []interface{}{int32(1), "b"}, + }, { + // start from this event, location passes safeModeExitLocation and safe mode should exit + insert, + "INSERT INTO `test_1`.`t_1` (`id`,`name`) VALUES (?,?)", + []interface{}{int32(1), "a"}, + }, { + del, + "DELETE FROM `test_1`.`t_1` WHERE `id` = ? LIMIT 1", + []interface{}{int32(1)}, + }, { + update, + "UPDATE `test_1`.`t_1` SET `id` = ?, `name` = ? WHERE `id` = ? LIMIT 1", + []interface{}{int32(1), "b", int32(2)}, + }, + } + + executeSQLAndWait(len(expectJobs)) + c.Assert(syncer.Status().(*pb.SyncStatus).TotalEvents, Equals, int64(0)) + syncer.mockFinishJob(expectJobs) + + testJobs.Lock() + checkJobs(c, testJobs.jobs, expectJobs) + testJobs.jobs = testJobs.jobs[:0] + testJobs.Unlock() + + cancel() + syncer.Close() + c.Assert(syncer.isClosed(), IsTrue) + + if err := mock.ExpectationsWereMet(); err != nil { + c.Errorf("db unfulfilled expectations: %s", err) + } + + if err := checkPointMock.ExpectationsWereMet(); err != nil { + c.Errorf("checkpointDB unfulfilled expectations: %s", err) + } + c.Assert(failpoint.Disable("github.com/pingcap/dm/syncer/SafeModeInitPhaseSeconds"), IsNil) +} + func executeSQLAndWait(expectJobNum int) { for i := 0; i < 10; i++ { time.Sleep(time.Second) @@ -1417,3 +1562,16 @@ func (s *Syncer) addJobToMemory(job *job) error { return nil } + +func (s *Syncer) setupMockCheckpoint(checkPointDBConn *sql.Conn, checkPointMock sqlmock.Sqlmock) { + checkPointMock.ExpectBegin() + checkPointMock.ExpectExec(fmt.Sprintf("CREATE SCHEMA IF NOT EXISTS `%s`", s.cfg.MetaSchema)).WillReturnResult(sqlmock.NewResult(1, 1)) + checkPointMock.ExpectCommit() + checkPointMock.ExpectBegin() + checkPointMock.ExpectExec(fmt.Sprintf("CREATE TABLE IF NOT EXISTS `%s`.`%s`", s.cfg.MetaSchema, cputil.SyncerCheckpoint(s.cfg.Name))).WillReturnResult(sqlmock.NewResult(1, 1)) + checkPointMock.ExpectCommit() + + // mock syncer.checkpoint.Init() function + s.checkpoint.(*RemoteCheckPoint).dbConn = &DBConn{cfg: s.cfg, baseConn: conn.NewBaseConn(checkPointDBConn, &retry.FiniteRetryStrategy{})} + s.checkpoint.(*RemoteCheckPoint).prepare(tcontext.Background()) +} diff --git a/tests/safe_mode/run.sh b/tests/safe_mode/run.sh index e332c4db9c..68b3ba22ed 100755 --- a/tests/safe_mode/run.sh +++ b/tests/safe_mode/run.sh @@ -6,7 +6,59 @@ cur=$( cd "$( dirname "${BASH_SOURCE[0]}" )" && pwd ) source $cur/../_utils/test_prepare WORK_DIR=$TEST_DIR/$TEST_NAME +function consistency_none() { + run_sql_file $cur/data/db1.prepare.sql $MYSQL_HOST1 $MYSQL_PORT1 $MYSQL_PASSWORD1 + check_contains 'Query OK, 2 rows affected' + run_sql_file $cur/data/db2.prepare.sql $MYSQL_HOST2 $MYSQL_PORT2 $MYSQL_PASSWORD2 + check_contains 'Query OK, 3 rows affected' + + run_dm_master $WORK_DIR/master $MASTER_PORT $cur/conf/dm-master.toml + check_rpc_alive $cur/../bin/check_master_online 127.0.0.1:$MASTER_PORT + run_dm_worker $WORK_DIR/worker1 $WORKER1_PORT $cur/conf/dm-worker1.toml + check_rpc_alive $cur/../bin/check_worker_online 127.0.0.1:$WORKER1_PORT + run_dm_worker $WORK_DIR/worker2 $WORKER2_PORT $cur/conf/dm-worker2.toml + check_rpc_alive $cur/../bin/check_worker_online 127.0.0.1:$WORKER2_PORT + cp $cur/conf/source1.yaml $WORK_DIR/source1.yaml + cp $cur/conf/source2.yaml $WORK_DIR/source2.yaml + sed -i "/relay-binlog-name/i\relay-dir: $WORK_DIR/worker1/relay_log" $WORK_DIR/source1.yaml + sed -i "/relay-binlog-name/i\relay-dir: $WORK_DIR/worker2/relay_log" $WORK_DIR/source2.yaml + dmctl_operate_source create $WORK_DIR/source1.yaml $SOURCE_ID1 + dmctl_operate_source create $WORK_DIR/source2.yaml $SOURCE_ID2 + + cp $cur/conf/dm-task.yaml $WORK_DIR/dm-task.yaml + sed -i "/timezone/i\clean-dump-file: false" $WORK_DIR/dm-task.yaml + sed -i "s/extra-args: \"\"/extra-args: \"--consistency none\"/g" $WORK_DIR/dm-task.yaml + dmctl_start_task "$WORK_DIR/dm-task.yaml" "--remove-meta" + check_sync_diff $WORK_DIR $cur/conf/diff_config.toml + + # make sure dumpling's metadata added empty line after two SHOW MASTER STATUS + empty_line=`grep -cvE '\S' $WORK_DIR/worker1/dumped_data.test/metadata` + if [ $empty_line -ne 2 ]; then + echo "wrong number of empty line in dumpling's metadata" + exit 1 + fi + empty_line=`grep -cvE '\S' $WORK_DIR/worker2/dumped_data.test/metadata` + if [ $empty_line -ne 2 ]; then + echo "wrong number of empty line in dumpling's metadata" + exit 1 + fi + + name1=$(grep "Log: " $WORK_DIR/worker1/dumped_data.test/metadata|tail -1|awk -F: '{print $2}'|tr -d ' ') + pos1=$(grep "Pos: " $WORK_DIR/worker1/dumped_data.test/metadata|tail -1|awk -F: '{print $2}'|tr -d ' ') + gtid1=$(grep "GTID:" $WORK_DIR/worker1/dumped_data.test/metadata|tail -1|awk -F: '{print $2,":",$3}'|tr -d ' ') + check_log_contains $WORK_DIR/worker1/log/dm-worker.log "\[\"enable safe-mode because of inconsistent dump, will exit at\"\] \[task=test\] \[unit=\"binlog replication\"\] \[location=\"position: ($name1, $pos1), gtid-set: $gtid1\"\]" + name2=$(grep "Log: " $WORK_DIR/worker2/dumped_data.test/metadata|tail -1|awk -F: '{print $2}'|tr -d ' ') + pos2=$(grep "Pos: " $WORK_DIR/worker2/dumped_data.test/metadata|tail -1|awk -F: '{print $2}'|tr -d ' ') + gtid2=$(grep "GTID:" $WORK_DIR/worker2/dumped_data.test/metadata|tail -1|awk -F: '{print $2,":",$3}'|tr -d ' ') + check_log_contains $WORK_DIR/worker2/log/dm-worker.log "\[\"enable safe-mode because of inconsistent dump, will exit at\"\] \[task=test\] \[unit=\"binlog replication\"\] \[location=\"position: ($name2, $pos2), gtid-set: $gtid2\"\]" + + cleanup_data safe_mode_target + cleanup_process $* +} + function run() { + consistency_none + run_sql_file $cur/data/db1.prepare.sql $MYSQL_HOST1 $MYSQL_PORT1 $MYSQL_PASSWORD1 check_contains 'Query OK, 2 rows affected' run_sql_file $cur/data/db2.prepare.sql $MYSQL_HOST2 $MYSQL_PORT2 $MYSQL_PASSWORD2