Skip to content
This repository has been archived by the owner on Nov 24, 2023. It is now read-only.

Commit

Permalink
worker: fix potential panic when restoring subtask from meta (#305)
Browse files: browse the repository at this point in the history
  • Loading branch information
csuzhangxc authored Oct 16, 2019
1 parent 2ffcec0 commit 4c28fed
Show file tree
Hide file tree
Showing 2 changed files with 36 additions and 21 deletions.
51 changes: 33 additions & 18 deletions dm/worker/worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -71,14 +71,29 @@ type Worker struct {
}

// NewWorker creates a new Worker
func NewWorker(cfg *Config) (*Worker, error) {
w := &Worker{
func NewWorker(cfg *Config) (w *Worker, err error) {
w = &Worker{
cfg: cfg,
relayHolder: NewRelayHolder(cfg),
tracer: tracing.InitTracerHub(cfg.Tracer),
subTaskHolder: newSubTaskHolder(),
l: log.With(zap.String("component", "worker controller")),
}
w.ctx, w.cancel = context.WithCancel(context.Background())

defer func(w2 *Worker) {
if err != nil { // when err != nil, `w` will become nil in this func, so we pass `w` in defer.
// release resources, NOTE: we need to refactor New/Init/Start/Close for components later.
w2.cancel()
w2.subTaskHolder.closeAllSubTasks()
if w2.meta != nil {
w2.meta.Close()
}
if w2.db != nil {
w2.db.Close()
}
}
}(w)

// initial relay holder
purger, err := w.relayHolder.Init([]purger.PurgeInterceptor{
Expand Down Expand Up @@ -107,16 +122,18 @@ func NewWorker(cfg *Config) (*Worker, error) {
}

// open kv db
w.db, err = openDB(dbDir, defaultKVConfig)
metaDB, err := openDB(dbDir, defaultKVConfig)
if err != nil {
return nil, err
}
w.db = metaDB

// initial metadata
w.meta, err = NewMetadata(dbDir, w.db)
meta, err := NewMetadata(dbDir, w.db)
if err != nil {
return nil, err
}
w.meta = meta

InitConditionHub(w)

Expand All @@ -125,6 +142,18 @@ func NewWorker(cfg *Config) (*Worker, error) {
return nil, err
}

w.l.Info("initialized")

return w, nil
}

// Start starts working
func (w *Worker) Start() {
if w.closed.Get() == closedTrue {
w.l.Warn("already closed")
return
}

// start relay
w.relayHolder.Start()

Expand All @@ -141,20 +170,6 @@ func NewWorker(cfg *Config) (*Worker, error) {
w.tracer.Start()
}

w.ctx, w.cancel = context.WithCancel(context.Background())

w.l.Info("initialzed")

return w, nil
}

// Start starts working
func (w *Worker) Start() {
if w.closed.Get() == closedTrue {
w.l.Warn("already closed")
return
}

w.wg.Add(2)
defer w.wg.Done()

Expand Down
6 changes: 3 additions & 3 deletions dm/worker/worker_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -108,7 +108,7 @@ func (t *testServer) testWorkerHandleTask(c *C) {
w.handleTask()
}()

c.Assert(utils.WaitSomething(5, 10*time.Millisecond, func() bool {
c.Assert(utils.WaitSomething(10, 100*time.Millisecond, func() bool {
w.meta.Lock()
defer w.meta.Unlock()
return len(w.meta.logs) == 0
Expand Down Expand Up @@ -154,7 +154,7 @@ func (t *testServer) TestTaskAutoResume(c *C) {
defer s.Close()
c.Assert(s.Start(), IsNil)
}()
c.Assert(utils.WaitSomething(30, 10*time.Millisecond, func() bool {
c.Assert(utils.WaitSomething(10, 100*time.Millisecond, func() bool {
return !s.closed.Get()
}), IsTrue)

Expand All @@ -165,7 +165,7 @@ func (t *testServer) TestTaskAutoResume(c *C) {
c.Assert(err, IsNil)

// check task in paused state
c.Assert(utils.WaitSomething(10, 10*time.Millisecond, func() bool {
c.Assert(utils.WaitSomething(10, 100*time.Millisecond, func() bool {
for _, st := range s.worker.QueryStatus(taskName) {
if st.Name == taskName && st.Stage == pb.Stage_Paused {
return true
Expand Down

0 comments on commit 4c28fed

Please sign in to comment.