From 99c2ec7041536a430b5eece5c6416ad5490af941 Mon Sep 17 00:00:00 2001 From: Laurence <45508533+LaurenceLiZhixin@users.noreply.github.com> Date: Sun, 17 Oct 2021 11:53:14 +0800 Subject: [PATCH] Fix: default not to start gr pool limit (#72) * fix: default not start gr pool * fix: ut --- sync/base_worker_pool.go | 7 +++++++ sync/connection_pool.go | 5 +++++ sync/connection_pool_test.go | 10 ++++++++++ 3 files changed, 22 insertions(+) diff --git a/sync/base_worker_pool.go b/sync/base_worker_pool.go index 695ddef..dbff5b0 100644 --- a/sync/base_worker_pool.go +++ b/sync/base_worker_pool.go @@ -36,6 +36,7 @@ type WorkerPoolConfig struct { NumQueues int QueueSize int Logger gxlog.Logger + Enable bool } // baseWorkerPool is a worker pool with multiple queues. @@ -69,6 +70,7 @@ type baseWorkerPool struct { taskQueues []chan task numWorkers *atomic.Int32 + enable bool wg *sync.WaitGroup } @@ -94,6 +96,11 @@ func newBaseWorkerPool(config WorkerPoolConfig) *baseWorkerPool { taskQueues: taskQueues, numWorkers: new(atomic.Int32), wg: new(sync.WaitGroup), + enable: config.Enable, + } + + if !config.Enable { + return p } initWg := new(sync.WaitGroup) diff --git a/sync/connection_pool.go b/sync/connection_pool.go index b82ee5d..5ef71b3 100644 --- a/sync/connection_pool.go +++ b/sync/connection_pool.go @@ -45,6 +45,11 @@ func (p *ConnectionPool) Submit(t task) error { return perrors.New("task shouldn't be nil") } + if !p.enable { + go t() + return nil + } + // put the task to a queue using Round Robin algorithm taskId := atomic.AddUint32(&p.taskId, 1) select { diff --git a/sync/connection_pool_test.go b/sync/connection_pool_test.go index 5ae9929..b6a6723 100644 --- a/sync/connection_pool_test.go +++ b/sync/connection_pool_test.go @@ -36,6 +36,7 @@ func TestConnectionPool(t *testing.T) { NumQueues: runtime.NumCPU(), QueueSize: 10, Logger: nil, + Enable: true, }) var count int64 wg := new(sync.WaitGroup) @@ -60,6 +61,7 @@ func TestConnectionPool(t *testing.T) { NumQueues: 1, QueueSize: 0, Logger: nil, + Enable: true, }) wg := new(sync.WaitGroup) @@ -85,6 +87,7 @@ func TestConnectionPool(t *testing.T) { NumWorkers: runtime.NumCPU(), NumQueues: runtime.NumCPU(), QueueSize: 100, + Enable: true, Logger: nil, }) @@ -102,6 +105,7 @@ func TestConnectionPool(t *testing.T) { p := NewConnectionPool(WorkerPoolConfig{ NumWorkers: 0, NumQueues: runtime.NumCPU(), + Enable: true, QueueSize: 100, Logger: nil, }) @@ -111,6 +115,7 @@ func TestConnectionPool(t *testing.T) { p = NewConnectionPool(WorkerPoolConfig{ NumWorkers: 1, NumQueues: 0, + Enable: true, QueueSize: 0, Logger: nil, }) @@ -123,6 +128,7 @@ func TestConnectionPool(t *testing.T) { NumQueues: 1, QueueSize: -1, Logger: nil, + Enable: true, }) err = p.Submit(func() {}) @@ -134,6 +140,7 @@ func TestConnectionPool(t *testing.T) { p := NewConnectionPool(WorkerPoolConfig{ NumWorkers: 1, NumQueues: 1, + Enable: true, QueueSize: 0, Logger: nil, }) @@ -149,6 +156,7 @@ func TestConnectionPool(t *testing.T) { NumQueues: runtime.NumCPU(), QueueSize: 10, Logger: nil, + Enable: true, }) task, v := newCountTask() @@ -174,6 +182,7 @@ func TestConnectionPool(t *testing.T) { NumQueues: runtime.NumCPU(), QueueSize: 10, Logger: nil, + Enable: true, }) task, v := newCountTask() @@ -192,6 +201,7 @@ func BenchmarkConnectionPool(b *testing.B) { NumWorkers: 100, NumQueues: runtime.NumCPU(), QueueSize: 100, + Enable: true, Logger: nil, })