This repository has been archived by the owner on Nov 24, 2023. It is now read-only.

fix data race in import unit(loader) (#349) #353

Merged: 6 commits, Nov 15, 2019
11 changes: 5 additions & 6 deletions dm/worker/subtask.go
@@ -93,10 +93,11 @@ func NewSubTask(cfg *config.SubTaskConfig) *SubTask {
// NewSubTaskWithStage creates a new SubTask with stage
func NewSubTaskWithStage(cfg *config.SubTaskConfig, stage pb.Stage) *SubTask {
st := SubTask{
cfg: cfg,
units: createUnits(cfg),
stage: stage,
l: log.With(zap.String("subtask", cfg.Name)),
cfg: cfg,
units: createUnits(cfg),
stage: stage,
l: log.With(zap.String("subtask", cfg.Name)),
DDLInfo: make(chan *pb.DDLInfo, 1),
}
taskState.WithLabelValues(st.cfg.Name).Set(float64(st.stage))
return &st
@@ -108,8 +109,6 @@ func (st *SubTask) Init() error {
return terror.ErrWorkerNoAvailUnits.Generate(st.cfg.Name, st.cfg.Mode)
}

st.DDLInfo = make(chan *pb.DDLInfo, 1)

initializeUnitSuccess := true
// when error occurred, initialized units should be closed
// when continue sub task from loader / syncer, ahead units should be closed
100 changes: 70 additions & 30 deletions loader/loader.go
@@ -104,32 +104,49 @@ func NewWorker(loader *Loader, id int) (worker *Worker, err error) {

// Close closes worker
func (w *Worker) Close() {
// simulate the case that doesn't wait all doJob goroutine exit
failpoint.Inject("workerCantClose", func(_ failpoint.Value) {
w.tctx.L().Info("", zap.String("failpoint", "workerCantClose"))
failpoint.Return()
})

if !atomic.CompareAndSwapInt64(&w.closed, 0, 1) {
w.wg.Wait()
w.tctx.L().Info("already closed...")
return
}

w.tctx.L().Info("start to close...")
close(w.jobQueue)
w.wg.Wait()
w.tctx.L().Info("closed !!!")
}

func (w *Worker) run(ctx context.Context, fileJobQueue chan *fileJob, workerWg *sync.WaitGroup, runFatalChan chan *pb.ProcessError) {
func (w *Worker) run(ctx context.Context, fileJobQueue chan *fileJob, runFatalChan chan *pb.ProcessError) {
atomic.StoreInt64(&w.closed, 0)
defer workerWg.Done()

newCtx, cancel := context.WithCancel(ctx)
defer cancel()
defer func() {
cancel()
// make sure all doJob goroutines exit
w.Close()
}()

ctctx := w.tctx.WithContext(newCtx)

doJob := func() {
defer w.wg.Done()
for {
select {
case <-newCtx.Done():
w.tctx.L().Debug("execution goroutine exits")
w.tctx.L().Info("context canceled, execution goroutine exits")
return
case job, ok := <-w.jobQueue:
if !ok || job == nil {
if !ok {
w.tctx.L().Info("job queue was closed, execution goroutine exits")
return
}
if job == nil {
w.tctx.L().Info("jobs are finished, execution goroutine exits")
return
}
sqls := make([]string, 0, 3)
@@ -149,7 +166,12 @@ func (w *Worker) run(ctx context.Context, fileJobQueue chan *fileJob, workerWg *

failpoint.Inject("LoadDataSlowDown", nil)

if err := w.conn.executeSQL(ctctx, sqls); err != nil {
err := w.conn.executeSQL(ctctx, sqls)
failpoint.Inject("executeSQLError", func(_ failpoint.Value) {
w.tctx.L().Info("", zap.String("failpoint", "executeSQLError"))
err = errors.New("inject failpoint executeSQLError")
})
if err != nil {
// expect pause rather than exit
err = terror.WithScope(terror.Annotatef(err, "file %s", job.file), terror.ScopeDownstream)
runFatalChan <- unit.NewProcessError(pb.ErrorType_ExecSQL, err)
@@ -164,15 +186,19 @@ func (w *Worker) run(ctx context.Context, fileJobQueue chan *fileJob, workerWg *
for {
select {
case <-newCtx.Done():
w.tctx.L().Info("context canceled, main goroutine exits")
return
case job, ok := <-fileJobQueue:
if !ok {
w.tctx.L().Debug("main routine exit.")
w.tctx.L().Info("file queue was closed, main routine exit.")
return
}

w.wg.Add(1)
go doJob()
go func() {
defer w.wg.Done()
doJob()
}()

// restore a table
if err := w.restoreDataFile(ctx, filepath.Join(w.cfg.Dir, job.dataFile), job.offset, job.info); err != nil {
@@ -192,12 +218,17 @@ func (w *Worker) restoreDataFile(ctx context.Context, filePath string, offset in
return err
}

// dispatchSQL completed, send nil.
failpoint.Inject("dispatchError", func(_ failpoint.Value) {
w.tctx.L().Info("", zap.String("failpoint", "dispatchError"))
failpoint.Return(errors.New("inject failpoint dispatchError"))
})

// dispatchSQL completed, send nil to make sure all dmls are applied to target database
// we don't want to close and re-make chan frequently
// but if we need to re-call w.run, we need re-make jobQueue chan
w.jobQueue <- nil

w.wg.Wait()

w.tctx.L().Info("finish to restore dump sql file", zap.String("data file", filePath))
return nil
}
@@ -333,7 +364,6 @@ type Loader struct {
bwList *filter.Filter
columnMapping *cm.Mapping

pool []*Worker
closed sync2.AtomicBool

toDB *conn.BaseDB
@@ -355,7 +385,6 @@ func NewLoader(cfg *config.SubTaskConfig) *Loader {
db2Tables: make(map[string]Tables2DataFiles),
tableInfos: make(map[string]*tableInfo),
workerWg: new(sync.WaitGroup),
pool: make([]*Worker, 0, cfg.PoolSize),
tctx: tcontext.Background().WithLogger(log.With(zap.String("task", cfg.Name), zap.String("unit", "load"))),
}
loader.fileJobQueueClosed.Set(true) // not open yet
@@ -449,7 +478,13 @@ func (l *Loader) Process(ctx context.Context, pr chan pb.ProcessResult) {

err := l.Restore(newCtx)
close(l.runFatalChan) // Restore returned, all potential fatal sent to l.runFatalChan
wg.Wait() // wait for receive all fatal from l.runFatalChan

failpoint.Inject("dontWaitWorkerExit", func(_ failpoint.Value) {
l.tctx.L().Info("", zap.String("failpoint", "dontWaitWorkerExit"))
l.workerWg.Wait()
})

wg.Wait() // wait for receive all fatal from l.runFatalChan

if err != nil {
loaderExitWithErrorCounter.WithLabelValues(l.cfg.Name).Inc()
@@ -536,10 +571,21 @@ func (l *Loader) Restore(ctx context.Context) error {

go l.PrintStatus(ctx)

if err := l.restoreData(ctx); err != nil {
if errors.Cause(err) == context.Canceled {
return nil
}
begin := time.Now()
err = l.restoreData(ctx)

failpoint.Inject("dontWaitWorkerExit", func(_ failpoint.Value) {
l.tctx.L().Info("", zap.String("failpoint", "dontWaitWorkerExit"))
failpoint.Return(nil)
})

// make sure all workers exit
l.closeFileJobQueue() // all data file dispatched, close it
l.workerWg.Wait()

if err == nil {
l.tctx.L().Info("all data files have been finished", zap.Duration("cost time", time.Since(begin)))
} else if errors.Cause(err) != context.Canceled {
return err
}

@@ -576,13 +622,11 @@ func (l *Loader) Close() {
func (l *Loader) stopLoad() {
// before re-write workflow, simply close all job queue and job workers
// when resuming, re-create them
l.tctx.L().Info("stop importing data process")

l.closeFileJobQueue()
l.workerWg.Wait()

for _, worker := range l.pool {
worker.Close()
}
l.pool = l.pool[:0]
l.tctx.L().Debug("all workers have been closed")
}

@@ -709,9 +753,10 @@ func (l *Loader) initAndStartWorkerPool(ctx context.Context) error {
}

l.workerWg.Add(1) // for every worker goroutine, Add(1)
go worker.run(ctx, l.fileJobQueue, l.workerWg, l.runFatalChan)

l.pool = append(l.pool, worker)
go func() {
defer l.workerWg.Done()
worker.run(ctx, l.fileJobQueue, l.runFatalChan)
}()
}
return nil
}
@@ -1106,17 +1151,12 @@ func (l *Loader) restoreData(ctx context.Context) error {
select {
case <-ctx.Done():
l.tctx.L().Warn("stop dispatch data file job", log.ShortError(ctx.Err()))
l.closeFileJobQueue()
return ctx.Err()
case l.fileJobQueue <- j:
}
}
l.closeFileJobQueue() // all data file dispatched, close it

l.tctx.L().Info("all data files have been dispatched, waiting for them finished")
l.workerWg.Wait()

l.tctx.L().Info("all data files have been finished", zap.Duration("cost time", time.Since(begin)))
return nil
}

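The loader.go changes above establish a shutdown ordering: each Worker now owns its jobQueue and a WaitGroup for the goroutines draining it; restoreDataFile sends a nil sentinel and waits so every statement of a file is applied before the next file starts; Close closes the queue and waits again; and the Loader closes fileJobQueue, then waits on workerWg so every worker goroutine has exited before Restore returns. Below is a minimal standalone sketch of that pattern, not the DM implementation: the worker/jobQueue names and the fake string jobs are illustrative placeholders.

package main

import (
	"fmt"
	"sync"
)

// worker mirrors the shape of the loader Worker after this PR: it owns its
// jobQueue and a WaitGroup for the goroutine draining that queue, so its
// lifetime is not shared with other workers.
type worker struct {
	id       int
	jobQueue chan *string // a nil value is the "all jobs of this file dispatched" sentinel
	wg       sync.WaitGroup
}

func newWorker(id int) *worker {
	// buffer sized so the per-file dispatch below never blocks before doJob starts
	return &worker{id: id, jobQueue: make(chan *string, 4)}
}

// run drains fileJobQueue; for every file it spawns one drain goroutine,
// dispatches the per-file jobs, sends the nil sentinel and waits for them.
func (w *worker) run(fileJobQueue <-chan string) {
	defer w.close()
	for file := range fileJobQueue {
		w.wg.Add(1)
		go w.doJob()

		// dispatch the jobs of this file, then signal completion with nil
		for i := 0; i < 3; i++ {
			sql := fmt.Sprintf("worker %d: %s stmt %d", w.id, file, i)
			w.jobQueue <- &sql
		}
		w.jobQueue <- nil
		w.wg.Wait() // all statements of this file have been applied
	}
}

func (w *worker) doJob() {
	defer w.wg.Done()
	for job := range w.jobQueue {
		if job == nil {
			return // sentinel: jobs of the current file are finished
		}
		fmt.Println("execute:", *job)
	}
}

// close closes the job queue and waits for any remaining drain goroutine,
// so nothing can touch worker state after run returns.
func (w *worker) close() {
	close(w.jobQueue)
	w.wg.Wait()
}

func main() {
	fileJobQueue := make(chan string)
	var workerWg sync.WaitGroup

	for i := 0; i < 2; i++ {
		w := newWorker(i)
		workerWg.Add(1)
		go func() {
			defer workerWg.Done() // Done in the spawning goroutine, as in initAndStartWorkerPool
			w.run(fileJobQueue)
		}()
	}

	for _, f := range []string{"a.sql", "b.sql"} {
		fileJobQueue <- f
	}
	close(fileJobQueue) // all data files dispatched
	workerWg.Wait()     // make sure every worker has exited
}

Because each worker waits on its own WaitGroup before run returns, no doJob goroutine can outlive the worker that spawned it; the new failpoints (workerCantClose, dontWaitWorkerExit) appear to exist to simulate skipping exactly these waits in the integration test.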
9 changes: 9 additions & 0 deletions tests/import_goroutine_leak/conf/dm-master.toml
@@ -0,0 +1,9 @@
# Master Configuration.

[[deploy]]
source-id = "mysql-replica-01"
dm-worker = "127.0.0.1:8262"

[[deploy]]
source-id = "mysql-replica-02"
dm-worker = "127.0.0.1:8263"
49 changes: 49 additions & 0 deletions tests/import_goroutine_leak/conf/dm-task.yaml
@@ -0,0 +1,49 @@
---
name: test
task-mode: full
is-sharding: false
meta-schema: "dm_meta"
remove-meta: false
enable-heartbeat: true
timezone: "Asia/Shanghai"

target-database:
host: "127.0.0.1"
port: 4000
user: "root"
password: ""

mysql-instances:
- source-id: "mysql-replica-01"
black-white-list: "instance"
mydumper-config-name: "global"
loader-config-name: "global"
syncer-config-name: "global"

- source-id: "mysql-replica-02"
black-white-list: "instance"
mydumper-config-name: "global"
loader-config-name: "global"
syncer-config-name: "global"

black-white-list:
instance:
do-dbs: ["import_goroutine_leak"]

mydumpers:
global:
mydumper-path: "./bin/mydumper"
threads: 4
chunk-filesize: 0
skip-tz-utc: true
extra-args: "--statement-size=100"

loaders:
global:
pool-size: 16
dir: "./dumped_data"

syncers:
global:
worker-count: 16
batch: 100
13 changes: 13 additions & 0 deletions tests/import_goroutine_leak/conf/dm-worker1.toml
@@ -0,0 +1,13 @@
# Worker Configuration.

source-id = "mysql-replica-01"
flavor = ""
enable-gtid = false
relay-binlog-name = ""
relay-binlog-gtid = ""

[from]
host = "127.0.0.1"
user = "root"
password = ""
port = 3306
13 changes: 13 additions & 0 deletions tests/import_goroutine_leak/conf/dm-worker2.toml
@@ -0,0 +1,13 @@
# Worker Configuration.

source-id = "mysql-replica-02"
flavor = ""
enable-gtid = false
relay-binlog-name = ""
relay-binlog-gtid = ""

[from]
host = "127.0.0.1"
user = "root"
password = ""
port = 3307