Skip to content

Commit

Permalink
Added gauge for the number of active routines (#2061)
Browse files Browse the repository at this point in the history
* Added gauge for the number of active routines

* Renamed active => working
  • Loading branch information
simonrobb authored Dec 6, 2019
1 parent 4a1702d commit 7e54ccf
Showing 1 changed file with 38 additions and 14 deletions.
52 changes: 38 additions & 14 deletions src/x/sync/pooled_worker_pool.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,13 +36,15 @@ const (

type pooledWorkerPool struct {
sync.Mutex
numRoutinesAtomic int64
numRoutinesGauge tally.Gauge
growOnDemand bool
workChs []chan Work
numShards int64
killWorkerProbability float64
nowFn NowFn
numRoutinesAtomic int64
numWorkingRoutinesAtomic int64
numRoutinesGauge tally.Gauge
numWorkingRoutinesGauge tally.Gauge
growOnDemand bool
workChs []chan Work
numShards int64
killWorkerProbability float64
nowFn NowFn
}

// NewPooledWorkerPool creates a new worker pool.
Expand All @@ -62,13 +64,15 @@ func NewPooledWorkerPool(size int, opts PooledWorkerPoolOptions) (PooledWorkerPo
}

return &pooledWorkerPool{
numRoutinesAtomic: 0,
numRoutinesGauge: opts.InstrumentOptions().MetricsScope().Gauge("num-routines"),
growOnDemand: opts.GrowOnDemand(),
workChs: workChs,
numShards: numShards,
killWorkerProbability: opts.KillWorkerProbability(),
nowFn: opts.NowFn(),
numRoutinesAtomic: 0,
numWorkingRoutinesAtomic: 0,
numRoutinesGauge: opts.InstrumentOptions().MetricsScope().Gauge("num-routines"),
numWorkingRoutinesGauge: opts.InstrumentOptions().MetricsScope().Gauge("num-working-routines"),
growOnDemand: opts.GrowOnDemand(),
workChs: workChs,
numShards: numShards,
killWorkerProbability: opts.KillWorkerProbability(),
nowFn: opts.NowFn(),
}, nil
}

Expand All @@ -91,6 +95,7 @@ func (p *pooledWorkerPool) Go(work Work) {

if currTime%numGoroutinesGaugeSampleRate == 0 {
p.emitNumRoutines()
p.emitNumWorkingRoutines()
}

if !p.growOnDemand {
Expand Down Expand Up @@ -138,7 +143,9 @@ func (p *pooledWorkerPool) spawnWorker(
killThreshold = uint64(p.killWorkerProbability * float64(math.MaxUint64))
)
for f := range workCh {
p.incNumWorkingRoutines()
f()
p.decNumWorkingRoutines()
if rng.Random() < killThreshold {
if spawnReplacement {
p.spawnWorker(rng.Random(), nil, workCh, true)
Expand Down Expand Up @@ -166,3 +173,20 @@ func (p *pooledWorkerPool) decNumRoutines() {
func (p *pooledWorkerPool) getNumRoutines() int64 {
return atomic.LoadInt64(&p.numRoutinesAtomic)
}

func (p *pooledWorkerPool) emitNumWorkingRoutines() {
numRoutines := float64(p.getNumWorkingRoutines())
p.numWorkingRoutinesGauge.Update(numRoutines)
}

func (p *pooledWorkerPool) incNumWorkingRoutines() {
atomic.AddInt64(&p.numWorkingRoutinesAtomic, 1)
}

func (p *pooledWorkerPool) decNumWorkingRoutines() {
atomic.AddInt64(&p.numWorkingRoutinesAtomic, -1)
}

func (p *pooledWorkerPool) getNumWorkingRoutines() int64 {
return atomic.LoadInt64(&p.numWorkingRoutinesAtomic)
}

0 comments on commit 7e54ccf

Please sign in to comment.