Skip to content

Commit

Permalink
use atomic.Int32 to eliminate race conditions
Browse files Browse the repository at this point in the history
also changed order of channel sends/receives to send after updating atomic counters

Signed-off-by: Adolfo García Veytia (Puerco) <adolfo.garcia@uservers.net>
  • Loading branch information
derekperkins authored and puerco committed Jul 23, 2024
1 parent a59174a commit 5c66484
Showing 1 changed file with 36 additions and 31 deletions.
67 changes: 36 additions & 31 deletions throttler/throttler.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,21 +16,22 @@ import (
"fmt"
"math"
"sync"
"sync/atomic"
)

// Throttler stores all the information about the number of workers, the active workers and error information
type Throttler struct {
maxWorkers int
workerCount int
batchingTotal int
batchSize int
totalJobs int
jobsStarted int
jobsCompleted int
maxWorkers int32
workerCount int32
batchingTotal int32
batchSize int32
totalJobs int32
jobsStarted int32
jobsCompleted int32
doneChan chan struct{}
errsMutex *sync.Mutex
errs []error
errorCount int
errorCount int32
}

// New returns a Throttler that will govern the max number of workers and will
Expand All @@ -40,9 +41,9 @@ func New(maxWorkers, totalJobs int) *Throttler {
panic("maxWorkers has to be at least 1")
}
return &Throttler{
maxWorkers: maxWorkers,
maxWorkers: int32(maxWorkers),
batchSize: 1,
totalJobs: totalJobs,
totalJobs: int32(totalJobs),
doneChan: make(chan struct{}, totalJobs),
errsMutex: &sync.Mutex{},
}
Expand All @@ -52,8 +53,8 @@ func New(maxWorkers, totalJobs int) *Throttler {
func NewBatchedThrottler(maxWorkers, batchingTotal, batchSize int) *Throttler {
totalJobs := int(math.Ceil(float64(batchingTotal) / float64(batchSize)))
t := New(maxWorkers, totalJobs)
t.batchSize = batchSize
t.batchingTotal = batchingTotal
t.batchSize = int32(batchSize)
t.batchingTotal = int32(batchingTotal)
return t
}

Expand All @@ -63,26 +64,26 @@ func NewBatchedThrottler(maxWorkers, batchingTotal, batchSize int) *Throttler {
// all of the jobs have been dispatched. It stops blocking when Done has been called
// as many times as totalJobs.
func (t *Throttler) Throttle() int {
if t.totalJobs < 1 {
return t.errorCount
if atomic.LoadInt32(&t.totalJobs) < 1 {
return int(atomic.LoadInt32(&t.errorCount))
}
t.jobsStarted++
t.workerCount++
atomic.AddInt32(&t.jobsStarted, 1)
atomic.AddInt32(&t.workerCount, 1)

if t.workerCount == t.maxWorkers {
if atomic.LoadInt32(&t.workerCount) == atomic.LoadInt32(&t.maxWorkers) {
atomic.AddInt32(&t.jobsCompleted, 1)
atomic.AddInt32(&t.workerCount, -1)
<-t.doneChan
t.jobsCompleted++
t.workerCount--
}

if t.jobsStarted == t.totalJobs {
for t.jobsCompleted < t.totalJobs {
if atomic.LoadInt32(&t.jobsStarted) == atomic.LoadInt32(&t.totalJobs) {
for atomic.LoadInt32(&t.jobsCompleted) < atomic.LoadInt32(&t.totalJobs) {
atomic.AddInt32(&t.jobsCompleted, 1)
<-t.doneChan
t.jobsCompleted++
}
}

return t.errorCount
return int(atomic.LoadInt32(&t.errorCount))
}

// Done lets Throttler know that a job has been completed so that another worker
Expand All @@ -92,22 +93,26 @@ func (t *Throttler) Done(err error) {
if err != nil {
t.errsMutex.Lock()
t.errs = append(t.errs, err)
t.errorCount++
atomic.AddInt32(&t.errorCount, 1)
t.errsMutex.Unlock()
}
t.doneChan <- struct{}{}
}

// Err returns an error representative of all errors caught by throttler
func (t *Throttler) Err() error {
if len(t.errs) == 0 {
t.errsMutex.Lock()
defer t.errsMutex.Unlock()
if atomic.LoadInt32(&t.errorCount) == 0 {
return nil
}
return multiError(t.errs)
}

// Errs returns a slice of any errors that were received from calling Done()
func (t *Throttler) Errs() []error {
t.errsMutex.Lock()
defer t.errsMutex.Unlock()
return t.errs
}

Expand All @@ -125,22 +130,22 @@ func (te multiError) Error() string {
// until th.Throttle() is called, so if you don't call Throttle before executing this
// again, it will return the same index as before
func (t *Throttler) BatchStartIndex() int {
return t.jobsStarted * t.batchSize
return int(atomic.LoadInt32(&t.jobsStarted) * atomic.LoadInt32(&t.batchSize))
}

// BatchEndIndex returns the ending index for the next batch. It either returns the full batch size
// or the remaining amount of jobs. The job count isn't modified
// until th.Throttle() is called, so if you don't call Throttle before executing this
// again, it will return the same index as before.
func (t *Throttler) BatchEndIndex() int {
end := (t.jobsStarted + 1) * t.batchSize
if end > t.batchingTotal {
end = t.batchingTotal
end := (atomic.LoadInt32(&t.jobsStarted) + 1) * atomic.LoadInt32(&t.batchSize)
if end > atomic.LoadInt32(&t.batchingTotal) {
end = atomic.LoadInt32(&t.batchingTotal)
}
return end
return int(end)
}

// TotalJobs returns the total number of jobs throttler is performing
func (t *Throttler) TotalJobs() int {
return t.totalJobs
return int(atomic.LoadInt32(&t.totalJobs))
}

0 comments on commit 5c66484

Please sign in to comment.