Skip to content
This repository has been archived by the owner on Nov 24, 2023. It is now read-only.

Commit

Permalink
*: use dumpling's finish building connection location to leave safe m…
Browse files Browse the repository at this point in the history
…ode (#915) (#981)
  • Loading branch information
lance6716 authored Sep 3, 2020
1 parent a5574b6 commit 17b2fbc
Show file tree
Hide file tree
Showing 14 changed files with 671 additions and 273 deletions.
3 changes: 3 additions & 0 deletions dm/config/task.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import (
"strings"
"time"

"github.com/pingcap/dm/pkg/binlog"
"github.com/pingcap/dm/pkg/log"
"github.com/pingcap/dm/pkg/terror"

Expand Down Expand Up @@ -239,6 +240,8 @@ type SyncerConfig struct {
EnableGTID bool `yaml:"enable-gtid" toml:"enable-gtid" json:"enable-gtid"`
DisableCausality bool `yaml:"disable-detect" toml:"disable-detect" json:"disable-detect"`
SafeMode bool `yaml:"safe-mode" toml:"safe-mode" json:"safe-mode"`
// when dump unit can't run consistent dump, enable safe mode until pass exit location of dumping
SafeModeExitLoc *binlog.Location `yaml:"-" toml:"-" json:"-"`
// deprecated, use `ansi-quotes` in top level config instead
EnableANSIQuotes bool `yaml:"enable-ansi-quotes" toml:"enable-ansi-quotes" json:"enable-ansi-quotes"`
}
Expand Down
5 changes: 5 additions & 0 deletions dumpling/dumpling.go
Original file line number Diff line number Diff line change
Expand Up @@ -257,6 +257,11 @@ func (m *Dumpling) constructArgs() (*export.Config, error) {
}
}

// record exit position when consistency is none, to support scenarios like Aurora upstream
if dumpConfig.Consistency == "none" {
dumpConfig.PosAfterConnect = true
}

m.logger.Info("create dumpling", zap.Stringer("config", dumpConfig))
if len(ret) > 0 {
m.logger.Warn("meeting some unsupported arguments", zap.Strings("argument", ret))
Expand Down
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ require (
github.com/mattn/go-colorable v0.1.7 // indirect
github.com/mattn/go-runewidth v0.0.9 // indirect
github.com/pingcap/check v0.0.0-20200212061837-5e12011dc712
github.com/pingcap/dumpling v0.0.0-20200825093428-8e00b826f68d
github.com/pingcap/dumpling v0.0.0-20200829142316-42ac4de0db6b
github.com/pingcap/errcode v0.3.0 // indirect
github.com/pingcap/errors v0.11.5-0.20200820035142-66eb5bf1d1cd
github.com/pingcap/failpoint v0.0.0-20200702092429-9f69995143ce
Expand Down
4 changes: 2 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -511,8 +511,8 @@ github.com/pingcap/check v0.0.0-20191216031241-8a5a85928f12 h1:rfD9v3+ppLPzoQBgZ
github.com/pingcap/check v0.0.0-20191216031241-8a5a85928f12/go.mod h1:PYMCGwN0JHjoqGr3HrZoD+b8Tgx8bKnArhSq8YVzUMc=
github.com/pingcap/check v0.0.0-20200212061837-5e12011dc712 h1:R8gStypOBmpnHEx1qi//SaqxJVI4inOqljg/Aj5/390=
github.com/pingcap/check v0.0.0-20200212061837-5e12011dc712/go.mod h1:PYMCGwN0JHjoqGr3HrZoD+b8Tgx8bKnArhSq8YVzUMc=
github.com/pingcap/dumpling v0.0.0-20200825093428-8e00b826f68d h1:g98WMoYbEf6dCFJ//l4XyyJw4cA8fICfP3F0Q6BF4EM=
github.com/pingcap/dumpling v0.0.0-20200825093428-8e00b826f68d/go.mod h1:1Su9KgYl5/KKDfvWTPtPpT6yoD/7jSZ7whsf0U7XxyY=
github.com/pingcap/dumpling v0.0.0-20200829142316-42ac4de0db6b h1:9z6SWg93iqdKirDr2vhXgD5wh7JjN1SlOEIiKTOtx3I=
github.com/pingcap/dumpling v0.0.0-20200829142316-42ac4de0db6b/go.mod h1:1Su9KgYl5/KKDfvWTPtPpT6yoD/7jSZ7whsf0U7XxyY=
github.com/pingcap/errcode v0.0.0-20180921232412-a1a7271709d9/go.mod h1:4b2X8xSqxIroj/IZ9MX/VGZhAwc11wB9wRIzHvz6SeM=
github.com/pingcap/errcode v0.3.0 h1:IF6LC/4+b1KNwrMlr2rBTUrojFPMexXBcDWZSpNwxjg=
github.com/pingcap/errcode v0.3.0/go.mod h1:4b2X8xSqxIroj/IZ9MX/VGZhAwc11wB9wRIzHvz6SeM=
Expand Down
5 changes: 3 additions & 2 deletions loader/loader.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ import (
"github.com/pingcap/dm/dm/unit"
"github.com/pingcap/dm/pkg/conn"
tcontext "github.com/pingcap/dm/pkg/context"
"github.com/pingcap/dm/pkg/dumpling"
fr "github.com/pingcap/dm/pkg/func-rollback"
"github.com/pingcap/dm/pkg/log"
"github.com/pingcap/dm/pkg/terror"
Expand Down Expand Up @@ -1239,13 +1240,13 @@ func (l *Loader) checkpointID() string {

func (l *Loader) getMydumpMetadata() error {
metafile := filepath.Join(l.cfg.LoaderConfig.Dir, "metadata")
pos, _, err := utils.ParseMetaData(metafile, l.cfg.Flavor)
loc, _, err := dumpling.ParseMetaData(metafile, l.cfg.Flavor)
if err != nil {
l.logCtx.L().Error("fail to parse dump metadata", log.ShortError(err))
return err
}

l.metaBinlog.Set(pos.String())
l.metaBinlog.Set(loc.Position.String())
return nil
}

Expand Down
184 changes: 184 additions & 0 deletions pkg/dumpling/utils.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,184 @@
// Copyright 2019 PingCAP, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// See the License for the specific language governing permissions and
// limitations under the License.

package dumpling

import (
"bufio"
"fmt"
"io"
"os"
"strconv"
"strings"

"github.com/siddontang/go-mysql/mysql"

"github.com/pingcap/dm/pkg/binlog"
"github.com/pingcap/dm/pkg/gtid"
"github.com/pingcap/dm/pkg/terror"
)

// ParseMetaData parses mydumper's output meta file and returns binlog location.
// since v2.0.0, dumpling maybe configured to output master status after connection pool is established,
// we return this location as well.
func ParseMetaData(filename, flavor string) (*binlog.Location, *binlog.Location, error) {
fd, err := os.Open(filename)
if err != nil {
return nil, nil, terror.ErrParseMydumperMeta.Generate(err)
}
defer fd.Close()

var (
pos mysql.Position
gtidStr string
useLocation2 = false
pos2 mysql.Position
gtidStr2 string

loc *binlog.Location
loc2 *binlog.Location
)

br := bufio.NewReader(fd)

parsePosAndGTID := func(pos *mysql.Position, gtid *string) error {
for {
line, err2 := br.ReadString('\n')
if err2 != nil {
return err2
}
parts := strings.SplitN(line, ":", 2)
if len(parts) != 2 {
continue
}
key := strings.TrimSpace(parts[0])
value := strings.TrimSpace(parts[1])
switch key {
case "Log":
pos.Name = value
case "Pos":
pos64, err3 := strconv.ParseUint(value, 10, 32)
if err3 != nil {
return err3
}
pos.Pos = uint32(pos64)
case "GTID":
// multiple GTID sets may cross multiple lines, continue to read them.
following, err3 := readFollowingGTIDs(br, flavor)
if err3 != nil {
return err3
}
*gtid = value + following
return nil
}
}
}

for {
line, err2 := br.ReadString('\n')
if err2 == io.EOF {
break
} else if err2 != nil {
return nil, nil, terror.ErrParseMydumperMeta.Generate(err2)
}
line = strings.TrimSpace(line)
if len(line) == 0 {
continue
}

switch line {
case "SHOW MASTER STATUS:":
if err3 := parsePosAndGTID(&pos, &gtidStr); err3 != nil {
return nil, nil, terror.ErrParseMydumperMeta.Generate(err3)
}
case "SHOW SLAVE STATUS:":
// ref: https://github.com/maxbube/mydumper/blob/master/mydumper.c#L434
for {
line, err3 := br.ReadString('\n')
if err3 != nil {
return nil, nil, terror.ErrParseMydumperMeta.Generate(err3)
}
line = strings.TrimSpace(line)
if len(line) == 0 {
break
}
}
case "SHOW MASTER STATUS: /* AFTER CONNECTION POOL ESTABLISHED */":
useLocation2 = true
if err3 := parsePosAndGTID(&pos2, &gtidStr2); err3 != nil {
return nil, nil, terror.ErrParseMydumperMeta.Generate(err3)
}
default:
// do nothing for Started dump, Finished dump...
}
}

if len(pos.Name) == 0 || pos.Pos == uint32(0) {
return nil, nil, terror.ErrParseMydumperMeta.Generate(fmt.Sprintf("file %s invalid format", filename))
}

gset, err := gtid.ParserGTID(flavor, gtidStr)
if err != nil {
return nil, nil, terror.ErrParseMydumperMeta.Generate(fmt.Sprintf("file %s invalid format", filename))
}
loc = &binlog.Location{
Position: pos,
GTIDSet: gset,
}

if useLocation2 {
if len(pos2.Name) == 0 || pos2.Pos == uint32(0) {
return nil, nil, terror.ErrParseMydumperMeta.Generate(fmt.Sprintf("file %s invalid format", filename))
}
gset2, err := gtid.ParserGTID(flavor, gtidStr2)
if err != nil {
return nil, nil, terror.ErrParseMydumperMeta.Generate(fmt.Sprintf("file %s invalid format", filename))
}
loc2 = &binlog.Location{
Position: pos2,
GTIDSet: gset2,
}
}

return loc, loc2, nil
}

func readFollowingGTIDs(br *bufio.Reader, flavor string) (string, error) {
var following strings.Builder
for {
line, err := br.ReadString('\n')
if err == io.EOF {
return following.String(), nil // return the previous, not including the last line.
} else if err != nil {
return "", err
}

line = strings.TrimSpace(line)
if len(line) == 0 {
return following.String(), nil // end with empty line.
}

end := len(line)
if strings.HasSuffix(line, ",") {
end = len(line) - 1
}

// try parse to verify it
_, err = gtid.ParserGTID(flavor, line[:end])
if err != nil {
return following.String(), nil // return the previous, not including this non-GTID line.
}

following.WriteString(line)
}
}
Loading

0 comments on commit 17b2fbc

Please sign in to comment.