From 5733a8fcef5e42b2408e588c7de79eef6766df37 Mon Sep 17 00:00:00 2001 From: Andrew Thornton Date: Sat, 1 May 2021 19:09:41 +0100 Subject: [PATCH 1/4] Only use boost workers for leveldb shadow queues The leveldb shadow queue of a persistable channel queue should always start with 0 workers and just use boost to add additional workers if necessary. Signed-off-by: Andrew Thornton --- modules/queue/queue_disk_channel.go | 6 +++--- modules/queue/unique_queue_disk_channel.go | 10 +++++----- 2 files changed, 8 insertions(+), 8 deletions(-) diff --git a/modules/queue/queue_disk_channel.go b/modules/queue/queue_disk_channel.go index 433435c3015f2..801fd8a12235c 100644 --- a/modules/queue/queue_disk_channel.go +++ b/modules/queue/queue_disk_channel.go @@ -75,10 +75,10 @@ func NewPersistableChannelQueue(handle HandlerFunc, cfg, exemplar interface{}) ( BatchLength: config.BatchLength, BlockTimeout: 1 * time.Second, BoostTimeout: 5 * time.Minute, - BoostWorkers: 5, - MaxWorkers: 6, + BoostWorkers: 1, + MaxWorkers: 5, }, - Workers: 1, + Workers: 0, Name: config.Name + "-level", }, DataDir: config.DataDir, diff --git a/modules/queue/unique_queue_disk_channel.go b/modules/queue/unique_queue_disk_channel.go index 4a69b43eae0e3..47c4f2bdd574d 100644 --- a/modules/queue/unique_queue_disk_channel.go +++ b/modules/queue/unique_queue_disk_channel.go @@ -73,12 +73,12 @@ func NewPersistableChannelUniqueQueue(handle HandlerFunc, cfg, exemplar interfac WorkerPoolConfiguration: WorkerPoolConfiguration{ QueueLength: config.QueueLength, BatchLength: config.BatchLength, - BlockTimeout: 0, - BoostTimeout: 0, - BoostWorkers: 0, - MaxWorkers: 1, + BlockTimeout: 1 * time.Second, + BoostTimeout: 5 * time.Minute, + BoostWorkers: 1, + MaxWorkers: 5, }, - Workers: 1, + Workers: 0, Name: config.Name + "-level", }, DataDir: config.DataDir, From 77137444cb9fcb81673e34d09723aadd46de6058 Mon Sep 17 00:00:00 2001 From: Andrew Thornton Date: Sat, 1 May 2021 21:55:09 +0100 Subject: [PATCH 2/4] create a zero boost so that if there are no workers in a pool - boost to start the workers Signed-off-by: Andrew Thornton --- modules/queue/workerpool.go | 30 +++++++++++++++++++++++++++++- 1 file changed, 29 insertions(+), 1 deletion(-) diff --git a/modules/queue/workerpool.go b/modules/queue/workerpool.go index 45378e3dae145..05aa9d445d890 100644 --- a/modules/queue/workerpool.go +++ b/modules/queue/workerpool.go @@ -70,7 +70,11 @@ func (p *WorkerPool) Push(data Data) { atomic.AddInt64(&p.numInQueue, 1) p.lock.Lock() if p.blockTimeout > 0 && p.boostTimeout > 0 && (p.numberOfWorkers <= p.maxNumberOfWorkers || p.maxNumberOfWorkers < 0) { - p.lock.Unlock() + if p.numberOfWorkers == 0 { + p.zeroBoost() + } else { + p.lock.Unlock() + } p.pushBoost(data) } else { p.lock.Unlock() @@ -78,6 +82,30 @@ func (p *WorkerPool) Push(data Data) { } } +func (p *WorkerPool) zeroBoost() { + ctx, cancel := context.WithCancel(p.baseCtx) + mq := GetManager().GetManagedQueue(p.qid) + boost := p.boostWorkers + if (boost+p.numberOfWorkers) > p.maxNumberOfWorkers && p.maxNumberOfWorkers >= 0 { + boost = p.maxNumberOfWorkers - p.numberOfWorkers + } + if mq != nil { + log.Warn("WorkerPool: %d (for %s) has zero workers - adding %d temporary workers for %s", p.qid, mq.Name, boost, p.boostTimeout) + + start := time.Now() + pid := mq.RegisterWorkers(boost, start, false, start, cancel, false) + go func() { + <-ctx.Done() + mq.RemoveWorkers(pid) + cancel() + }() + } else { + log.Warn("WorkerPool: %d has zero workers - adding %d temporary workers for %s", p.qid, p.boostWorkers, p.boostTimeout) + } + p.lock.Unlock() + p.addWorkers(ctx, boost) +} + func (p *WorkerPool) pushBoost(data Data) { select { case p.dataChan <- data: From 45ed06c5dd94f20c6e1be3f06935befef0533ea4 Mon Sep 17 00:00:00 2001 From: Andrew Thornton Date: Sat, 1 May 2021 22:14:20 +0100 Subject: [PATCH 3/4] oops Signed-off-by: Andrew Thornton --- modules/queue/workerpool.go | 12 +++++++++++- 1 file changed, 11 insertions(+), 1 deletion(-) diff --git a/modules/queue/workerpool.go b/modules/queue/workerpool.go index 05aa9d445d890..342fcf8a4acf9 100644 --- a/modules/queue/workerpool.go +++ b/modules/queue/workerpool.go @@ -95,12 +95,22 @@ func (p *WorkerPool) zeroBoost() { start := time.Now() pid := mq.RegisterWorkers(boost, start, false, start, cancel, false) go func() { - <-ctx.Done() + select { + case <-ctx.Done(): + case <-time.After(p.boostTimeout): + } mq.RemoveWorkers(pid) cancel() }() } else { log.Warn("WorkerPool: %d has zero workers - adding %d temporary workers for %s", p.qid, p.boostWorkers, p.boostTimeout) + go func() { + select { + case <-ctx.Done(): + case <-time.After(p.boostTimeout): + } + cancel() + }() } p.lock.Unlock() p.addWorkers(ctx, boost) From b1f6a0cfd3a6976a867270aaf4d0f86372205b8f Mon Sep 17 00:00:00 2001 From: Andrew Thornton Date: Sat, 1 May 2021 22:18:49 +0100 Subject: [PATCH 4/4] actually set timeout appropriately on boosted workers Signed-off-by: Andrew Thornton --- modules/queue/workerpool.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/modules/queue/workerpool.go b/modules/queue/workerpool.go index 342fcf8a4acf9..0f15ccac9efd7 100644 --- a/modules/queue/workerpool.go +++ b/modules/queue/workerpool.go @@ -93,7 +93,7 @@ func (p *WorkerPool) zeroBoost() { log.Warn("WorkerPool: %d (for %s) has zero workers - adding %d temporary workers for %s", p.qid, mq.Name, boost, p.boostTimeout) start := time.Now() - pid := mq.RegisterWorkers(boost, start, false, start, cancel, false) + pid := mq.RegisterWorkers(boost, start, true, start.Add(p.boostTimeout), cancel, false) go func() { select { case <-ctx.Done(): @@ -150,7 +150,7 @@ func (p *WorkerPool) pushBoost(data Data) { log.Warn("WorkerPool: %d (for %s) Channel blocked for %v - adding %d temporary workers for %s, block timeout now %v", p.qid, mq.Name, ourTimeout, boost, p.boostTimeout, p.blockTimeout) start := time.Now() - pid := mq.RegisterWorkers(boost, start, false, start, cancel, false) + pid := mq.RegisterWorkers(boost, start, true, start.Add(p.boostTimeout), cancel, false) go func() { <-ctx.Done() mq.RemoveWorkers(pid)