Skip to content
This repository has been archived by the owner on Nov 24, 2023. It is now read-only.

Commit

Permalink
cherry pick #1272 to release-2.0 (#1316)
Browse files Browse the repository at this point in the history
  • Loading branch information
ti-srebot authored Dec 21, 2020
1 parent ef88545 commit 8d8ffc1
Show file tree
Hide file tree
Showing 26 changed files with 1,170 additions and 388 deletions.
6 changes: 5 additions & 1 deletion _utils/terror_gen/errors_release.txt
Original file line number Diff line number Diff line change
Expand Up @@ -123,6 +123,9 @@ ErrFailUpdateV1DBSchema,[code=11116:class=functional:scope=internal:level=medium
ErrBinlogStatusVarsParse,[code=11117:class=functional:scope=internal:level=medium], "Message: fail to parse binglog status_vars: %v, offset: %d"
ErrVerifyHandleErrorArgs,[code=11118:class=functional:scope=internal:level=low], "Workaround: Please make sure the args are correct."
ErrRewriteSQL,[code=11119:class=functional:scope=internal:level=high], "Message: failed to rewrite SQL for target DB, stmt: %+v, targetTableNames: %+v"
ErrNoUUIDDirMatchGTID,[code=11120:class=functional:scope=internal:level=high], "Message: no relay subdir match gtid %s"
ErrNoRelayPosMatchGTID,[code=11121:class=functional:scope=internal:level=high], "Message: no relay pos match gtid %s"
ErrReaderReachEndOfFile,[code=11122:class=functional:scope=internal:level=low]
ErrConfigCheckItemNotSupport,[code=20001:class=config:scope=internal:level=medium], "Message: checking item %s is not supported\n%s, Workaround: Please check `ignore-checking-items` config in task configuration file, which can be set including `all`/`dump_privilege`/`replication_privilege`/`version`/`binlog_enable`/`binlog_format`/`binlog_row_image`/`table_schema`/`schema_of_shard_tables`/`auto_increment_ID`."
ErrConfigTomlTransform,[code=20002:class=config:scope=internal:level=medium], "Message: %s, Workaround: Please check the configuration file has correct TOML format."
ErrConfigYamlTransform,[code=20003:class=config:scope=internal:level=medium], "Message: %s, Workaround: Please check the configuration file has correct YAML format."
Expand Down Expand Up @@ -428,12 +431,13 @@ ErrWorkerCacheDDLInfoExists,[code=40063:class=dm-worker:scope=internal:level=hig
ErrWorkerExecSkipDDLConflict,[code=40064:class=dm-worker:scope=internal:level=high], "Message: execDDL and skipDDL can not specify both at the same time"
ErrWorkerExecDDLSyncerOnly,[code=40065:class=dm-worker:scope=internal:level=high], "Message: only syncer support ExecuteDDL, but current unit is %s"
ErrWorkerExecDDLTimeout,[code=40066:class=dm-worker:scope=internal:level=high], "Message: ExecuteDDL timeout (exceeding %s), Workaround: Please try use `query-status` to query whether the DDL is still blocking."
ErrWorkerWaitRelayCatchupTimeout,[code=40067:class=dm-worker:scope=internal:level=high], "Message: waiting for relay binlog pos to catch up with loader end binlog pos is timeout (exceeding %s), loader end binlog pos: %s, relay binlog pos: %s"
ErrWorkerWaitRelayCatchupTimeout,[code=40067:class=dm-worker:scope=internal:level=high], "Message: waiting for relay to catch up with loader is timeout (exceeding %s), loader: %s, relay: %s"
ErrWorkerRelayIsPurging,[code=40068:class=dm-worker:scope=internal:level=high], "Message: relay log purger is purging, cannot start sub task %s, Workaround: Please try again later."
ErrWorkerHostPortNotValid,[code=40069:class=dm-worker:scope=internal:level=high], "Message: host:port '%s' not valid, Workaround: Please check configs in worker configuration file."
ErrWorkerNoStart,[code=40070:class=dm-worker:scope=internal:level=high], "Message: no mysql source is being handled in the worker"
ErrWorkerAlreadyStart,[code=40071:class=dm-worker:scope=internal:level=high], "Message: mysql source handler worker already started"
ErrWorkerSourceNotMatch,[code=40072:class=dm-worker:scope=internal:level=high], "Message: source of request does not match with source in worker"
ErrWorkerWaitRelayCatchupGTID,[code=40078:class=dm-worker:scope=internal:level=high], "Message: cannot compare gtid between loader and relay, loader gtid: %s, relay gtid: %s"
ErrWorkerFailToGetSubtaskConfigFromEtcd,[code=40073:class=dm-worker:scope=internal:level=medium], "Message: there is no relative subtask config for task %s in etcd"
ErrWorkerFailToGetSourceConfigFromEtcd,[code=40074:class=dm-worker:scope=internal:level=medium], "Message: there is no relative source config for source %s in etcd"
ErrWorkerDDLLockOpNotFound,[code=40075:class=dm-worker:scope=internal:level=high], "Message: missing shard DDL lock operation for shard DDL info (%s)"
Expand Down
301 changes: 176 additions & 125 deletions dm/pb/dmworker.pb.go

Large diffs are not rendered by default.

1 change: 1 addition & 0 deletions dm/proto/dmworker.proto
Original file line number Diff line number Diff line change
Expand Up @@ -106,6 +106,7 @@ message LoadStatus {
int64 totalBytes = 2;
string progress = 3;
string metaBinlog = 4;
string metaBinlogGTID = 5;
}

// ShardingGroup represents a DDL sharding group; it is used by SyncStatus and differs from ShardingGroup in the syncer pkg
Expand Down
52 changes: 6 additions & 46 deletions dm/worker/relay.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,6 @@ import (
"github.com/pingcap/dm/pkg/terror"
"github.com/pingcap/dm/relay"
"github.com/pingcap/dm/relay/purger"
rr "github.com/pingcap/dm/relay/retry"
)

// RelayHolder for relay unit
Expand Down Expand Up @@ -77,36 +76,13 @@ type realRelayHolder struct {
}

// NewRealRelayHolder creates a new RelayHolder
func NewRealRelayHolder(cfg *config.SourceConfig) RelayHolder {
clone := cfg.DecryptPassword()
relayCfg := &relay.Config{
EnableGTID: clone.EnableGTID,
AutoFixGTID: clone.AutoFixGTID,
Flavor: clone.Flavor,
RelayDir: clone.RelayDir,
ServerID: clone.ServerID,
Charset: clone.Charset,
From: relay.DBConfig{
Host: clone.From.Host,
Port: clone.From.Port,
User: clone.From.User,
Password: clone.From.Password,
},
BinLogName: clone.RelayBinLogName,
BinlogGTID: clone.RelayBinlogGTID,
ReaderRetry: rr.ReaderRetryConfig{ // we use config from TaskChecker now
BackoffRollback: cfg.Checker.BackoffRollback.Duration,
BackoffMax: cfg.Checker.BackoffMax.Duration,
BackoffMin: cfg.Checker.BackoffMin.Duration,
BackoffJitter: cfg.Checker.BackoffJitter,
BackoffFactor: cfg.Checker.BackoffFactor,
},
}
func NewRealRelayHolder(sourceCfg *config.SourceConfig) RelayHolder {
cfg := relay.FromSourceCfg(sourceCfg)

h := &realRelayHolder{
cfg: cfg,
cfg: sourceCfg,
stage: pb.Stage_New,
relay: relay.NewRelay(relayCfg),
relay: relay.NewRelay(cfg),
l: log.With(zap.String("component", "relay holder")),
}
h.closed.Set(closedTrue)
Expand Down Expand Up @@ -308,24 +284,8 @@ func (h *realRelayHolder) Result() *pb.ProcessResult {
}

// Update updates the relay config online
func (h *realRelayHolder) Update(ctx context.Context, cfg *config.SourceConfig) error {
relayCfg := &relay.Config{
AutoFixGTID: cfg.AutoFixGTID,
Charset: cfg.Charset,
From: relay.DBConfig{
Host: cfg.From.Host,
Port: cfg.From.Port,
User: cfg.From.User,
Password: cfg.From.Password,
},
ReaderRetry: rr.ReaderRetryConfig{ // we use config from TaskChecker now
BackoffRollback: cfg.Checker.BackoffRollback.Duration,
BackoffMax: cfg.Checker.BackoffMax.Duration,
BackoffMin: cfg.Checker.BackoffMin.Duration,
BackoffJitter: cfg.Checker.BackoffJitter,
BackoffFactor: cfg.Checker.BackoffFactor,
},
}
func (h *realRelayHolder) Update(ctx context.Context, sourceCfg *config.SourceConfig) error {
relayCfg := relay.FromSourceCfg(sourceCfg)

stage := h.Stage()

Expand Down
7 changes: 7 additions & 0 deletions dm/worker/relay_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,10 +19,12 @@ import (

. "github.com/pingcap/check"
"github.com/pingcap/errors"
"github.com/siddontang/go-mysql/mysql"

"github.com/pingcap/dm/dm/config"
"github.com/pingcap/dm/dm/pb"
"github.com/pingcap/dm/dm/unit"
"github.com/pingcap/dm/pkg/gtid"
pkgstreamer "github.com/pingcap/dm/pkg/streamer"
"github.com/pingcap/dm/pkg/utils"
"github.com/pingcap/dm/relay"
Expand Down Expand Up @@ -114,6 +116,11 @@ func (d *DummyRelay) Close() {}
// IsClosed implements the Process interface; this dummy always reports false.
func (d *DummyRelay) IsClosed() bool { return false }

// SaveMeta implements the Process interface. This dummy ignores both pos and
// gset and always returns nil.
func (d *DummyRelay) SaveMeta(pos mysql.Position, gset gtid.Set) error {
return nil
}

func (t *testRelay) TestRelay(c *C) {
originNewRelay := relay.NewRelay
relay.NewRelay = NewDummyRelay
Expand Down
56 changes: 46 additions & 10 deletions dm/worker/subtask.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ import (
"time"

"github.com/pingcap/failpoint"
"github.com/siddontang/go-mysql/mysql"
"github.com/siddontang/go/sync2"
"go.etcd.io/etcd/clientv3"
"go.uber.org/zap"
Expand All @@ -28,6 +29,8 @@ import (
"github.com/pingcap/dm/dm/unit"
"github.com/pingcap/dm/dumpling"
"github.com/pingcap/dm/loader"
"github.com/pingcap/dm/pkg/binlog"
"github.com/pingcap/dm/pkg/gtid"
"github.com/pingcap/dm/pkg/log"
"github.com/pingcap/dm/pkg/shardddl/pessimism"
"github.com/pingcap/dm/pkg/terror"
Expand Down Expand Up @@ -582,6 +585,13 @@ func (st *SubTask) ShardDDLOperation() *pessimism.Operation {
// Currently there is only one wait condition:
// from Load unit to Sync unit, wait for the relay log to catch up with the mydumper binlog position.
func (st *SubTask) unitTransWaitCondition(subTaskCtx context.Context) error {
var (
gset1 gtid.Set
gset2 gtid.Set
pos1 *mysql.Position
pos2 *mysql.Position
err error
)
pu := st.PrevUnit()
cu := st.CurrUnit()
if pu != nil && pu.Type() == pb.UnitType_Load && cu.Type() == pb.UnitType_Sync {
Expand All @@ -599,26 +609,52 @@ func (st *SubTask) unitTransWaitCondition(subTaskCtx context.Context) error {
loadStatus := pu.Status(ctxStatus).(*pb.LoadStatus)
cancelStatus()

pos1, err := utils.DecodeBinlogPosition(loadStatus.MetaBinlog)
if err != nil {
return terror.WithClass(err, terror.ClassDMWorker)
if st.cfg.EnableGTID {
gset1, err = gtid.ParserGTID(st.cfg.Flavor, loadStatus.MetaBinlogGTID)
if err != nil {
return terror.WithClass(err, terror.ClassDMWorker)
}
} else {
pos1, err = utils.DecodeBinlogPosition(loadStatus.MetaBinlog)
if err != nil {
return terror.WithClass(err, terror.ClassDMWorker)
}
}

for {
ctxStatus, cancelStatus = context.WithTimeout(ctxWait, utils.DefaultDBTimeout)
relayStatus := hub.w.relayHolder.Status(ctxStatus)
cancelStatus()

pos2, err := utils.DecodeBinlogPosition(relayStatus.RelayBinlog)
if err != nil {
return terror.WithClass(err, terror.ClassDMWorker)
}
if pos1.Compare(*pos2) <= 0 {
break
if st.cfg.EnableGTID {
gset2, err = gtid.ParserGTID(st.cfg.Flavor, relayStatus.RelayBinlogGtid)
if err != nil {
return terror.WithClass(err, terror.ClassDMWorker)
}
rc, ok := binlog.CompareGTID(gset1, gset2)
if !ok {
return terror.ErrWorkerWaitRelayCatchupGTID.Generate(loadStatus.MetaBinlogGTID, relayStatus.RelayBinlogGtid)
}
if rc <= 0 {
break
}
} else {
pos2, err = utils.DecodeBinlogPosition(relayStatus.RelayBinlog)
if err != nil {
return terror.WithClass(err, terror.ClassDMWorker)
}
if pos1.Compare(*pos2) <= 0 {
break
}
}
st.l.Debug("wait relay to catchup", zap.Stringer("load end position", pos1), zap.Stringer("relay position", pos2))

st.l.Debug("wait relay to catchup", zap.Bool("enableGTID", st.cfg.EnableGTID), zap.Stringer("load end position", pos1), zap.String("load end gtid", loadStatus.MetaBinlogGTID), zap.Stringer("relay position", pos2), zap.String("relay gtid", relayStatus.RelayBinlogGtid))

select {
case <-ctxWait.Done():
if st.cfg.EnableGTID {
return terror.ErrWorkerWaitRelayCatchupTimeout.Generate(waitRelayCatchupTimeout, loadStatus.MetaBinlogGTID, relayStatus.RelayBinlogGtid)
}
return terror.ErrWorkerWaitRelayCatchupTimeout.Generate(waitRelayCatchupTimeout, pos1, pos2)
case <-subTaskCtx.Done():
return nil
Expand Down
26 changes: 25 additions & 1 deletion errors.toml
Original file line number Diff line number Diff line change
Expand Up @@ -748,6 +748,24 @@ description = ""
workaround = ""
tags = ["internal", "high"]

[error.DM-functional-11120]
message = "no relay subdir match gtid %s"
description = ""
workaround = ""
tags = ["internal", "high"]

[error.DM-functional-11121]
message = "no relay pos match gtid %s"
description = ""
workaround = ""
tags = ["internal", "high"]

[error.DM-functional-11122]
message = ""
description = ""
workaround = ""
tags = ["internal", "low"]

[error.DM-config-20001]
message = "checking item %s is not supported\n%s"
description = ""
Expand Down Expand Up @@ -2579,7 +2597,7 @@ workaround = "Please try use `query-status` to query whether the DDL is still bl
tags = ["internal", "high"]

[error.DM-dm-worker-40067]
message = "waiting for relay binlog pos to catch up with loader end binlog pos is timeout (exceeding %s), loader end binlog pos: %s, relay binlog pos: %s"
message = "waiting for relay to catch up with loader is timeout (exceeding %s), loader: %s, relay: %s"
description = ""
workaround = ""
tags = ["internal", "high"]
Expand Down Expand Up @@ -2644,6 +2662,12 @@ description = ""
workaround = "Please check network connection of worker"
tags = ["internal", "high"]

[error.DM-dm-worker-40078]
message = "cannot compare gtid between loader and relay, loader gtid: %s, relay gtid: %s"
description = ""
workaround = ""
tags = ["internal", "high"]

[error.DM-dm-tracer-42001]
message = "parse dm-tracer config flag set"
description = ""
Expand Down
2 changes: 2 additions & 0 deletions loader/loader.go
Original file line number Diff line number Diff line change
Expand Up @@ -408,6 +408,7 @@ type Loader struct {
totalFileCount sync2.AtomicInt64 // schema + table + data
finishedDataSize sync2.AtomicInt64
metaBinlog sync2.AtomicString
metaBinlogGTID sync2.AtomicString

// record process error rather than log.Fatal
runFatalChan chan *pb.ProcessError
Expand Down Expand Up @@ -1315,6 +1316,7 @@ func (l *Loader) getMydumpMetadata() error {
}

l.metaBinlog.Set(loc.Position.String())
l.metaBinlogGTID.Set(loc.GTIDSetStr())
return nil
}

Expand Down
9 changes: 5 additions & 4 deletions loader/status.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,10 +33,11 @@ func (l *Loader) Status(ctx context.Context) interface{} {
totalSize := l.totalDataSize.Get()
progress := percent(finishedSize, totalSize, l.finish.Get())
s := &pb.LoadStatus{
FinishedBytes: finishedSize,
TotalBytes: totalSize,
Progress: progress,
MetaBinlog: l.metaBinlog.Get(),
FinishedBytes: finishedSize,
TotalBytes: totalSize,
Progress: progress,
MetaBinlog: l.metaBinlog.Get(),
MetaBinlogGTID: l.metaBinlogGTID.Get(),
}
return s
}
Expand Down
2 changes: 1 addition & 1 deletion pkg/binlog/event/event.go
Original file line number Diff line number Diff line change
Expand Up @@ -201,7 +201,7 @@ func GenRotateEvent(header *replication.EventHeader, latestPos uint32, nextLogNa
// a. https://github.com/vitessio/vitess/blob/28e7e5503a6c3d3b18d4925d95f23ebcb6f25c8e/go/mysql/binlog_event_mysql56.go#L56
// b. https://dev.mysql.com/doc/internals/en/com-binlog-dump-gtid.html
func GenPreviousGTIDsEvent(header *replication.EventHeader, latestPos uint32, gSet gtid.Set) (*replication.BinlogEvent, error) {
if gSet == nil || len(gSet.String()) == 0 {
if gSet == nil {
return nil, terror.ErrBinlogEmptyGTID.Generate()
}

Expand Down
7 changes: 7 additions & 0 deletions pkg/binlog/reader/file.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@ type FileReader struct {
parser *replication.BinlogParser
ch chan *replication.BinlogEvent
ech chan error
endCh chan struct{}

logger log.Logger

Expand Down Expand Up @@ -89,6 +90,7 @@ func NewFileReader(cfg *FileReaderConfig) Reader {
parser: parser,
ch: make(chan *replication.BinlogEvent, cfg.ChBufferSize),
ech: make(chan error, cfg.EchBufferSize),
endCh: make(chan struct{}),
logger: log.With(zap.String("component", "binlog file reader")),
}
}
Expand All @@ -115,6 +117,9 @@ func (r *FileReader) StartSyncByPos(pos gmysql.Position) error {
case r.ech <- err:
case <-r.ctx.Done():
}
} else {
r.logger.Info("parse end of binlog file", zap.Stringer("pos", pos))
close(r.endCh)
}
}()

Expand Down Expand Up @@ -159,6 +164,8 @@ func (r *FileReader) GetEvent(ctx context.Context) (*replication.BinlogEvent, er
return ev, nil
case err := <-r.ech:
return nil, err
case <-r.endCh:
return nil, terror.ErrReaderReachEndOfFile.Generate()
case <-ctx.Done():
return nil, ctx.Err()
}
Expand Down
3 changes: 2 additions & 1 deletion pkg/binlog/reader/file_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ import (
"github.com/pingcap/dm/pkg/binlog/common"
"github.com/pingcap/dm/pkg/binlog/event"
"github.com/pingcap/dm/pkg/gtid"
"github.com/pingcap/dm/pkg/terror"
)

var (
Expand Down Expand Up @@ -138,7 +139,7 @@ func (t *testFileReaderSuite) TestGetEvent(c *C) {
ctx, cancel := context.WithTimeout(context.Background(), time.Second)
defer cancel()
e, err = r.GetEvent(ctx)
c.Assert(errors.Cause(err), Equals, context.DeadlineExceeded)
c.Assert(terror.ErrReaderReachEndOfFile.Equal(err), IsTrue)
c.Assert(e, IsNil)
c.Assert(r.Close(), IsNil) // close the reader

Expand Down
Loading

0 comments on commit 8d8ffc1

Please sign in to comment.