Skip to content
This repository has been archived by the owner on Feb 24, 2024. It is now read-only.

Commit

Permalink
Extend worker.Worker to support periodic job registration
Browse files Browse the repository at this point in the history
- Provide a concrete periodic `worker.Simple` implementation
  • Loading branch information
acaloiaro committed Nov 13, 2022
1 parent b29eff1 commit 5961a47
Show file tree
Hide file tree
Showing 4 changed files with 35 additions and 0 deletions.
1 change: 1 addition & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ require (
github.com/gorilla/sessions v1.2.1
github.com/monoculum/formam v3.5.5+incompatible
github.com/psanford/memfs v0.0.0-20210214183328-a001468d78ef
github.com/robfig/cron v1.2.0
github.com/sirupsen/logrus v1.9.0
github.com/spf13/cobra v1.5.0
github.com/stretchr/testify v1.8.0
Expand Down
2 changes: 2 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -92,6 +92,8 @@ github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZb
github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
github.com/psanford/memfs v0.0.0-20210214183328-a001468d78ef h1:NKxTG6GVGbfMXc2mIk+KphcH6hagbVXhcFkbTgYleTI=
github.com/psanford/memfs v0.0.0-20210214183328-a001468d78ef/go.mod h1:tcaRap0jS3eifrEEllL6ZMd9dg8IlDpi2S1oARrQ+NI=
github.com/robfig/cron v1.2.0 h1:ZjScXvvxeQ63Dbyxy76Fj3AT3Ut0aKsyd2/tl3DTMuQ=
github.com/robfig/cron v1.2.0/go.mod h1:JGuDeoQd7Z6yL4zQhZ3OPEVHB7fL6Ka6skscFHfmt2k=
github.com/rogpeppe/go-internal v1.9.0 h1:73kH8U+JUqXU8lRuOHeVHaa/SZPifC7BkcraZVejAe8=
github.com/rogpeppe/go-internal v1.9.0/go.mod h1:WtVeX8xhTBvf0smdhujwtBcq4Qrzq/fJaraNFVN+nFs=
github.com/russross/blackfriday/v2 v2.1.0/go.mod h1:+Rmxgy9KzJVeS9/2gXHxylqXiyQDYRxCVz55jmeOWTM=
Expand Down
30 changes: 30 additions & 0 deletions worker/simple.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (
"sync"
"time"

"github.com/robfig/cron"
"github.com/sirupsen/logrus"
)

Expand Down Expand Up @@ -34,6 +35,7 @@ func NewSimpleWithContext(ctx context.Context) *Simple {
Logger: l,
ctx: ctx,
cancel: cancel,
cron: cron.New(),
handlers: map[string]Handler{},
moot: &sync.Mutex{},
started: false,
Expand All @@ -46,6 +48,7 @@ type Simple struct {
Logger SimpleLogger
ctx context.Context
cancel context.CancelFunc
cron *cron.Cron
handlers map[string]Handler
moot *sync.Mutex
wg sync.WaitGroup
Expand Down Expand Up @@ -76,6 +79,8 @@ func (w *Simple) Start(ctx context.Context) error {
defer w.moot.Unlock()

w.ctx, w.cancel = context.WithCancel(ctx)
w.cron.Start()

w.started = true
return nil
}
Expand All @@ -89,6 +94,7 @@ func (w *Simple) Stop() error {
w.Logger.Info("stopping Simple background worker")

w.cancel()
w.cron.Stop()

w.wg.Wait()
w.Logger.Info("all background jobs stopped completely")
Expand Down Expand Up @@ -196,6 +202,30 @@ func (w *Simple) PerformIn(job Job, d time.Duration) error {
return nil
}

// RegisterPeriodic registers a job to be periodically executed accroding to the given cron specification
func (w *Simple) RegisterPeriodic(cronSpec, name string, h Handler) error {
if name == "" || h == nil {
return fmt.Errorf("name or handler cannot be empty/nil")
}

w.moot.Lock()
defer w.moot.Unlock()
if _, ok := w.handlers[name]; ok {
return fmt.Errorf("handler already mapped for name %s", name)
}
w.handlers[name] = h

w.cron.AddFunc(cronSpec, func() {
w.Perform(Job{
Queue: "system_cron",
Handler: name,
Args: Args{},
})
})

return nil
}

// SimpleLogger is used by the Simple worker to write logs
type SimpleLogger interface {
Debugf(string, ...interface{})
Expand Down
2 changes: 2 additions & 0 deletions worker/worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,8 @@ type Worker interface {
PerformIn(Job, time.Duration) error
// Register a Handler
Register(string, Handler) error
// RegisterPeriodic performs a job periodically according to the provided cron spec
RegisterPeriodic(cronSpec, jobName string, h Handler) error
}

/* TODO(sio4): #road-to-v1 - redefine Worker interface clearer
Expand Down

0 comments on commit 5961a47

Please sign in to comment.