fix concurrent access to jobs map (#589)
* fix concurrent access to jobs map

* replace all internal uses of RemoveByReference
JohnRoesler authored Oct 9, 2023
1 parent 207a713 commit e287edf
Showing 2 changed files with 90 additions and 75 deletions.
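The pattern applied throughout the diff below: drop the `jobsMap()` accessor and have every method acquire `s.jobsMutex` itself before touching `s.jobs`. A minimal standalone sketch of that pattern, using hypothetical names (`Store`, `items`) rather than gocron's actual types:

package main

import (
    "fmt"
    "sync"
)

// Store is a stand-in for the Scheduler: a map guarded by a sync.RWMutex.
type Store struct {
    mu    sync.RWMutex
    items map[int]string
}

// Add takes the write lock for mutation.
func (s *Store) Add(id int, v string) {
    s.mu.Lock()
    defer s.mu.Unlock()
    s.items[id] = v
}

// Len takes the read lock and operates on the map while still holding it,
// mirroring what each Scheduler method now does for itself.
func (s *Store) Len() int {
    s.mu.RLock()
    defer s.mu.RUnlock()
    return len(s.items)
}

func main() {
    s := &Store{items: make(map[int]string)}
    var wg sync.WaitGroup
    for i := 0; i < 100; i++ {
        wg.Add(1)
        go func(i int) {
            defer wg.Done()
            s.Add(i, "job") // concurrent writers...
            _ = s.Len()     // ...and readers are safe under the mutex
        }(i)
    }
    wg.Wait()
    fmt.Println(s.Len()) // 100
}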
105 changes: 51 additions & 54 deletions scheduler.go
@@ -104,11 +104,13 @@ func (s *Scheduler) StartAsync() {
 func (s *Scheduler) start() {
     s.executor.start()
     s.setRunning(true)
-    s.runJobs(s.jobsMap())
+    s.runJobs()
 }
 
-func (s *Scheduler) runJobs(jobs map[uuid.UUID]*Job) {
-    for _, job := range jobs {
+func (s *Scheduler) runJobs() {
+    s.jobsMutex.RLock()
+    defer s.jobsMutex.RUnlock()
+    for _, job := range s.jobs {
         ctx, cancel := context.WithCancel(context.Background())
         job.mu.Lock()
         job.ctx = ctx
@@ -151,12 +153,6 @@ func (s *Scheduler) JobsMap() map[uuid.UUID]*Job {
     return jobs
 }
 
-func (s *Scheduler) jobsMap() map[uuid.UUID]*Job {
-    s.jobsMutex.RLock()
-    defer s.jobsMutex.RUnlock()
-    return s.jobs
-}
-
 // Name sets the name of the current job.
 //
 // If the scheduler is running using WithDistributedLocker(), the job name is used
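The hunk above is the heart of the fix. `jobsMap()` looked safe, but its `defer s.jobsMutex.RUnlock()` fired as soon as the accessor returned, handing callers the live map with no lock held; every `for _, job := range s.jobsMap()` then raced with concurrent writers. Continuing the hypothetical `Store` sketch from above:

// Anti-pattern (what the removed jobsMap() did): the read lock is
// released the moment this accessor returns, so the caller iterates
// the live map unprotected.
func (s *Store) itemsMap() map[int]string {
    s.mu.RLock()
    defer s.mu.RUnlock() // runs before the caller ever touches the map
    return s.items
}

// A caller like `for id := range s.itemsMap() { ... }` races with any
// concurrent Add: the runtime can abort with "concurrent map iteration
// and map write", and `go run -race` flags it as a data race.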
@@ -167,12 +163,6 @@ func (s *Scheduler) Name(name string) *Scheduler {
     return s
 }
 
-func (s *Scheduler) setJobs(jobs map[uuid.UUID]*Job) {
-    s.jobsMutex.Lock()
-    defer s.jobsMutex.Unlock()
-    s.jobs = jobs
-}
-
 // Len returns the number of Jobs in the Scheduler
 func (s *Scheduler) Len() int {
     s.jobsMutex.RLock()
@@ -231,7 +221,7 @@ func (s *Scheduler) scheduleNextRun(job *Job) (bool, nextRun) {
     }
 
     if !job.shouldRun() {
-        s.RemoveByReference(job)
+        _ = s.RemoveByID(job)
         return false, nextRun{}
     }
 
@@ -510,21 +500,23 @@ func (s *Scheduler) roundToMidnightAndAddDSTAware(t time.Time, d time.Duration)
 
 // NextRun datetime when the next Job should run.
 func (s *Scheduler) NextRun() (*Job, time.Time) {
-    if len(s.jobsMap()) <= 0 {
+    s.jobsMutex.RLock()
+    defer s.jobsMutex.RUnlock()
+    if len(s.jobs) <= 0 {
         return nil, time.Time{}
     }
 
     var jobID uuid.UUID
     var nearestRun time.Time
-    for _, job := range s.jobsMap() {
+    for _, job := range s.jobs {
         nr := job.NextRun()
         if (nr.Before(nearestRun) || nearestRun.IsZero()) && s.now().Before(nr) {
             nearestRun = nr
             jobID = job.id
         }
     }
 
-    return s.jobsMap()[jobID], nearestRun
+    return s.jobs[jobID], nearestRun
 }
 
 // EveryRandom schedules a new period Job that runs at random intervals
@@ -650,7 +642,9 @@ func (s *Scheduler) RunAll() {
 
 // RunAllWithDelay runs all Jobs with the provided delay in between each Job
 func (s *Scheduler) RunAllWithDelay(d time.Duration) {
-    for _, job := range s.jobsMap() {
+    s.jobsMutex.RLock()
+    defer s.jobsMutex.RUnlock()
+    for _, job := range s.jobs {
         s.run(job)
         s.time.Sleep(d)
     }
@@ -698,16 +692,13 @@ func (s *Scheduler) Remove(job interface{}) {
 
 // RemoveByReference removes specific Job by reference
 func (s *Scheduler) RemoveByReference(job *Job) {
-    s.removeJobsUniqueTags(job)
-    s.removeByCondition(func(someJob *Job) bool {
-        job.mu.RLock()
-        defer job.mu.RUnlock()
-        return someJob == job
-    })
+    _ = s.RemoveByID(job)
 }
 
 func (s *Scheduler) findJobByTaskName(name string) *Job {
-    for _, job := range s.jobsMap() {
+    s.jobsMutex.RLock()
+    defer s.jobsMutex.RUnlock()
+    for _, job := range s.jobs {
         if job.funcName == name {
             return job
         }
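`RemoveByReference` survives as a public wrapper for API compatibility, but now just delegates to `RemoveByID`, discarding the error to keep its void signature. A usage sketch, assuming the go-co-op/gocron v1 API at this commit:

package main

import (
    "fmt"
    "time"

    "github.com/go-co-op/gocron"
)

func main() {
    s := gocron.NewScheduler(time.UTC)
    job, err := s.Every(1).Second().Do(func() { fmt.Println("tick") })
    if err != nil {
        panic(err)
    }

    // Both calls now funnel into the same write-locked path.
    s.RemoveByReference(job)

    // The job is already gone, so a second removal reports the error
    // that RemoveByReference silently drops.
    if err := s.RemoveByID(job); err != nil {
        fmt.Println(err) // ErrJobNotFound
    }
}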
@@ -727,15 +718,14 @@ func (s *Scheduler) removeJobsUniqueTags(job *Job) {
 }
 
 func (s *Scheduler) removeByCondition(shouldRemove func(*Job) bool) {
-    retainedJobs := make(map[uuid.UUID]*Job, 0)
-    for _, job := range s.jobsMap() {
-        if !shouldRemove(job) {
-            retainedJobs[job.id] = job
-        } else {
+    s.jobsMutex.Lock()
+    defer s.jobsMutex.Unlock()
+    for _, job := range s.jobs {
+        if shouldRemove(job) {
             s.stopJob(job)
+            delete(s.jobs, job.id)
         }
     }
-    s.setJobs(retainedJobs)
 }
 
 func (s *Scheduler) stopJob(job *Job) {
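The rewritten `removeByCondition` no longer builds a `retainedJobs` copy and swaps it in; it deletes matching entries while ranging over the map. That is well defined in Go: the spec guarantees that an entry deleted during iteration, if not yet reached, simply is not produced. A small self-contained demonstration:

package main

import "fmt"

func main() {
    jobs := map[int]string{1: "a", 2: "b", 3: "c", 4: "d"}

    // Deleting from a map inside a range over that same map is safe;
    // removed entries just stop appearing in the iteration.
    for id := range jobs {
        if id%2 == 0 {
            delete(jobs, id)
        }
    }

    fmt.Println(jobs) // map[1:a 3:c]
}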
@@ -760,7 +750,7 @@ func (s *Scheduler) RemoveByTags(tags ...string) error {
     }
 
     for _, job := range jobs {
-        s.RemoveByReference(job)
+        _ = s.RemoveByID(job)
     }
     return nil
 }
@@ -780,17 +770,20 @@ func (s *Scheduler) RemoveByTagsAny(tags ...string) error {
     }
 
     for job := range mJob {
-        s.RemoveByReference(job)
+        _ = s.RemoveByID(job)
     }
 
     return errs
 }
 
 // RemoveByID removes the job from the scheduler looking up by id
 func (s *Scheduler) RemoveByID(job *Job) error {
-    if _, ok := s.jobsMap()[job.id]; ok {
+    s.jobsMutex.Lock()
+    defer s.jobsMutex.Unlock()
+    if _, ok := s.jobs[job.id]; ok {
         s.removeJobsUniqueTags(job)
         s.stopJob(job)
-        delete(s.jobsMap(), job.id)
+        delete(s.jobs, job.id)
         return nil
     }
     return ErrJobNotFound
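`RemoveByID` previously performed its existence check, stop, and delete through separate `jobsMap()` calls, so another goroutine could remove the job between the lookup and the `delete`. Now one write lock spans the whole check-then-act sequence. The shape of that change, again on the hypothetical `Store`:

// Remove mirrors the new RemoveByID: the write lock covers both the
// existence check and the delete, so two concurrent removers cannot
// interleave between the lookup and the mutation.
func (s *Store) Remove(id int) bool {
    s.mu.Lock()
    defer s.mu.Unlock()
    if _, ok := s.items[id]; !ok {
        return false // analogous to returning ErrJobNotFound
    }
    delete(s.items, id)
    return true
}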
@@ -800,8 +793,10 @@ func (s *Scheduler) RemoveByID(job *Job) error {
 func (s *Scheduler) FindJobsByTag(tags ...string) ([]*Job, error) {
     var jobs []*Job
 
+    s.jobsMutex.RLock()
+    defer s.jobsMutex.RUnlock()
 Jobs:
-    for _, job := range s.jobsMap() {
+    for _, job := range s.jobs {
         if job.hasTags(tags...) {
             jobs = append(jobs, job)
             continue Jobs
@@ -872,33 +867,31 @@ func (s *Scheduler) SingletonModeAll() {
 
 // TaskPresent checks if specific job's function was added to the scheduler.
 func (s *Scheduler) TaskPresent(j interface{}) bool {
-    for _, job := range s.jobsMap() {
+    s.jobsMutex.RLock()
+    defer s.jobsMutex.RUnlock()
+    for _, job := range s.jobs {
         if job.funcName == getFunctionName(j) {
             return true
         }
     }
     return false
 }
 
-// To avoid the recursive read lock on s.jobsMap() and this function,
-// creating this new function and distributing the lock between jobPresent, _jobPresent
-func (s *Scheduler) _jobPresent(j *Job, jobs map[uuid.UUID]*Job) bool {
+func (s *Scheduler) jobPresent(j *Job) bool {
     s.jobsMutex.RLock()
     defer s.jobsMutex.RUnlock()
-    if _, ok := jobs[j.id]; ok {
+    if _, ok := s.jobs[j.id]; ok {
         return true
     }
     return false
 }
 
-func (s *Scheduler) jobPresent(j *Job) bool {
-    return s._jobPresent(j, s.jobsMap())
-}
-
 // Clear clears all Jobs from this scheduler
 func (s *Scheduler) Clear() {
     s.stopJobs()
-    s.setJobs(make(map[uuid.UUID]*Job, 0))
+    s.jobsMutex.Lock()
+    defer s.jobsMutex.Unlock()
+    s.jobs = make(map[uuid.UUID]*Job)
     // If unique tags was enabled, delete all the tags loaded in the tags sync.Map
     if s.tagsUnique {
         s.tags.Range(func(key interface{}, value interface{}) bool {
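The deleted `_jobPresent` comment documents the old workaround: `jobPresent` could not call a helper that itself took the read lock, because `sync.RWMutex` is not reentrant. With `jobsMap()` gone, the split is unnecessary. The underlying hazard, in a deliberately deadlocking sketch (Go's runtime aborts it with "all goroutines are asleep - deadlock!"):

package main

import (
    "sync"
    "time"
)

func main() {
    var mu sync.RWMutex

    mu.RLock() // first read lock, held by main

    go func() {
        mu.Lock() // writer queues, waiting on the reader above
        mu.Unlock()
    }()

    time.Sleep(100 * time.Millisecond) // let the writer get in line

    // A queued writer blocks new readers, so this recursive RLock in the
    // same goroutine waits on the writer, which waits on us: deadlock.
    mu.RLock()

    mu.RUnlock()
    mu.RUnlock()
}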
@@ -954,7 +947,7 @@ func (s *Scheduler) doCommon(jobFun interface{}, params ...interface{}) (*Job, e
     if job.error != nil {
         // delete the job from the scheduler as this job
         // cannot be executed
-        s.RemoveByReference(job)
+        _ = s.RemoveByID(job)
         return nil, job.error
     }
 
@@ -965,7 +958,7 @@ func (s *Scheduler) doCommon(jobFun interface{}, params ...interface{}) (*Job, e
 
     if val.Kind() != reflect.Func {
         // delete the job for the same reason as above
-        s.RemoveByReference(job)
+        _ = s.RemoveByID(job)
         return nil, ErrNotAFunction
     }
 
@@ -992,13 +985,13 @@ func (s *Scheduler) doCommon(jobFun interface{}, params ...interface{}) (*Job, e
     }
 
     if len(params) != expectedParamLength {
-        s.RemoveByReference(job)
+        _ = s.RemoveByID(job)
         job.error = wrapOrError(job.error, ErrWrongParams)
         return nil, job.error
     }
 
     if job.runWithDetails && val.Type().In(len(params)).Kind() != reflect.ValueOf(*job).Kind() {
-        s.RemoveByReference(job)
+        _ = s.RemoveByID(job)
         job.error = wrapOrError(job.error, ErrDoWithJobDetails)
         return nil, job.error
     }
@@ -1078,7 +1071,9 @@ func (s *Scheduler) Tag(t ...string) *Scheduler {
 // GetAllTags returns all tags.
 func (s *Scheduler) GetAllTags() []string {
     var tags []string
-    for _, job := range s.jobsMap() {
+    s.jobsMutex.RLock()
+    defer s.jobsMutex.RUnlock()
+    for _, job := range s.jobs {
         tags = append(tags, job.Tags()...)
     }
     return tags
@@ -1514,7 +1509,9 @@ func (s *Scheduler) WithDistributedElector(e Elector) {
 // If a new job is added, an additional call to this method, or the job specific
 // version must be executed in order for the new job to trigger event listeners.
 func (s *Scheduler) RegisterEventListeners(eventListeners ...EventListener) {
-    for _, job := range s.jobsMap() {
+    s.jobsMutex.RLock()
+    defer s.jobsMutex.RUnlock()
+    for _, job := range s.jobs {
         job.RegisterEventListeners(eventListeners...)
     }
 }
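A quick way to exercise a fix like this is to hammer the scheduler from many goroutines under `go test -race`. A sketch of such a test (not the repo's actual test file), assuming the v1 API:

package gocron_test

import (
    "sync"
    "testing"
    "time"

    "github.com/go-co-op/gocron"
)

// Before this commit, concurrent adds, removals, and reads hit the jobs
// map without consistent locking and the race detector flagged them;
// after it, every access goes through jobsMutex.
func TestConcurrentJobsMapAccess(t *testing.T) {
    s := gocron.NewScheduler(time.UTC)
    s.StartAsync()
    defer s.Stop()

    var wg sync.WaitGroup
    for i := 0; i < 10; i++ {
        wg.Add(1)
        go func() {
            defer wg.Done()
            for j := 0; j < 100; j++ {
                job, err := s.Every(1).Second().Do(func() {})
                if err != nil {
                    t.Error(err)
                    return
                }
                _, _ = s.NextRun()
                _ = s.Len()
                _ = s.RemoveByID(job)
            }
        }()
    }
    wg.Wait()
}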