This repository has been archived by the owner on Nov 24, 2023. It is now read-only.

Commit: cherry-pick #1508 to release-2.0
Signed-off-by: ti-srebot <ti-srebot@pingcap.com>
lance6716 authored and ti-srebot committed Mar 23, 2021
1 parent 659a125 commit 97a547e
Showing 25 changed files with 572 additions and 240 deletions.
2 changes: 1 addition & 1 deletion _utils/terror_gen/errors_release.txt
@@ -442,7 +442,7 @@ ErrWorkerWaitRelayCatchupTimeout,[code=40067:class=dm-worker:scope=internal:leve
ErrWorkerRelayIsPurging,[code=40068:class=dm-worker:scope=internal:level=high], "Message: relay log purger is purging, cannot start sub task %s, Workaround: Please try again later."
ErrWorkerHostPortNotValid,[code=40069:class=dm-worker:scope=internal:level=high], "Message: host:port '%s' not valid, Workaround: Please check configs in worker configuration file."
ErrWorkerNoStart,[code=40070:class=dm-worker:scope=internal:level=high], "Message: no mysql source is being handled in the worker"
-ErrWorkerAlreadyStart,[code=40071:class=dm-worker:scope=internal:level=high], "Message: mysql source handler worker already started"
+ErrWorkerAlreadyStart,[code=40071:class=dm-worker:scope=internal:level=high], "Message: mysql source worker %s has already started with source %s, but get a request with source %s, Workaround: Please try restart this DM-worker"
ErrWorkerSourceNotMatch,[code=40072:class=dm-worker:scope=internal:level=high], "Message: source of request does not match with source in worker"
ErrWorkerWaitRelayCatchupGTID,[code=40078:class=dm-worker:scope=internal:level=high], "Message: cannot compare gtid between loader and relay, loader gtid: %s, relay gtid: %s"
ErrWorkerFailToGetSubtaskConfigFromEtcd,[code=40073:class=dm-worker:scope=internal:level=medium], "Message: there is no relative subtask config for task %s in etcd"
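
The reworded ErrWorkerAlreadyStart message now names the worker, the source it is already bound to, and the conflicting source in the request. A minimal standalone sketch of the check this message template implies; startSource and its parameters are hypothetical stand-ins for illustration, not the actual DM-worker code:

package main

import "fmt"

// startSource rejects a start request when the worker is already bound to a
// different source, mirroring the new error message template.
func startSource(workerName, boundSource, requestedSource string) error {
	if boundSource != "" && boundSource != requestedSource {
		return fmt.Errorf(
			"mysql source worker %s has already started with source %s, but get a request with source %s",
			workerName, boundSource, requestedSource)
	}
	return nil
}

func main() {
	fmt.Println(startSource("worker-1", "mysql-01", "mysql-02"))
}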
2 changes: 1 addition & 1 deletion dm/config/subtask.go
@@ -158,7 +158,7 @@ type SubTaskConfig struct {
// RelayDir get value from dm-worker config
RelayDir string `toml:"relay-dir" json:"relay-dir"`

-// UseRelay get value from dm-worker config
+// UseRelay get value from dm-worker's relayEnabled
UseRelay bool `toml:"use-relay" json:"use-relay"`
From DBConfig `toml:"from" json:"from"`
To DBConfig `toml:"to" json:"to"`
42 changes: 26 additions & 16 deletions dm/master/bootstrap.go
@@ -56,17 +56,17 @@ func (s *Server) bootstrap(ctx context.Context) error {
if err != nil {
return terror.ErrMasterFailToImportFromV10x.Delegate(err)
}
+} else {
+uctx := upgrade.Context{
+Context: ctx,
+SubTaskConfigs: s.scheduler.GetSubTaskCfgs(),
+}
+err := upgrade.TryUpgrade(s.etcdClient, uctx)
+if err != nil {
+return err
+}
+}
-
-uctx := upgrade.Context{
-Context: ctx,
-SubTaskConfigs: s.scheduler.GetSubTaskCfgs(),
-}
-err := upgrade.TryUpgrade(s.etcdClient, uctx)
-
-if err != nil {
-return err
-}
return nil
}

@@ -104,16 +104,26 @@ func (s *Server) importFromV10x(ctx context.Context) error {
return err
}

-// 5. create sources.
-logger.Info("add source config into cluster")
-err = s.addSourcesV1Import(tctx, sourceCfgs)
+// 5. upgrade v1.0.x downstream metadata table and run v2.0 upgrading routines.
+// some v2.0 upgrading routines are also altering schema, if we run them after adding sources, DM worker will
+// meet error.
+logger.Info("upgrading downstream metadata tables")
+err = s.upgradeDBSchemaV1Import(tctx, subtaskCfgs)
if err != nil {
return err
}
+uctx := upgrade.Context{
+Context: ctx,
+SubTaskConfigs: subtaskCfgs,
+}
+err = upgrade.UntouchVersionUpgrade(s.etcdClient, uctx)
+if err != nil {
+return err
+}

-// 6. upgrade v1.0.x downstream metadata table.
-logger.Info("upgrading downstream metadata tables")
-err = s.upgradeDBSchemaV1Import(tctx, subtaskCfgs)
+// 6. create sources.
+logger.Info("add source config into cluster")
+err = s.addSourcesV1Import(tctx, sourceCfgs)
if err != nil {
return err
}
@@ -127,7 +137,7 @@ func (s *Server) importFromV10x(ctx context.Context) error {

// 8. mark the upgrade operation as done.
logger.Info("marking upgrade from v1.0.x as done")
-_, err = upgrade.PutVersion(s.etcdClient, upgrade.MinVersion)
+_, err = upgrade.PutVersion(s.etcdClient, upgrade.CurrentVersion)
if err != nil {
return err
}
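
The reordering above matters because addSourcesV1Import starts DM-workers against the downstream, while some v2.0 upgrade routines still alter downstream schema; running the schema work first keeps workers from racing against that DDL. Recording upgrade.CurrentVersion instead of upgrade.MinVersion also prevents TryUpgrade from re-running every routine on the next bootstrap. A minimal standalone sketch of the resulting flow; every function here is a hypothetical stub standing in for the real step:

package main

import "fmt"

// Hypothetical stubs for the real import steps; only the ordering is the point.
func upgradeDownstreamSchema() error  { fmt.Println("upgrade metadata tables"); return nil }
func runUntouchVersionUpgrade() error { fmt.Println("run v2.0 upgrade routines"); return nil }
func addSources() error               { fmt.Println("add sources, workers start"); return nil }
func putVersion(v string) error       { fmt.Println("record version:", v); return nil }

func importFromV10x() error {
	// Run every schema-altering upgrade routine before any worker starts.
	if err := upgradeDownstreamSchema(); err != nil {
		return err
	}
	if err := runUntouchVersionUpgrade(); err != nil {
		return err
	}
	// Only now register sources, which spawns workers against the
	// already-upgraded downstream.
	if err := addSources(); err != nil {
		return err
	}
	// Record the current version so the next bootstrap skips these routines.
	return putVersion("CurrentVersion")
}

func main() {
	if err := importFromV10x(); err != nil {
		fmt.Println("import failed:", err)
	}
}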
7 changes: 6 additions & 1 deletion dm/worker/join.go
@@ -16,6 +16,7 @@ package worker
import (
"context"
"strings"
"sync"
"time"

"github.com/pingcap/failpoint"
@@ -119,9 +120,13 @@ func (s *Server) KeepAlive() {
}
}

+// TODO: a channel is enough to avoid data race, check TTL not changed at receiving end of channel
+var keepAliveLock sync.Mutex
+
// UpdateKeepAliveTTL updates keepalive key with new lease TTL in place, to avoid watcher observe a DELETE event
-// this function should not be concurrently called
func (s *Server) UpdateKeepAliveTTL(newTTL int64) {
+keepAliveLock.Lock()
+defer keepAliveLock.Unlock()
if ha.CurrentKeepAliveTTL == newTTL {
log.L().Info("not changing keepalive TTL, skip", zap.Int64("ttl", newTTL))
return
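
The new keepAliveLock serializes UpdateKeepAliveTTL so two concurrent callers cannot both pass the "TTL unchanged" check and rebuild the etcd lease twice. A minimal standalone sketch of the guard; currentTTL and rebuildLease are hypothetical stand-ins for ha.CurrentKeepAliveTTL and the real lease-replacement logic:

package main

import (
	"fmt"
	"sync"
)

var (
	keepAliveLock sync.Mutex // serializes TTL updates, as in the diff
	currentTTL    int64      = 60
)

// rebuildLease stands in for revoking the old etcd lease and granting a new
// one with the updated TTL.
func rebuildLease(newTTL int64) { currentTTL = newTTL }

func UpdateKeepAliveTTL(newTTL int64) {
	keepAliveLock.Lock()
	defer keepAliveLock.Unlock()
	if currentTTL == newTTL {
		fmt.Println("not changing keepalive TTL, skip:", newTTL)
		return
	}
	rebuildLease(newTTL)
}

func main() {
	var wg sync.WaitGroup
	for _, ttl := range []int64{60, 30, 30} {
		wg.Add(1)
		go func(t int64) { defer wg.Done(); UpdateKeepAliveTTL(t) }(ttl)
	}
	wg.Wait()
	fmt.Println("final TTL:", currentTTL)
}

As the in-code TODO notes, a channel feeding a single updater goroutine would remove the shared state altogether; the mutex is the smaller change for a release branch.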
22 changes: 9 additions & 13 deletions dm/worker/relay.go
@@ -23,7 +23,6 @@ import (

"github.com/pingcap/dm/dm/config"
"github.com/pingcap/dm/dm/pb"
"github.com/pingcap/dm/dm/unit"
"github.com/pingcap/dm/pkg/log"
"github.com/pingcap/dm/pkg/streamer"
"github.com/pingcap/dm/pkg/terror"
@@ -34,7 +33,7 @@ import (
// RelayHolder for relay unit
type RelayHolder interface {
// Init initializes the holder
-Init(interceptors []purger.PurgeInterceptor) (purger.Purger, error)
+Init(ctx context.Context, interceptors []purger.PurgeInterceptor) (purger.Purger, error)
// Start starts run the relay
Start()
// Close closes the holder
@@ -70,7 +69,7 @@ type realRelayHolder struct {

l log.Logger

-closed sync2.AtomicInt32
+closed sync2.AtomicBool
stage pb.Stage
result *pb.ProcessResult // the process result, nil when is processing
}
@@ -85,23 +84,20 @@ func NewRealRelayHolder(sourceCfg *config.SourceConfig) RelayHolder {
relay: relay.NewRelay(cfg),
l: log.With(zap.String("component", "relay holder")),
}
-h.closed.Set(closedTrue)
+h.closed.Set(true)
return h
}

// Init initializes the holder
-func (h *realRelayHolder) Init(interceptors []purger.PurgeInterceptor) (purger.Purger, error) {
-h.closed.Set(closedFalse)
+func (h *realRelayHolder) Init(ctx context.Context, interceptors []purger.PurgeInterceptor) (purger.Purger, error) {
+h.closed.Set(false)

// initial relay purger
operators := []purger.RelayOperator{
h,
streamer.GetReaderHub(),
}

-// TODO: refine the context usage of relay, and it may need to be initialized before handle any subtasks.
-ctx, cancel := context.WithTimeout(context.Background(), unit.DefaultInitTimeout)
-defer cancel()
if err := h.relay.Init(ctx); err != nil {
return nil, terror.Annotate(err, "initial relay unit")
}
@@ -120,7 +116,7 @@ func (h *realRelayHolder) Start() {

// Close closes the holder
func (h *realRelayHolder) Close() {
-if !h.closed.CompareAndSwap(closedFalse, closedTrue) {
+if !h.closed.CompareAndSwap(false, true) {
return
}

@@ -153,7 +149,7 @@ func (h *realRelayHolder) run() {

// Status returns relay unit's status
func (h *realRelayHolder) Status(ctx context.Context) *pb.RelayStatus {
-if h.closed.Get() == closedTrue || h.relay.IsClosed() {
+if h.closed.Get() || h.relay.IsClosed() {
return &pb.RelayStatus{
Stage: pb.Stage_Stopped,
}
@@ -168,7 +164,7 @@ func (h *realRelayHolder) Status(ctx context.Context) *pb.RelayStatus {

// Error returns relay unit's status
func (h *realRelayHolder) Error() *pb.RelayError {
-if h.closed.Get() == closedTrue || h.relay.IsClosed() {
+if h.closed.Get() || h.relay.IsClosed() {
return &pb.RelayError{
Msg: "relay stopped",
}
@@ -356,7 +352,7 @@ func NewDummyRelayHolderWithInitError(cfg *config.SourceConfig) RelayHolder {
}

// Init implements interface of RelayHolder
-func (d *dummyRelayHolder) Init(interceptors []purger.PurgeInterceptor) (purger.Purger, error) {
+func (d *dummyRelayHolder) Init(ctx context.Context, interceptors []purger.PurgeInterceptor) (purger.Purger, error) {
// initial relay purger
operators := []purger.RelayOperator{
d,
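
Two things change in this file: RelayHolder.Init now takes the caller's context instead of building its own context.WithTimeout internally (which is why the dm/unit import and the deferred cancel disappear), and the closed flag drops the closedTrue/closedFalse integer sentinels for an atomic boolean. A minimal sketch of the resulting idempotent-Close pattern, using the standard library's sync/atomic.Bool (Go 1.19+) in place of sync2.AtomicBool; holder is a hypothetical stand-in for realRelayHolder:

package main

import (
	"fmt"
	"sync/atomic"
)

type holder struct {
	closed atomic.Bool
}

func (h *holder) Init() {
	h.closed.Store(false)
}

// Close tears down at most once: only the first caller wins the
// CompareAndSwap from false to true; later calls return immediately.
func (h *holder) Close() {
	if !h.closed.CompareAndSwap(false, true) {
		return
	}
	fmt.Println("closing relay holder")
}

func main() {
	h := &holder{}
	h.Init()
	h.Close()
	h.Close() // no-op: the flag is already true
	fmt.Println("closed:", h.closed.Load())
}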
25 changes: 13 additions & 12 deletions dm/worker/relay_test.go
@@ -159,7 +159,8 @@ func (t *testRelay) TestRelay(c *C) {
}

func (t *testRelay) testInit(c *C, holder *realRelayHolder) {
-_, err := holder.Init(nil)
+ctx := context.Background()
+_, err := holder.Init(ctx, nil)
c.Assert(err, IsNil)

r, ok := holder.relay.(*DummyRelay)
@@ -169,19 +170,19 @@ func (t *testRelay) testInit(c *C, holder *realRelayHolder) {
r.InjectInitError(initErr)
defer r.InjectInitError(nil)

-_, err = holder.Init(nil)
+_, err = holder.Init(ctx, nil)
c.Assert(err, ErrorMatches, ".*"+initErr.Error()+".*")
}

func (t *testRelay) testStart(c *C, holder *realRelayHolder) {
c.Assert(holder.Stage(), Equals, pb.Stage_New)
-c.Assert(holder.closed.Get(), Equals, closedFalse)
+c.Assert(holder.closed.Get(), IsFalse)
c.Assert(holder.Result(), IsNil)

holder.Start()
c.Assert(waitRelayStage(holder, pb.Stage_Running, 10), IsTrue)
c.Assert(holder.Result(), IsNil)
-c.Assert(holder.closed.Get(), Equals, closedFalse)
+c.Assert(holder.closed.Get(), IsFalse)

// test status
status := holder.Status(context.Background())
@@ -193,13 +194,13 @@ func (t *testRelay) testStart(c *C, holder *realRelayHolder) {
// test update and pause -> resume
t.testUpdate(c, holder)
c.Assert(holder.Stage(), Equals, pb.Stage_Paused)
-c.Assert(holder.closed.Get(), Equals, closedFalse)
+c.Assert(holder.closed.Get(), IsFalse)

err := holder.Operate(context.Background(), pb.RelayOp_ResumeRelay)
c.Assert(err, IsNil)
c.Assert(waitRelayStage(holder, pb.Stage_Running, 10), IsTrue)
c.Assert(holder.Result(), IsNil)
-c.Assert(holder.closed.Get(), Equals, closedFalse)
+c.Assert(holder.closed.Get(), IsFalse)
}

func (t *testRelay) testClose(c *C, holder *realRelayHolder) {
@@ -217,12 +218,12 @@ func (t *testRelay) testClose(c *C, holder *realRelayHolder) {
holder.Close()
c.Assert(waitRelayStage(holder, pb.Stage_Paused, 10), IsTrue)
c.Assert(holder.Result(), DeepEquals, processResult)
-c.Assert(holder.closed.Get(), Equals, closedTrue)
+c.Assert(holder.closed.Get(), IsTrue)

holder.Close()
c.Assert(holder.Stage(), Equals, pb.Stage_Paused)
c.Assert(holder.Result(), DeepEquals, processResult)
-c.Assert(holder.closed.Get(), Equals, closedTrue)
+c.Assert(holder.closed.Get(), IsTrue)

// todo: very strange, and can't resume
status := holder.Status(context.Background())
Expand All @@ -237,7 +238,7 @@ func (t *testRelay) testPauseAndResume(c *C, holder *realRelayHolder) {
err := holder.Operate(context.Background(), pb.RelayOp_PauseRelay)
c.Assert(err, IsNil)
c.Assert(holder.Stage(), Equals, pb.Stage_Paused)
-c.Assert(holder.closed.Get(), Equals, closedFalse)
+c.Assert(holder.closed.Get(), IsFalse)

err = holder.pauseRelay(context.Background(), pb.RelayOp_PauseRelay)
c.Assert(err, ErrorMatches, ".*current stage is Paused.*")
@@ -253,7 +254,7 @@
c.Assert(err, IsNil)
c.Assert(waitRelayStage(holder, pb.Stage_Running, 10), IsTrue)
c.Assert(holder.Result(), IsNil)
-c.Assert(holder.closed.Get(), Equals, closedFalse)
+c.Assert(holder.closed.Get(), IsFalse)

err = holder.Operate(context.Background(), pb.RelayOp_ResumeRelay)
c.Assert(err, ErrorMatches, ".*current stage is Running.*")
@@ -281,7 +282,7 @@ func (t *testRelay) testUpdate(c *C, holder *realRelayHolder) {
originStage := holder.Stage()
c.Assert(holder.Update(context.Background(), cfg), IsNil)
c.Assert(waitRelayStage(holder, originStage, 10), IsTrue)
-c.Assert(holder.closed.Get(), Equals, closedFalse)
+c.Assert(holder.closed.Get(), IsFalse)

r, ok := holder.relay.(*DummyRelay)
c.Assert(ok, IsTrue)
@@ -296,7 +297,7 @@ func (t *testRelay) testStop(c *C, holder *realRelayHolder) {
err := holder.Operate(context.Background(), pb.RelayOp_StopRelay)
c.Assert(err, IsNil)
c.Assert(holder.Stage(), Equals, pb.Stage_Stopped)
-c.Assert(holder.closed.Get(), Equals, closedTrue)
+c.Assert(holder.closed.Get(), IsTrue)

err = holder.Operate(context.Background(), pb.RelayOp_StopRelay)
c.Assert(err, ErrorMatches, ".*current stage is already stopped.*")