Skip to content
This repository has been archived by the owner on Nov 24, 2023. It is now read-only.

*: use dumpling's finish building connection location to leave safe mode #915

Merged
merged 17 commits into from
Sep 3, 2020
Merged
Show file tree
Hide file tree
Changes from 16 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions dm/config/task.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import (
"strings"
"time"

"github.com/pingcap/dm/pkg/binlog"
"github.com/pingcap/dm/pkg/log"
"github.com/pingcap/dm/pkg/terror"

Expand Down Expand Up @@ -239,6 +240,8 @@ type SyncerConfig struct {
EnableGTID bool `yaml:"enable-gtid" toml:"enable-gtid" json:"enable-gtid"`
DisableCausality bool `yaml:"disable-detect" toml:"disable-detect" json:"disable-detect"`
SafeMode bool `yaml:"safe-mode" toml:"safe-mode" json:"safe-mode"`
// when dump unit can't run consistent dump, enable safe mode until pass exit location of dumping
SafeModeExitLoc *binlog.Location `yaml:"-" toml:"-" json:"-"`
// deprecated, use `ansi-quotes` in top level config instead
EnableANSIQuotes bool `yaml:"enable-ansi-quotes" toml:"enable-ansi-quotes" json:"enable-ansi-quotes"`
}
Expand Down
5 changes: 5 additions & 0 deletions dumpling/dumpling.go
Original file line number Diff line number Diff line change
Expand Up @@ -257,6 +257,11 @@ func (m *Dumpling) constructArgs() (*export.Config, error) {
}
}

// record exit position when consistency is none, to support scenarios like Aurora upstream
if dumpConfig.Consistency == "none" {
dumpConfig.PosAfterConnect = true
}

m.logger.Info("create dumpling", zap.Stringer("config", dumpConfig))
if len(ret) > 0 {
m.logger.Warn("meeting some unsupported arguments", zap.Strings("argument", ret))
Expand Down
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ require (
github.com/mattn/go-colorable v0.1.7 // indirect
github.com/mattn/go-runewidth v0.0.9 // indirect
github.com/pingcap/check v0.0.0-20200212061837-5e12011dc712
github.com/pingcap/dumpling v0.0.0-20200825093428-8e00b826f68d
github.com/pingcap/dumpling v0.0.0-20200829142316-42ac4de0db6b
github.com/pingcap/errcode v0.3.0 // indirect
github.com/pingcap/errors v0.11.5-0.20200820035142-66eb5bf1d1cd
github.com/pingcap/failpoint v0.0.0-20200702092429-9f69995143ce
Expand Down
4 changes: 2 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -511,8 +511,8 @@ github.com/pingcap/check v0.0.0-20191216031241-8a5a85928f12 h1:rfD9v3+ppLPzoQBgZ
github.com/pingcap/check v0.0.0-20191216031241-8a5a85928f12/go.mod h1:PYMCGwN0JHjoqGr3HrZoD+b8Tgx8bKnArhSq8YVzUMc=
github.com/pingcap/check v0.0.0-20200212061837-5e12011dc712 h1:R8gStypOBmpnHEx1qi//SaqxJVI4inOqljg/Aj5/390=
github.com/pingcap/check v0.0.0-20200212061837-5e12011dc712/go.mod h1:PYMCGwN0JHjoqGr3HrZoD+b8Tgx8bKnArhSq8YVzUMc=
github.com/pingcap/dumpling v0.0.0-20200825093428-8e00b826f68d h1:g98WMoYbEf6dCFJ//l4XyyJw4cA8fICfP3F0Q6BF4EM=
github.com/pingcap/dumpling v0.0.0-20200825093428-8e00b826f68d/go.mod h1:1Su9KgYl5/KKDfvWTPtPpT6yoD/7jSZ7whsf0U7XxyY=
github.com/pingcap/dumpling v0.0.0-20200829142316-42ac4de0db6b h1:9z6SWg93iqdKirDr2vhXgD5wh7JjN1SlOEIiKTOtx3I=
github.com/pingcap/dumpling v0.0.0-20200829142316-42ac4de0db6b/go.mod h1:1Su9KgYl5/KKDfvWTPtPpT6yoD/7jSZ7whsf0U7XxyY=
github.com/pingcap/errcode v0.0.0-20180921232412-a1a7271709d9/go.mod h1:4b2X8xSqxIroj/IZ9MX/VGZhAwc11wB9wRIzHvz6SeM=
github.com/pingcap/errcode v0.3.0 h1:IF6LC/4+b1KNwrMlr2rBTUrojFPMexXBcDWZSpNwxjg=
github.com/pingcap/errcode v0.3.0/go.mod h1:4b2X8xSqxIroj/IZ9MX/VGZhAwc11wB9wRIzHvz6SeM=
Expand Down
5 changes: 3 additions & 2 deletions loader/loader.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ import (
"github.com/pingcap/dm/dm/unit"
"github.com/pingcap/dm/pkg/conn"
tcontext "github.com/pingcap/dm/pkg/context"
"github.com/pingcap/dm/pkg/dumpling"
fr "github.com/pingcap/dm/pkg/func-rollback"
"github.com/pingcap/dm/pkg/log"
"github.com/pingcap/dm/pkg/terror"
Expand Down Expand Up @@ -1239,13 +1240,13 @@ func (l *Loader) checkpointID() string {

func (l *Loader) getMydumpMetadata() error {
metafile := filepath.Join(l.cfg.LoaderConfig.Dir, "metadata")
pos, _, err := utils.ParseMetaData(metafile, l.cfg.Flavor)
loc, _, err := dumpling.ParseMetaData(metafile, l.cfg.Flavor)
if err != nil {
l.logCtx.L().Error("fail to parse dump metadata", log.ShortError(err))
return err
}

l.metaBinlog.Set(pos.String())
l.metaBinlog.Set(loc.Position.String())
return nil
}

Expand Down
184 changes: 184 additions & 0 deletions pkg/dumpling/utils.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,184 @@
// Copyright 2019 PingCAP, Inc.
Copy link
Collaborator Author

@lance6716 lance6716 Aug 31, 2020

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

renamed from pkg/utils/mydumper.go to avoid importing cycle

//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// See the License for the specific language governing permissions and
// limitations under the License.

package dumpling

import (
"bufio"
"fmt"
"io"
"os"
"strconv"
"strings"

"github.com/siddontang/go-mysql/mysql"

"github.com/pingcap/dm/pkg/binlog"
"github.com/pingcap/dm/pkg/gtid"
"github.com/pingcap/dm/pkg/terror"
)

// ParseMetaData parses mydumper's output meta file and returns binlog location.
// since v2.0.0, dumpling maybe configured to output master status after connection pool is established,
// we return this location as well.
func ParseMetaData(filename, flavor string) (*binlog.Location, *binlog.Location, error) {
fd, err := os.Open(filename)
if err != nil {
return nil, nil, terror.ErrParseMydumperMeta.Generate(err)
}
defer fd.Close()

var (
pos mysql.Position
gtidStr string
useLocation2 = false
pos2 mysql.Position
gtidStr2 string

loc *binlog.Location
loc2 *binlog.Location
)

br := bufio.NewReader(fd)

parsePosAndGTID := func(pos *mysql.Position, gtid *string) error {
for {
line, err2 := br.ReadString('\n')
if err2 != nil {
return err2
}
parts := strings.SplitN(line, ":", 2)
if len(parts) != 2 {
continue
}
key := strings.TrimSpace(parts[0])
value := strings.TrimSpace(parts[1])
switch key {
case "Log":
pos.Name = value
case "Pos":
pos64, err3 := strconv.ParseUint(value, 10, 32)
if err3 != nil {
return err3
}
pos.Pos = uint32(pos64)
case "GTID":
// multiple GTID sets may cross multiple lines, continue to read them.
following, err3 := readFollowingGTIDs(br, flavor)
if err3 != nil {
return err3
}
*gtid = value + following
return nil
}
}
}

for {
line, err2 := br.ReadString('\n')
if err2 == io.EOF {
break
} else if err2 != nil {
return nil, nil, terror.ErrParseMydumperMeta.Generate(err2)
}
line = strings.TrimSpace(line)
if len(line) == 0 {
continue
}

switch line {
case "SHOW MASTER STATUS:":
if err3 := parsePosAndGTID(&pos, &gtidStr); err3 != nil {
return nil, nil, terror.ErrParseMydumperMeta.Generate(err3)
}
case "SHOW SLAVE STATUS:":
// ref: https://github.com/maxbube/mydumper/blob/master/mydumper.c#L434
for {
line, err3 := br.ReadString('\n')
if err3 != nil {
return nil, nil, terror.ErrParseMydumperMeta.Generate(err3)
}
line = strings.TrimSpace(line)
if len(line) == 0 {
break
}
}
case "SHOW MASTER STATUS: /* AFTER CONNECTION POOL ESTABLISHED */":
useLocation2 = true
if err3 := parsePosAndGTID(&pos2, &gtidStr2); err3 != nil {
return nil, nil, terror.ErrParseMydumperMeta.Generate(err3)
}
default:
// do nothing for Started dump, Finished dump...
}
}

if len(pos.Name) == 0 || pos.Pos == uint32(0) {
return nil, nil, terror.ErrParseMydumperMeta.Generate(fmt.Sprintf("file %s invalid format", filename))
}

gset, err := gtid.ParserGTID(flavor, gtidStr)
if err != nil {
return nil, nil, terror.ErrParseMydumperMeta.Generate(fmt.Sprintf("file %s invalid format", filename))
}
loc = &binlog.Location{
Position: pos,
GTIDSet: gset,
}

if useLocation2 {
if len(pos2.Name) == 0 || pos2.Pos == uint32(0) {
return nil, nil, terror.ErrParseMydumperMeta.Generate(fmt.Sprintf("file %s invalid format", filename))
}
gset2, err := gtid.ParserGTID(flavor, gtidStr2)
if err != nil {
return nil, nil, terror.ErrParseMydumperMeta.Generate(fmt.Sprintf("file %s invalid format", filename))
}
loc2 = &binlog.Location{
Position: pos2,
GTIDSet: gset2,
}
}

return loc, loc2, nil
}

func readFollowingGTIDs(br *bufio.Reader, flavor string) (string, error) {
var following strings.Builder
for {
line, err := br.ReadString('\n')
if err == io.EOF {
return following.String(), nil // return the previous, not including the last line.
} else if err != nil {
return "", err
}

line = strings.TrimSpace(line)
if len(line) == 0 {
return following.String(), nil // end with empty line.
}

end := len(line)
if strings.HasSuffix(line, ",") {
end = len(line) - 1
}

// try parse to verify it
_, err = gtid.ParserGTID(flavor, line[:end])
if err != nil {
return following.String(), nil // return the previous, not including this non-GTID line.
}

following.WriteString(line)
}
}
Loading