From 14152988eee95d989d43e7130f22a0ec72fe9400 Mon Sep 17 00:00:00 2001 From: Robert Kopaczewski Date: Thu, 27 Aug 2020 18:59:25 +0200 Subject: [PATCH] feat: jobs system (#15) * feat: jobs system * lint --- jobs/interface.go | 24 ++++++ jobs/jobs.go | 88 ++++++++++++++++++++++ jobs/runner.go | 185 ++++++++++++++++++++++++++++++++++++++++++++++ 3 files changed, 297 insertions(+) create mode 100644 jobs/interface.go create mode 100644 jobs/jobs.go create mode 100644 jobs/runner.go diff --git a/jobs/interface.go b/jobs/interface.go new file mode 100644 index 0000000..d44e588 --- /dev/null +++ b/jobs/interface.go @@ -0,0 +1,24 @@ +package jobs + +import ( + "context" + "fmt" + "time" +) + +type Runnable interface { + fmt.Stringer + Run(ctx context.Context) (isDone bool, err error) + RunTimeout() time.Duration +} + +type LockableRunnable interface { + Runnable + LockKey() string +} + +type PeriodicRunnable interface { + LockableRunnable + RunPeriod() time.Duration + RunResolution() time.Duration +} diff --git a/jobs/jobs.go b/jobs/jobs.go new file mode 100644 index 0000000..ba30525 --- /dev/null +++ b/jobs/jobs.go @@ -0,0 +1,88 @@ +package jobs + +import ( + "context" + "fmt" + "time" +) + +type OneOffJob struct { + Name string + Func func(ctx context.Context) error + Timeout time.Duration // Passed as context.WithTimeout to Func. +} + +func (j *OneOffJob) Run(ctx context.Context) (bool, error) { + return true, j.Func(ctx) +} + +func (j *OneOffJob) RunTimeout() time.Duration { + return j.Timeout +} + +func (j *OneOffJob) String() string { + return fmt.Sprintf("OneOffJob", j.Name) +} + +var _ Runnable = (*OneOffJob)(nil) + +type PeriodicJob struct { + Func func(ctx context.Context) (isDone bool, err error) + Timeout time.Duration // Passed as context.WithTimeout to Func. + Period time.Duration + Resolution time.Duration // Time between attempts to grab a lock, defaults to max(min(1/4 of Perion, 1 Hour), 5 Min) + Name string +} + +var _ PeriodicRunnable = (*PeriodicJob)(nil) + +func (j *PeriodicJob) Run(ctx context.Context) (bool, error) { + return j.Func(ctx) +} + +func (j *PeriodicJob) RunTimeout() time.Duration { + return j.Timeout +} + +const ( + minDefaultResolution = 5 * time.Minute + maxDefaultResolution = 1 * time.Hour +) + +func DefaultResolution(period time.Duration) time.Duration { + res := period / 4 + + if res < minDefaultResolution { + return minDefaultResolution + } + + if res > maxDefaultResolution { + return maxDefaultResolution + } + + return res +} + +func (j *PeriodicJob) RunResolution() time.Duration { + if j.Resolution == 0 { + return DefaultResolution(j.Period) + } + + return j.Resolution +} + +func (j *PeriodicJob) RunPeriod() time.Duration { + return j.Period +} + +func (j *PeriodicJob) LockKey() string { + return j.Name +} + +func (j *PeriodicJob) String() string { + return fmt.Sprintf("PeriodicJob", j.Name) +} + +func partition(t LockableRunnable) string { + return "0" +} diff --git a/jobs/runner.go b/jobs/runner.go new file mode 100644 index 0000000..249f349 --- /dev/null +++ b/jobs/runner.go @@ -0,0 +1,185 @@ +package jobs + +import ( + "context" + "fmt" + "sync" + "time" + + "github.com/go-redis/redis/v7" + "go.uber.org/zap" +) + +type Runner struct { + log *zap.Logger + wg sync.WaitGroup + rc *redis.Client + stop chan struct{} + cfg Config + + errorHandlerMu sync.RWMutex + errorHandler func(t Runnable, err error) +} + +type Config struct { + ServiceKey string + Partition func(t LockableRunnable) string +} + +var DefaultConfig = Config{ + ServiceKey: "jobs", + Partition: partition, +} + +type Option func(*Config) + +func WithServiceKey(val string) Option { + return func(config *Config) { + config.ServiceKey = val + } +} +func WithPartition(f func(LockableRunnable) string) Option { + return func(config *Config) { + config.Partition = f + } +} + +func New(log *zap.Logger, rc *redis.Client, opts ...Option) *Runner { + cfg := DefaultConfig + + for _, opt := range opts { + opt(&cfg) + } + + return &Runner{ + log: log, + rc: rc, + stop: make(chan struct{}), + cfg: cfg, + } +} + +func (p *Runner) createLockKey(t LockableRunnable) string { + return fmt.Sprintf("%s:%s:lock:%s", p.cfg.Partition(t), p.cfg.ServiceKey, t.LockKey()) +} + +func (p *Runner) runPeriodic(job PeriodicRunnable) { + initialRun := true + + if initialRun { + p.wg.Add(1) + } + + lockKey := p.createLockKey(job) + period := job.RunPeriod() + + go func() { + ticker := time.NewTicker(job.RunResolution()) + + for { + if !initialRun { + p.wg.Add(1) + } + + initialRun = false + + ok, err := p.rc.SetNX(lockKey, 1, period).Result() + if !ok || err != nil { + if err != nil { + p.log.With(zap.Error(err)).Error("Could not obtain redis lock for periodic job") + } + + p.wg.Done() + + select { + case <-ticker.C: + continue + case <-p.stop: + return + } + } + + isDone, _ := p.processJob(job) + + if !isDone { + continue + } + + select { + case <-ticker.C: + case <-p.stop: + return + } + } + }() +} + +func (p *Runner) processJob(job Runnable) (bool, error) { + // Run job with optional timeout. + timeout := job.RunTimeout() + + var ( + ctx context.Context + cancel context.CancelFunc + ) + + if timeout > 0 { + ctx, cancel = context.WithTimeout(context.Background(), timeout) + } + + p.log.With(zap.Stringer("job", job)).Info("Running job") + + isDone, err := job.Run(ctx) + if err != nil { + p.handleJobError(job, err) + } + + cancel() + + return isDone, err +} + +func (p *Runner) Run(job Runnable) { + if periodic, ok := job.(PeriodicRunnable); ok { + p.runPeriodic(periodic) + + return + } + + p.wg.Add(1) + + _, _ = p.processJob(job) + p.wg.Done() +} + +func (p *Runner) Go(f func()) { + p.wg.Add(1) + + go f() + + p.wg.Done() +} + +func (p *Runner) handleJobError(job Runnable, err error) { + p.errorHandlerMu.RLock() + defer p.errorHandlerMu.RUnlock() + + if p.errorHandler != nil { + p.errorHandler(job, err) + + return + } + + p.log.Error("Job failed", zap.Error(err)) +} + +func (p *Runner) ErrorHandler(errorHandler func(t Runnable, err error)) { + p.errorHandlerMu.Lock() + p.errorHandler = errorHandler + p.errorHandlerMu.Unlock() +} + +func (p *Runner) GracefulStop() { + close(p.stop) + p.wg.Wait() +}