From 5c664846ab1774b939d514167aa713fdadae12ef Mon Sep 17 00:00:00 2001 From: Derek Perkins Date: Thu, 16 Aug 2018 12:25:27 -0600 Subject: [PATCH] use atomic.Int32 to eliminate race conditions MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit also changed order of channel sends/receives to send after updating atomic counters Signed-off-by: Adolfo GarcĂ­a Veytia (Puerco) --- throttler/throttler.go | 67 +++++++++++++++++++++++------------------- 1 file changed, 36 insertions(+), 31 deletions(-) diff --git a/throttler/throttler.go b/throttler/throttler.go index 51ea3bf..8103dca 100644 --- a/throttler/throttler.go +++ b/throttler/throttler.go @@ -16,21 +16,22 @@ import ( "fmt" "math" "sync" + "sync/atomic" ) // Throttler stores all the information about the number of workers, the active workers and error information type Throttler struct { - maxWorkers int - workerCount int - batchingTotal int - batchSize int - totalJobs int - jobsStarted int - jobsCompleted int + maxWorkers int32 + workerCount int32 + batchingTotal int32 + batchSize int32 + totalJobs int32 + jobsStarted int32 + jobsCompleted int32 doneChan chan struct{} errsMutex *sync.Mutex errs []error - errorCount int + errorCount int32 } // New returns a Throttler that will govern the max number of workers and will @@ -40,9 +41,9 @@ func New(maxWorkers, totalJobs int) *Throttler { panic("maxWorkers has to be at least 1") } return &Throttler{ - maxWorkers: maxWorkers, + maxWorkers: int32(maxWorkers), batchSize: 1, - totalJobs: totalJobs, + totalJobs: int32(totalJobs), doneChan: make(chan struct{}, totalJobs), errsMutex: &sync.Mutex{}, } @@ -52,8 +53,8 @@ func New(maxWorkers, totalJobs int) *Throttler { func NewBatchedThrottler(maxWorkers, batchingTotal, batchSize int) *Throttler { totalJobs := int(math.Ceil(float64(batchingTotal) / float64(batchSize))) t := New(maxWorkers, totalJobs) - t.batchSize = batchSize - t.batchingTotal = batchingTotal + t.batchSize = int32(batchSize) + t.batchingTotal = int32(batchingTotal) return t } @@ -63,26 +64,26 @@ func NewBatchedThrottler(maxWorkers, batchingTotal, batchSize int) *Throttler { // all of the jobs have been dispatched. It stops blocking when Done has been called // as many times as totalJobs. func (t *Throttler) Throttle() int { - if t.totalJobs < 1 { - return t.errorCount + if atomic.LoadInt32(&t.totalJobs) < 1 { + return int(atomic.LoadInt32(&t.errorCount)) } - t.jobsStarted++ - t.workerCount++ + atomic.AddInt32(&t.jobsStarted, 1) + atomic.AddInt32(&t.workerCount, 1) - if t.workerCount == t.maxWorkers { + if atomic.LoadInt32(&t.workerCount) == atomic.LoadInt32(&t.maxWorkers) { + atomic.AddInt32(&t.jobsCompleted, 1) + atomic.AddInt32(&t.workerCount, -1) <-t.doneChan - t.jobsCompleted++ - t.workerCount-- } - if t.jobsStarted == t.totalJobs { - for t.jobsCompleted < t.totalJobs { + if atomic.LoadInt32(&t.jobsStarted) == atomic.LoadInt32(&t.totalJobs) { + for atomic.LoadInt32(&t.jobsCompleted) < atomic.LoadInt32(&t.totalJobs) { + atomic.AddInt32(&t.jobsCompleted, 1) <-t.doneChan - t.jobsCompleted++ } } - return t.errorCount + return int(atomic.LoadInt32(&t.errorCount)) } // Done lets Throttler know that a job has been completed so that another worker @@ -92,7 +93,7 @@ func (t *Throttler) Done(err error) { if err != nil { t.errsMutex.Lock() t.errs = append(t.errs, err) - t.errorCount++ + atomic.AddInt32(&t.errorCount, 1) t.errsMutex.Unlock() } t.doneChan <- struct{}{} @@ -100,7 +101,9 @@ func (t *Throttler) Done(err error) { // Err returns an error representative of all errors caught by throttler func (t *Throttler) Err() error { - if len(t.errs) == 0 { + t.errsMutex.Lock() + defer t.errsMutex.Unlock() + if atomic.LoadInt32(&t.errorCount) == 0 { return nil } return multiError(t.errs) @@ -108,6 +111,8 @@ func (t *Throttler) Err() error { // Errs returns a slice of any errors that were received from calling Done() func (t *Throttler) Errs() []error { + t.errsMutex.Lock() + defer t.errsMutex.Unlock() return t.errs } @@ -125,7 +130,7 @@ func (te multiError) Error() string { // until th.Throttle() is called, so if you don't call Throttle before executing this // again, it will return the same index as before func (t *Throttler) BatchStartIndex() int { - return t.jobsStarted * t.batchSize + return int(atomic.LoadInt32(&t.jobsStarted) * atomic.LoadInt32(&t.batchSize)) } // BatchEndIndex returns the ending index for the next batch. It either returns the full batch size @@ -133,14 +138,14 @@ func (t *Throttler) BatchStartIndex() int { // until th.Throttle() is called, so if you don't call Throttle before executing this // again, it will return the same index as before. func (t *Throttler) BatchEndIndex() int { - end := (t.jobsStarted + 1) * t.batchSize - if end > t.batchingTotal { - end = t.batchingTotal + end := (atomic.LoadInt32(&t.jobsStarted) + 1) * atomic.LoadInt32(&t.batchSize) + if end > atomic.LoadInt32(&t.batchingTotal) { + end = atomic.LoadInt32(&t.batchingTotal) } - return end + return int(end) } // TotalJobs returns the total number of jobs throttler is performing func (t *Throttler) TotalJobs() int { - return t.totalJobs + return int(atomic.LoadInt32(&t.totalJobs)) }