Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Make heavy loaded optimization configurable #2 #211

Open
wants to merge 1 commit into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
71 changes: 50 additions & 21 deletions cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ package gocql
import (
"context"
"errors"
"fmt"
"net"
"time"
)
Expand Down Expand Up @@ -93,6 +94,24 @@ type ClusterConfig struct {
// Default: 128 for older CQL versions
MaxRequestsPerConn int

// Threshold for the number of inflight requests per connection
// after which the connection is considered as heavy loaded
// Default: 512
HeavyLoadedConnectionThreshold int

// When a connection is considered as heavy loaded, the driver
// could switch to the least loaded connection for the same node.
// The switch will happen if the other connection is at least
// HeavyLoadedSwitchConnectionPercentage percentage less busy
// (in terms of inflight requests).
//
// For the default value of 20%, if the heavy loaded connection
// has 100 inflight requests, the switch will happen only if the
// least busy connection has less than 80 inflight requests.
//
// Default: 20%
HeavyLoadedSwitchConnectionPercentage int

// Default consistency level.
// Default: Quorum
Consistency Consistency
Expand Down Expand Up @@ -291,27 +310,29 @@ type Dialer interface {
// the same host, and will not mark the node being down or up from events.
func NewCluster(hosts ...string) *ClusterConfig {
cfg := &ClusterConfig{
Hosts: hosts,
CQLVersion: "3.0.0",
Timeout: 11 * time.Second,
ConnectTimeout: 11 * time.Second,
Port: 9042,
NumConns: 2,
Consistency: Quorum,
MaxPreparedStmts: defaultMaxPreparedStmts,
MaxRoutingKeyInfo: 1000,
PageSize: 5000,
DefaultTimestamp: true,
DriverName: defaultDriverName,
DriverVersion: defaultDriverVersion,
MaxWaitSchemaAgreement: 60 * time.Second,
ReconnectInterval: 60 * time.Second,
ConvictionPolicy: &SimpleConvictionPolicy{},
ReconnectionPolicy: &ConstantReconnectionPolicy{MaxRetries: 3, Interval: 1 * time.Second},
InitialReconnectionPolicy: &NoReconnectionPolicy{},
SocketKeepalive: 15 * time.Second,
WriteCoalesceWaitTime: 200 * time.Microsecond,
MetadataSchemaRequestTimeout: 60 * time.Second,
Hosts: hosts,
CQLVersion: "3.0.0",
Timeout: 11 * time.Second,
ConnectTimeout: 11 * time.Second,
Port: 9042,
NumConns: 2,
Consistency: Quorum,
MaxPreparedStmts: defaultMaxPreparedStmts,
MaxRoutingKeyInfo: 1000,
PageSize: 5000,
DefaultTimestamp: true,
DriverName: defaultDriverName,
DriverVersion: defaultDriverVersion,
MaxWaitSchemaAgreement: 60 * time.Second,
ReconnectInterval: 60 * time.Second,
ConvictionPolicy: &SimpleConvictionPolicy{},
ReconnectionPolicy: &ConstantReconnectionPolicy{MaxRetries: 3, Interval: 1 * time.Second},
InitialReconnectionPolicy: &NoReconnectionPolicy{},
SocketKeepalive: 15 * time.Second,
WriteCoalesceWaitTime: 200 * time.Microsecond,
MetadataSchemaRequestTimeout: 60 * time.Second,
HeavyLoadedConnectionThreshold: 512,
HeavyLoadedSwitchConnectionPercentage: 20,
}

return cfg
Expand Down Expand Up @@ -374,6 +395,14 @@ func (cfg *ClusterConfig) Validate() error {
return errors.New("ReconnectionPolicy.GetMaxRetries returns non-positive number")
}

if cfg.HeavyLoadedSwitchConnectionPercentage > 100 || cfg.HeavyLoadedSwitchConnectionPercentage < 0 {
return fmt.Errorf("HeavyLoadedSwitchConnectionPercentage must be between 0 and 100, got %d", cfg.HeavyLoadedSwitchConnectionPercentage)
}

if cfg.HeavyLoadedConnectionThreshold < 0 {
return fmt.Errorf("HeavyLoadedConnectionThreshold must be greater than or equal to 0, got %d", cfg.HeavyLoadedConnectionThreshold)
}

return nil
}

Expand Down
4 changes: 4 additions & 0 deletions conn.go
Original file line number Diff line number Diff line change
Expand Up @@ -1615,6 +1615,10 @@ func (c *Conn) AvailableStreams() int {
return c.streams.Available()
}

// StreamsInUse returns the number of stream IDs currently in use on this
// connection, i.e. the count of in-flight requests as tracked by the
// connection's stream ID generator.
func (c *Conn) StreamsInUse() int {
	return c.streams.InUse()
}

func (c *Conn) UseKeyspace(keyspace string) error {
q := &writeQueryFrame{statement: `USE "` + keyspace + `"`}
q.params.consistency = c.session.cons
Expand Down
5 changes: 5 additions & 0 deletions internal/streams/streams.go
Original file line number Diff line number Diff line change
Expand Up @@ -144,3 +144,8 @@ func (s *IDGenerator) Available() int {
func (s *IDGenerator) InUse() int {
return int(atomic.LoadInt32(&s.inuseStreams))
}

// SetStreamsInUse overwrites the in-use streams counter of s.
// It is intended for testing only — production code must never set this
// counter directly; it is maintained by the generator itself.
func SetStreamsInUse(s *IDGenerator, val int32) {
	atomic.StoreInt32(&s.inuseStreams, val)
}
7 changes: 3 additions & 4 deletions scylla.go
Original file line number Diff line number Diff line change
Expand Up @@ -416,15 +416,14 @@ func (p *scyllaConnPicker) maybeReplaceWithLessBusyConnection(c *Conn) *Conn {
return c
}
alternative := p.leastBusyConn()
if alternative == nil || alternative.AvailableStreams()*120 > c.AvailableStreams()*100 {
return c
} else {
if alternative != nil && alternative.StreamsInUse()*100 <= c.StreamsInUse()*(100-c.session.cfg.HeavyLoadedSwitchConnectionPercentage) {
return alternative
}
return c
}

func isHeavyLoaded(c *Conn) bool {
return c.streams.NumStreams/2 > c.AvailableStreams()
return c.StreamsInUse() > c.session.cfg.HeavyLoadedConnectionThreshold
Comment on lines -427 to +426

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This part was already configurable through the MaxRequestsPerConn parameter in ClusterConfig.
I think we should either stick with the old parameter or remove it. Having 2 will be confusing.
Also, currently this PR changes the default (previously it was 32768 / 2, now it is 512). Why this number and why should the default be changed?

Copy link
Collaborator Author

@dkropachev dkropachev Aug 1, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Having it the old way does not allow the user to switch early and to have lots of requests in flight at the same time.
MaxRequestsPerConn used to control not only max number of requests in flight, but also a waterline after which connection is considered under high load.
This kind of coupling is never good.
Beside that, before this PR driver considered connection to be highly loaded only after reaching 16k requests in flight, which is a HUGE number, it is safe to say that this feature never worked before.

Having a separate configuration for a heavy loaded water line allows user to allow driver to have big number of in flight requests and at the same time to switch to underutilized connections early.

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Having it the old way does not allow the user to switch early and to have lots of requests in flight at the same time. MaxRequestsPerConn used to control not only the max number of requests in flight, but also a waterline after which a connection is considered under high load. This kind of coupling is never good.
Beside that, before this PR driver considered connection to be highly loaded only after reaching 16k requests in flight, which is a HUGE number, it is safe to say that this feature never worked before.

This number was configurable by the user, so it's not really safe to say this.

Having a separate configuration for a heavy loaded water line allows user to allow driver to have big number of in flight requests and at the same time to switch to underutilized connections early.

If switching out early is the goal (which also means disabling shard awareness early!!) then I agree that HeavyLoadedConnectionThreshold makes sense.

}

func (p *scyllaConnPicker) leastBusyConn() *Conn {
Expand Down
71 changes: 71 additions & 0 deletions scylla_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -99,6 +99,76 @@ func TestScyllaConnPickerHammerPickNilToken(t *testing.T) {
wg.Wait()
}

// TestScyllaConnPicker exercises maybeReplaceWithLessBusyConnection: given
// three mock connections with controlled in-flight stream counts, it checks
// which connection the picker hands back under the configured heavy-load
// threshold (100) and switch percentage (30%).
func TestScyllaConnPicker(t *testing.T) {
	t.Parallel()

	t.Run("maybeReplaceWithLessBusyConnection", func(t *testing.T) {
		cfg := ClusterConfig{
			HeavyLoadedSwitchConnectionPercentage: 30,
			HeavyLoadedConnectionThreshold:        100,
		}

		for _, tc := range []struct {
			name         string
			streamsInUse [3]int32
			expected     int
		}{
			{
				name:         "all connections below threshold",
				streamsInUse: [3]int32{99, 98, 97},
				expected:     0,
			},
			{
				name:         "all connections in threshold, but none is switchable",
				streamsInUse: [3]int32{110, 109, 108},
				expected:     0,
			},
			{
				name:         "all connections in threshold, one is below threshold",
				streamsInUse: [3]int32{110, 109, 70},
				expected:     2,
			},
			{
				name:         "all connections in threshold, one is above threshold, but below switchable percentage",
				streamsInUse: [3]int32{210, 130, 209},
				expected:     1,
			},
		} {
			t.Run(tc.name, func(t *testing.T) {
				picker := scyllaConnPicker{
					nrShards:  4,
					msbIgnore: 12,
				}

				// One mock connection per shard, all sharing the same config.
				var conns [3]*Conn
				for shard := range conns {
					conns[shard] = mockConn(shard)
					conns[shard].session.cfg = cfg
					picker.Put(conns[shard])
				}

				// Force the in-flight counters to the values of this case.
				for id, inUse := range tc.streamsInUse {
					streams.SetStreamsInUse(conns[id].streams, inUse)
				}

				want := conns[tc.expected]
				if got := picker.maybeReplaceWithLessBusyConnection(conns[0]); got != want {
					t.Errorf("expected connection from shard %d, got %d", want.scyllaSupported.shard, got.scyllaSupported.shard)
				}
			})
		}
	})
}

func TestScyllaConnPickerRemove(t *testing.T) {
t.Parallel()

Expand Down Expand Up @@ -135,6 +205,7 @@ func mockConn(shard int) *Conn {
partitioner: "org.apache.cassandra.dht.Murmur3Partitioner",
shardingAlgorithm: "biased-token-round-robin",
},
session: &Session{},
}
}

Expand Down
3 changes: 1 addition & 2 deletions session.go
Original file line number Diff line number Diff line change
Expand Up @@ -120,9 +120,8 @@ func addrsToHosts(addrs []string, defaultPort int, logger StdLogger) ([]*HostInf
// NewSession wraps an existing Node.
func NewSession(cfg ClusterConfig) (*Session, error) {
if err := cfg.Validate(); err != nil {
return nil, err
return nil, fmt.Errorf("gocql: unable to create session: cluster config validation failed: %v", err)
}

// TODO: we should take a context in here at some point
ctx, cancel := context.WithCancel(context.TODO())

Expand Down
Loading