Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Start debouncing pool.fill() calls #292

Merged
merged 1 commit into from
Oct 7, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 11 additions & 3 deletions connectionpool.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,8 @@ import (
"net"
"sync"
"time"

"github.com/gocql/gocql/debounce"
)

// interface to implement to receive the host information
Expand Down Expand Up @@ -203,7 +205,7 @@ func (p *policyConnPool) addHost(host *HostInfo) {
}
p.mu.Unlock()

pool.fill()
pool.fill_debounce()
}

func (p *policyConnPool) removeHost(hostID string) {
Expand Down Expand Up @@ -232,6 +234,7 @@ type hostConnPool struct {
connPicker ConnPicker
closed bool
filling bool
debouncer *debounce.SimpleDebouncer

logger StdLogger
}
Expand All @@ -256,6 +259,7 @@ func newHostConnPool(session *Session, host *HostInfo, port, size int,
filling: false,
closed: false,
logger: session.logger,
debouncer: debounce.NewSimpleDebouncer(),
}

// the pool is not filled or connected
Expand All @@ -274,7 +278,7 @@ func (pool *hostConnPool) Pick(token Token, qry ExecutableQuery) *Conn {
size, missing := pool.connPicker.Size()
if missing > 0 {
// try to fill the pool
go pool.fill()
go pool.fill_debounce()

if size == 0 {
return nil
Expand Down Expand Up @@ -385,6 +389,10 @@ func (pool *hostConnPool) fill() {
}()
}

func (pool *hostConnPool) fill_debounce() {
pool.debouncer.Debounce(pool.fill)
}

func (pool *hostConnPool) logConnectErr(err error) {
if opErr, ok := err.(*net.OpError); ok && (opErr.Op == "dial" || opErr.Op == "read") {
// connection refused
Expand Down Expand Up @@ -551,5 +559,5 @@ func (pool *hostConnPool) HandleError(conn *Conn, err error, closed bool) {
}

pool.connPicker.Remove(conn)
go pool.fill()
go pool.fill_debounce()
}
25 changes: 25 additions & 0 deletions debounce/simple_debouncer.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
package debounce

// SimpleDebouncer debounce function call with simple logc:
// 1. If call is currently pending, function call should go through
// 2. If call is scheduled, but not pending, function call should be voided
type SimpleDebouncer struct {
channel chan struct{}
}

// NewDebouncer creates a new Debouncer with a buffered channel of size 1
func NewSimpleDebouncer() *SimpleDebouncer {
return &SimpleDebouncer{
channel: make(chan struct{}, 1),
}
}

// Debounce attempts to execute the function if the channel allows it
func (d *SimpleDebouncer) Debounce(fn func()) {
select {
case d.channel <- struct{}{}:
fn()
<-d.channel
default:
}
}
55 changes: 55 additions & 0 deletions debounce/simple_debouncer_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,55 @@
package debounce

import (
"runtime"
"sync/atomic"
"testing"
)

// TestDebouncer tests that the debouncer allows only one function to execute at a time
func TestSimpleDebouncer(t *testing.T) {
d := NewSimpleDebouncer()
var executions int32
startedCh := make(chan struct{}, 1)
doneCh := make(chan struct{}, 1)
// Function to increment executions
fn := func() {
<-startedCh // Simulate work
atomic.AddInt32(&executions, 1)
<-doneCh // Simulate work
}
t.Run("Case 1", func(t *testing.T) {
// Case 1: Normal single execution
startedCh <- struct{}{}
doneCh <- struct{}{}
d.Debounce(fn)
// We expect that the function has only executed once due to debouncing
if atomic.LoadInt32(&executions) != 1 {
t.Errorf("Expected function to be executed only once, but got %d executions", executions)
}
})
atomic.StoreInt32(&executions, 0)
t.Run("Case 2", func(t *testing.T) {
// Case 2: Debounce the function multiple times at row when body is started
go d.Debounce(fn)
startedCh <- struct{}{}
go d.Debounce(fn)
go d.Debounce(fn)
doneCh <- struct{}{}
startedCh <- struct{}{}
doneCh <- struct{}{}
waitTillChannelIsEmpty(doneCh)
// We expect that the function has only executed once due to debouncing
if atomic.LoadInt32(&executions) != 2 {
t.Errorf("Expected function to be executed twice, but got %d executions", executions)
}
})
}
func waitTillChannelIsEmpty(ch chan struct{}) {
for {
if len(ch) == 0 {
return
}
runtime.Gosched()
}
}
2 changes: 1 addition & 1 deletion scylla_shard_aware_port_common_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -240,7 +240,7 @@ func triggerPoolsRefill(sess *Session) {
hosts := sess.ring.allHosts()
for _, host := range hosts {
hostPool, _ := sess.pool.getPool(host)
go hostPool.fill()
go hostPool.fill_debounce()
}
}

Expand Down
Loading