Skip to content

Commit

Permalink
dump/: integrate dumpling (pingcap#540)
Browse files Browse the repository at this point in the history
Dump sql using dumpling. It's not easy to use for users because they need to download a mydumper binary file.
  • Loading branch information
lichunzhu authored Mar 18, 2020
1 parent e28625b commit 8014e51
Show file tree
Hide file tree
Showing 29 changed files with 665 additions and 172 deletions.
8 changes: 6 additions & 2 deletions dm/config/task.go
Original file line number Diff line number Diff line change
Expand Up @@ -135,8 +135,12 @@ type MydumperConfig struct {
MydumperPath string `yaml:"mydumper-path" toml:"mydumper-path" json:"mydumper-path"` // mydumper binary path
Threads int `yaml:"threads" toml:"threads" json:"threads"` // -t, --threads
ChunkFilesize int64 `yaml:"chunk-filesize" toml:"chunk-filesize" json:"chunk-filesize"` // -F, --chunk-filesize
SkipTzUTC bool `yaml:"skip-tz-utc" toml:"skip-tz-utc" json:"skip-tz-utc"` // --skip-tz-utc
ExtraArgs string `yaml:"extra-args" toml:"extra-args" json:"extra-args"` // other extra args
StatementSize uint64 `yaml:"statement-size" toml:"statement-size" json:"statement-size"` // -S, --statement-size
Rows uint64 `yaml:"rows" toml:"rows" json:"rows"` // -r, --rows
Where string `yaml:"where" toml:"where" json:"where"` // --where

SkipTzUTC bool `yaml:"skip-tz-utc" toml:"skip-tz-utc" json:"skip-tz-utc"` // --skip-tz-utc
ExtraArgs string `yaml:"extra-args" toml:"extra-args" json:"extra-args"` // other extra args
// NOTE: use LoaderConfig.Dir as --outputdir
// TODO zxc: combine -B -T --regex with filter rules?
}
Expand Down
2 changes: 1 addition & 1 deletion dm/master/etcd_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ import (
"time"

"github.com/pingcap/check"
"github.com/pingcap/pd/pkg/tempurl"
"github.com/pingcap/pd/v4/pkg/tempurl"
"go.etcd.io/etcd/embed"

"github.com/pingcap/dm/pkg/log"
Expand Down
2 changes: 1 addition & 1 deletion dm/master/server_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ import (
"github.com/golang/mock/gomock"
"github.com/pingcap/check"
"github.com/pingcap/errors"
"github.com/pingcap/pd/pkg/tempurl"
"github.com/pingcap/pd/v4/pkg/tempurl"
"go.etcd.io/etcd/clientv3"
"go.etcd.io/etcd/integration"

Expand Down
6 changes: 3 additions & 3 deletions dm/worker/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,13 +18,13 @@ import (
"net/http"
"net/http/pprof"

"github.com/pingcap/dm/pkg/log"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promhttp"

"github.com/pingcap/dm/dm/common"
"github.com/pingcap/dm/dumpling"
"github.com/pingcap/dm/loader"
"github.com/pingcap/dm/mydumper"
"github.com/pingcap/dm/pkg/log"
"github.com/pingcap/dm/pkg/utils"
"github.com/pingcap/dm/relay"
"github.com/pingcap/dm/syncer"
Expand Down Expand Up @@ -61,7 +61,7 @@ func RegistryMetrics() {
registry.MustRegister(taskState)

relay.RegisterMetrics(registry)
mydumper.RegisterMetrics(registry)
dumpling.RegisterMetrics(registry)
loader.RegisterMetrics(registry)
syncer.RegisterMetrics(registry)
prometheus.DefaultGatherer = registry
Expand Down
2 changes: 1 addition & 1 deletion dm/worker/server_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ import (
"time"

. "github.com/pingcap/check"
"github.com/pingcap/pd/pkg/tempurl"
"github.com/pingcap/pd/v4/pkg/tempurl"
"github.com/siddontang/go-mysql/mysql"
"go.etcd.io/etcd/clientv3"
"go.etcd.io/etcd/embed"
Expand Down
8 changes: 4 additions & 4 deletions dm/worker/subtask.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,8 +26,8 @@ import (
"github.com/pingcap/dm/dm/config"
"github.com/pingcap/dm/dm/pb"
"github.com/pingcap/dm/dm/unit"
"github.com/pingcap/dm/dumpling"
"github.com/pingcap/dm/loader"
"github.com/pingcap/dm/mydumper"
"github.com/pingcap/dm/pkg/log"
"github.com/pingcap/dm/pkg/shardddl/pessimism"
"github.com/pingcap/dm/pkg/terror"
Expand All @@ -43,18 +43,18 @@ var createUnits = createRealUnits
func createRealUnits(cfg *config.SubTaskConfig, etcdClient *clientv3.Client) []unit.Unit {
failpoint.Inject("mockCreateUnitsDumpOnly", func(_ failpoint.Value) {
log.L().Info("create mock worker units with dump unit only", zap.String("failpoint", "mockCreateUnitsDumpOnly"))
failpoint.Return([]unit.Unit{mydumper.NewMydumper(cfg)})
failpoint.Return([]unit.Unit{dumpling.NewDumpling(cfg)})
})

us := make([]unit.Unit, 0, 3)
switch cfg.Mode {
case config.ModeAll:
us = append(us, mydumper.NewMydumper(cfg))
us = append(us, dumpling.NewDumpling(cfg))
us = append(us, loader.NewLoader(cfg))
us = append(us, syncer.NewSyncer(cfg, etcdClient))
case config.ModeFull:
// NOTE: maybe need another checker in the future?
us = append(us, mydumper.NewMydumper(cfg))
us = append(us, dumpling.NewDumpling(cfg))
us = append(us, loader.NewLoader(cfg))
case config.ModeIncrement:
us = append(us, syncer.NewSyncer(cfg, etcdClient))
Expand Down
6 changes: 3 additions & 3 deletions dm/worker/subtask_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,8 @@ import (
"github.com/pingcap/dm/dm/config"
"github.com/pingcap/dm/dm/pb"
"github.com/pingcap/dm/dm/unit"
"github.com/pingcap/dm/dumpling"
"github.com/pingcap/dm/loader"
"github.com/pingcap/dm/mydumper"
"github.com/pingcap/dm/syncer"

. "github.com/pingcap/check"
Expand All @@ -43,7 +43,7 @@ func (t *testSubTask) TestCreateUnits(c *C) {
cfg.Mode = config.ModeFull
unitsFull := createUnits(cfg, nil)
c.Assert(unitsFull, HasLen, 2)
_, ok := unitsFull[0].(*mydumper.Mydumper)
_, ok := unitsFull[0].(*dumpling.Dumpling)
c.Assert(ok, IsTrue)
_, ok = unitsFull[1].(*loader.Loader)
c.Assert(ok, IsTrue)
Expand All @@ -57,7 +57,7 @@ func (t *testSubTask) TestCreateUnits(c *C) {
cfg.Mode = config.ModeAll
unitsAll := createUnits(cfg, nil)
c.Assert(unitsAll, HasLen, 3)
_, ok = unitsAll[0].(*mydumper.Mydumper)
_, ok = unitsAll[0].(*dumpling.Dumpling)
c.Assert(ok, IsTrue)
_, ok = unitsAll[1].(*loader.Loader)
c.Assert(ok, IsTrue)
Expand Down
10 changes: 5 additions & 5 deletions dm/worker/worker_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ import (

. "github.com/pingcap/check"
"github.com/pingcap/failpoint"
"github.com/pingcap/pd/pkg/tempurl"
"github.com/pingcap/pd/v4/pkg/tempurl"
"go.etcd.io/etcd/clientv3"

"github.com/pingcap/dm/dm/config"
Expand Down Expand Up @@ -116,10 +116,10 @@ func (t *testServer) TestTaskAutoResume(c *C) {
NewRelayHolder = NewRealRelayHolder
}()

c.Assert(failpoint.Enable("github.com/pingcap/dm/mydumper/dumpUnitProcessForever", `return(true)`), IsNil)
defer failpoint.Disable("github.com/pingcap/dm/mydumper/dumpUnitProcessForever")
c.Assert(failpoint.Enable("github.com/pingcap/dm/mydumper/dumpUnitProcessWithError", `2*return("test auto resume inject error")`), IsNil)
defer failpoint.Disable("github.com/pingcap/dm/mydumper/dumpUnitProcessWithError")
c.Assert(failpoint.Enable("github.com/pingcap/dm/dumpling/dumpUnitProcessForever", `return(true)`), IsNil)
defer failpoint.Disable("github.com/pingcap/dm/dumpling/dumpUnitProcessForever")
c.Assert(failpoint.Enable("github.com/pingcap/dm/dumpling/dumpUnitProcessWithError", `2*return("test auto resume inject error")`), IsNil)
defer failpoint.Disable("github.com/pingcap/dm/dumpling/dumpUnitProcessWithError")
c.Assert(failpoint.Enable("github.com/pingcap/dm/dm/worker/mockCreateUnitsDumpOnly", `return(true)`), IsNil)
defer failpoint.Disable("github.com/pingcap/dm/dm/worker/mockCreateUnitsDumpOnly")
c.Assert(failpoint.Enable("github.com/pingcap/dm/loader/ignoreLoadCheckpointErr", `return()`), IsNil)
Expand Down
232 changes: 232 additions & 0 deletions dumpling/dumpling.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,232 @@
// Copyright 2020 PingCAP, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// See the License for the specific language governing permissions and
// limitations under the License.

package dumpling

import (
"context"
"os"
"strings"
"time"

"github.com/pingcap/dm/dm/config"
"github.com/pingcap/dm/dm/pb"
"github.com/pingcap/dm/dm/unit"
"github.com/pingcap/dm/pkg/log"

"github.com/pingcap/dumpling/v4/export"
"github.com/pingcap/errors"
"github.com/pingcap/failpoint"
"github.com/siddontang/go/sync2"
"go.uber.org/zap"
)

// Dumpling dumps full data from a MySQL-compatible database
type Dumpling struct {
cfg *config.SubTaskConfig

logger log.Logger

dumpConfig *export.Config
closed sync2.AtomicBool
}

// NewDumpling creates a new Dumpling
func NewDumpling(cfg *config.SubTaskConfig) *Dumpling {
m := &Dumpling{
cfg: cfg,
logger: log.With(zap.String("task", cfg.Name), zap.String("unit", "dump")),
}
return m
}

// Init implements Unit.Init
func (m *Dumpling) Init(ctx context.Context) error {
var err error
m.dumpConfig, err = m.constructArgs()
return err
}

// Process implements Unit.Process
func (m *Dumpling) Process(ctx context.Context, pr chan pb.ProcessResult) {
dumplingExitWithErrorCounter.WithLabelValues(m.cfg.Name).Add(0)

failpoint.Inject("dumpUnitProcessWithError", func(val failpoint.Value) {
m.logger.Info("dump unit runs with injected error", zap.String("failpoint", "dumpUnitProcessWithError"), zap.Reflect("error", val))
msg, ok := val.(string)
if !ok {
msg = "unknown process error"
}
pr <- pb.ProcessResult{
IsCanceled: false,
Errors: []*pb.ProcessError{unit.NewProcessError(errors.New(msg))},
}
failpoint.Return()
})

begin := time.Now()
errs := make([]*pb.ProcessError, 0, 1)

failpoint.Inject("dumpUnitProcessForever", func() {
m.logger.Info("dump unit runs forever", zap.String("failpoint", "dumpUnitProcessForever"))
<-ctx.Done()
failpoint.Return()
})

// NOTE: remove output dir before start dumping
// every time re-dump, loader should re-prepare
err := os.RemoveAll(m.cfg.Dir)
if err != nil {
m.logger.Error("fail to remove output directory", zap.String("directory", m.cfg.Dir), log.ShortError(err))
errs = append(errs, unit.NewProcessError(err))
pr <- pb.ProcessResult{
IsCanceled: false,
Errors: errs,
}
return
}

// TODO: dumpling can't be canceled now, we may add that in the future
err = export.Dump(m.dumpConfig)

if err != nil {
dumplingExitWithErrorCounter.WithLabelValues(m.cfg.Name).Inc()
errs = append(errs, unit.NewProcessError(err))
}

m.logger.Info("dump data finished", zap.Duration("cost time", time.Since(begin)))

pr <- pb.ProcessResult{
IsCanceled: false,
Errors: errs,
}
}

// Close implements Unit.Close
func (m *Dumpling) Close() {
if m.closed.Get() {
return
}
// do nothing, external will cancel the command (if running)
m.closed.Set(true)
}

// Pause implements Unit.Pause
func (m *Dumpling) Pause() {
if m.closed.Get() {
m.logger.Warn("try to pause, but already closed")
return
}
// do nothing, external will cancel the command (if running)
}

// Resume implements Unit.Resume
func (m *Dumpling) Resume(ctx context.Context, pr chan pb.ProcessResult) {
if m.closed.Get() {
m.logger.Warn("try to resume, but already closed")
return
}
// just call Process
m.Process(ctx, pr)
}

// Update implements Unit.Update
func (m *Dumpling) Update(cfg *config.SubTaskConfig) error {
// not support update configuration now
return nil
}

// Status implements Unit.Status
func (m *Dumpling) Status() interface{} {
// NOTE: try to add some status, like dumped file count
return &pb.DumpStatus{}
}

// Error implements Unit.Error
func (m *Dumpling) Error() interface{} {
return &pb.DumpError{}
}

// Type implements Unit.Type
func (m *Dumpling) Type() pb.UnitType {
return pb.UnitType_Dump
}

// IsFreshTask implements Unit.IsFreshTask
func (m *Dumpling) IsFreshTask(ctx context.Context) (bool, error) {
return true, nil
}

// constructArgs constructs arguments for exec.Command
func (m *Dumpling) constructArgs() (*export.Config, error) {
cfg := m.cfg
db := cfg.From

dumpConfig := export.DefaultConfig()
// ret is used to record the unsupported arguments for dumpling
var ret []string
extraArgs := strings.Fields(cfg.ExtraArgs)
if len(extraArgs) > 0 {
err := parseExtraArgs(dumpConfig, ParseArgLikeBash(extraArgs))
if err != nil {
m.logger.Warn("parsed some unsupported arguments", zap.Error(err))
ret = append(ret, extraArgs...)
}
}
// block status addr because we already have it in DM, and if we enable it, may we need more ports for the process.
dumpConfig.StatusAddr = ""

dumpConfig.Host = db.Host
dumpConfig.Port = db.Port
dumpConfig.User = db.User
dumpConfig.Password = db.Password
dumpConfig.OutputDirPath = cfg.Dir // use LoaderConfig.Dir as output dir
dumpConfig.BlackWhiteList = export.BWListConf{
Mode: export.MySQLReplicationMode,
Rules: &export.MySQLReplicationConf{
Rules: cfg.BWList,
CaseSensitive: cfg.CaseSensitive,
},
}
dumpConfig.EscapeBackslash = true
dumpConfig.Logger = m.logger.Logger

if cfg.Threads > 0 {
dumpConfig.Threads = cfg.Threads
}
if cfg.ChunkFilesize > 0 {
dumpConfig.FileSize = uint64(cfg.ChunkFilesize)
}
if cfg.StatementSize > 0 {
dumpConfig.StatementSize = cfg.StatementSize
}
if cfg.Rows > 0 {
dumpConfig.Rows = cfg.Rows
}
if len(cfg.Where) > 0 {
dumpConfig.Where = cfg.Where
}

if cfg.SkipTzUTC {
// TODO: support skip-tz-utc
ret = append(ret, "--skip-tz-utc")
}

// TODO: support String for export.Config
m.logger.Info("create dumpling", zap.Reflect("config", dumpConfig))
if len(ret) > 0 {
m.logger.Warn("meeting some unsupported arguments", zap.Strings("argument", ret))
}

return dumpConfig, nil
}
Loading

0 comments on commit 8014e51

Please sign in to comment.