From 04221628384d09de5c81d9045a4efa076baef442 Mon Sep 17 00:00:00 2001 From: sylwiaszunejko Date: Fri, 4 Oct 2024 12:35:05 +0200 Subject: [PATCH] Start debouncing pool.fill() calls This commit introduces ne package chan_debouncer that implements debouncing login for given fn() and tests it. It is used to ignore requests to call `pool.fill()` if there is already one call scheduled, but not being executed yet. --- connectionpool.go | 14 +++++-- debounce/simple_debouncer.go | 25 ++++++++++++ debounce/simple_debouncer_test.go | 55 ++++++++++++++++++++++++++ scylla_shard_aware_port_common_test.go | 2 +- 4 files changed, 92 insertions(+), 4 deletions(-) create mode 100644 debounce/simple_debouncer.go create mode 100644 debounce/simple_debouncer_test.go diff --git a/connectionpool.go b/connectionpool.go index c84833419..f085728d1 100644 --- a/connectionpool.go +++ b/connectionpool.go @@ -10,6 +10,8 @@ import ( "net" "sync" "time" + + "github.com/gocql/gocql/debounce" ) // interface to implement to receive the host information @@ -203,7 +205,7 @@ func (p *policyConnPool) addHost(host *HostInfo) { } p.mu.Unlock() - pool.fill() + pool.fill_debounce() } func (p *policyConnPool) removeHost(hostID string) { @@ -232,6 +234,7 @@ type hostConnPool struct { connPicker ConnPicker closed bool filling bool + debouncer *debounce.SimpleDebouncer logger StdLogger } @@ -256,6 +259,7 @@ func newHostConnPool(session *Session, host *HostInfo, port, size int, filling: false, closed: false, logger: session.logger, + debouncer: debounce.NewSimpleDebouncer(), } // the pool is not filled or connected @@ -274,7 +278,7 @@ func (pool *hostConnPool) Pick(token Token, qry ExecutableQuery) *Conn { size, missing := pool.connPicker.Size() if missing > 0 { // try to fill the pool - go pool.fill() + go pool.fill_debounce() if size == 0 { return nil @@ -385,6 +389,10 @@ func (pool *hostConnPool) fill() { }() } +func (pool *hostConnPool) fill_debounce() { + pool.debouncer.Debounce(pool.fill) +} + func (pool *hostConnPool) logConnectErr(err error) { if opErr, ok := err.(*net.OpError); ok && (opErr.Op == "dial" || opErr.Op == "read") { // connection refused @@ -551,5 +559,5 @@ func (pool *hostConnPool) HandleError(conn *Conn, err error, closed bool) { } pool.connPicker.Remove(conn) - go pool.fill() + go pool.fill_debounce() } diff --git a/debounce/simple_debouncer.go b/debounce/simple_debouncer.go new file mode 100644 index 000000000..5201ffa63 --- /dev/null +++ b/debounce/simple_debouncer.go @@ -0,0 +1,25 @@ +package debounce + +// SimpleDebouncer debounce function call with simple logc: +// 1. If call is currently pending, function call should go through +// 2. If call is scheduled, but not pending, function call should be voided +type SimpleDebouncer struct { + channel chan struct{} +} + +// NewDebouncer creates a new Debouncer with a buffered channel of size 1 +func NewSimpleDebouncer() *SimpleDebouncer { + return &SimpleDebouncer{ + channel: make(chan struct{}, 1), + } +} + +// Debounce attempts to execute the function if the channel allows it +func (d *SimpleDebouncer) Debounce(fn func()) { + select { + case d.channel <- struct{}{}: + fn() + <-d.channel + default: + } +} diff --git a/debounce/simple_debouncer_test.go b/debounce/simple_debouncer_test.go new file mode 100644 index 000000000..c84873177 --- /dev/null +++ b/debounce/simple_debouncer_test.go @@ -0,0 +1,55 @@ +package debounce + +import ( + "runtime" + "sync/atomic" + "testing" +) + +// TestDebouncer tests that the debouncer allows only one function to execute at a time +func TestSimpleDebouncer(t *testing.T) { + d := NewSimpleDebouncer() + var executions int32 + startedCh := make(chan struct{}, 1) + doneCh := make(chan struct{}, 1) + // Function to increment executions + fn := func() { + <-startedCh // Simulate work + atomic.AddInt32(&executions, 1) + <-doneCh // Simulate work + } + t.Run("Case 1", func(t *testing.T) { + // Case 1: Normal single execution + startedCh <- struct{}{} + doneCh <- struct{}{} + d.Debounce(fn) + // We expect that the function has only executed once due to debouncing + if atomic.LoadInt32(&executions) != 1 { + t.Errorf("Expected function to be executed only once, but got %d executions", executions) + } + }) + atomic.StoreInt32(&executions, 0) + t.Run("Case 2", func(t *testing.T) { + // Case 2: Debounce the function multiple times at row when body is started + go d.Debounce(fn) + startedCh <- struct{}{} + go d.Debounce(fn) + go d.Debounce(fn) + doneCh <- struct{}{} + startedCh <- struct{}{} + doneCh <- struct{}{} + waitTillChannelIsEmpty(doneCh) + // We expect that the function has only executed once due to debouncing + if atomic.LoadInt32(&executions) != 2 { + t.Errorf("Expected function to be executed twice, but got %d executions", executions) + } + }) +} +func waitTillChannelIsEmpty(ch chan struct{}) { + for { + if len(ch) == 0 { + return + } + runtime.Gosched() + } +} diff --git a/scylla_shard_aware_port_common_test.go b/scylla_shard_aware_port_common_test.go index f09cbd053..d9d4e0eb3 100644 --- a/scylla_shard_aware_port_common_test.go +++ b/scylla_shard_aware_port_common_test.go @@ -240,7 +240,7 @@ func triggerPoolsRefill(sess *Session) { hosts := sess.ring.allHosts() for _, host := range hosts { hostPool, _ := sess.pool.getPool(host) - go hostPool.fill() + go hostPool.fill_debounce() } }