Skip to content
This repository has been archived by the owner on Feb 24, 2024. It is now read-only.

Commit

Permalink
fixed racing condition of the simple worker
Browse files Browse the repository at this point in the history
  • Loading branch information
sio4 committed May 14, 2022
1 parent 1789868 commit deaf8bb
Show file tree
Hide file tree
Showing 2 changed files with 38 additions and 2 deletions.
28 changes: 26 additions & 2 deletions worker/simple.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ func NewSimpleWithContext(ctx context.Context) *Simple {
cancel: cancel,
handlers: map[string]Handler{},
moot: &sync.Mutex{},
started: false,
}
}

Expand All @@ -48,6 +49,7 @@ type Simple struct {
handlers map[string]Handler
moot *sync.Mutex
wg sync.WaitGroup
started bool
}

// Register Handler with the worker
Expand All @@ -70,7 +72,11 @@ func (w *Simple) Start(ctx context.Context) error {
// TODO(sio4): #road-to-v1 - define the purpose of Start clearly
w.Logger.Info("starting Simple background worker")

w.moot.Lock()
defer w.moot.Unlock()

w.ctx, w.cancel = context.WithCancel(ctx)
w.started = true
return nil
}

Expand All @@ -91,6 +97,13 @@ func (w *Simple) Stop() error {

// Perform a job as soon as possibly using a goroutine.
func (w *Simple) Perform(job Job) error {
w.moot.Lock()
defer w.moot.Unlock()

if !w.started {
return fmt.Errorf("worker is not yet started")
}

// Perform should not allow a job submission if the worker is not running
if err := w.ctx.Err(); err != nil {
return fmt.Errorf("worker is not ready to perform a job: %v", err)
Expand All @@ -104,8 +117,6 @@ func (w *Simple) Perform(job Job) error {
return err
}

w.moot.Lock()
defer w.moot.Unlock()
if h, ok := w.handlers[job.Handler]; ok {
// TODO(sio4): #road-to-v1 - consider timeout and/or cancellation
w.wg.Add(1)
Expand Down Expand Up @@ -145,6 +156,19 @@ func (w *Simple) PerformIn(job Job, d time.Duration) error {
go func() {
defer w.wg.Done()

for {
w.moot.Lock()
if w.started {
w.moot.Unlock()
break
}
w.moot.Unlock()

waiting := 100 * time.Millisecond
time.Sleep(waiting)
d = d - waiting
}

select {
case <-time.After(d):
w.Perform(job)
Expand Down
12 changes: 12 additions & 0 deletions worker/worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,3 +25,15 @@ type Worker interface {
// Register a Handler
Register(string, Handler) error
}

/* TODO(sio4): #road-to-v1 - redefine Worker interface clearer
1. The Start() functions of current implementations including Simple,
Gocraft Work Adapter do not block and immediately return the error.
However, App.Serve() calls them within a go routine.
2. The Perform() family of functions can be called before the worker
was started once the worker configured. Could be fine but there should
be some guidiance for its usage.
3. The Perform() function could be interpreted as "Do it" by its name but
their actual job is "Enqueue it" even though Simple worker has no clear
boundary between them. It could make confusion.
*/

0 comments on commit deaf8bb

Please sign in to comment.