Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Least connected load balancing strategy #385

Merged
merged 6 commits into from
Jul 10, 2017
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
21 changes: 19 additions & 2 deletions driver/src/main/java/org/neo4j/driver/internal/DriverFactory.java
Original file line number Diff line number Diff line change
Expand Up @@ -22,9 +22,12 @@
import java.net.URI;
import java.security.GeneralSecurityException;

import org.neo4j.driver.internal.cluster.LoadBalancer;
import org.neo4j.driver.internal.cluster.RoutingContext;
import org.neo4j.driver.internal.cluster.RoutingSettings;
import org.neo4j.driver.internal.cluster.loadbalancing.LeastConnectedLoadBalancingStrategy;
import org.neo4j.driver.internal.cluster.loadbalancing.LoadBalancer;
import org.neo4j.driver.internal.cluster.loadbalancing.LoadBalancingStrategy;
import org.neo4j.driver.internal.cluster.loadbalancing.RoundRobinLoadBalancingStrategy;
import org.neo4j.driver.internal.net.BoltServerAddress;
import org.neo4j.driver.internal.net.SocketConnector;
import org.neo4j.driver.internal.net.pooling.PoolSettings;
Expand Down Expand Up @@ -146,7 +149,21 @@ protected InternalDriver createDriver( Config config, SecurityPlan securityPlan,
protected LoadBalancer createLoadBalancer( BoltServerAddress address, ConnectionPool connectionPool, Config config,
RoutingSettings routingSettings )
{
return new LoadBalancer( address, routingSettings, connectionPool, createClock(), config.logging() );
return new LoadBalancer( address, routingSettings, connectionPool, createClock(), config.logging(),
createLoadBalancingStrategy( config, connectionPool ) );
}

private LoadBalancingStrategy createLoadBalancingStrategy( Config config, ConnectionPool connectionPool )
{
switch ( config.loadBalancingStrategy() )
{
case ROUND_ROBIN:
return new RoundRobinLoadBalancingStrategy();
case LEAST_CONNECTED:
return new LeastConnectedLoadBalancingStrategy( connectionPool );
default:
throw new IllegalArgumentException( "Unknown load balancing strategy: " + config.loadBalancingStrategy() );
}
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,39 +20,23 @@

import java.util.Arrays;
import java.util.Set;
import java.util.concurrent.atomic.AtomicInteger;

import org.neo4j.driver.internal.net.BoltServerAddress;

public class RoundRobinAddressSet
public class AddressSet
{
private static final BoltServerAddress[] NONE = {};
private final AtomicInteger offset = new AtomicInteger();
private volatile BoltServerAddress[] addresses = NONE;

public int size()
{
return addresses.length;
}
private volatile BoltServerAddress[] addresses = NONE;

public BoltServerAddress next()
public BoltServerAddress[] toArray()
{
BoltServerAddress[] addresses = this.addresses;
if ( addresses.length == 0 )
{
return null;
}
return addresses[next( addresses.length )];
return addresses;
}

int next( int divisor )
public int size()
{
int index = offset.getAndIncrement();
for ( ; index == Integer.MAX_VALUE; index = offset.getAndIncrement() )
{
offset.compareAndSet( Integer.MIN_VALUE, index % divisor );
}
return index % divisor;
return addresses.length;
}

public synchronized void update( Set<BoltServerAddress> addresses, Set<BoltServerAddress> removed )
Expand Down Expand Up @@ -132,12 +116,6 @@ public synchronized void remove( BoltServerAddress address )
@Override
public String toString()
{
return "RoundRobinAddressSet=" + Arrays.toString( addresses );
}

/** breaking encapsulation in order to perform white-box testing of boundary case */
void setOffset( int target )
{
offset.set( target );
return "AddressSet=" + Arrays.toString( addresses );
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@
import org.neo4j.driver.v1.Value;
import org.neo4j.driver.v1.util.Function;

final class ClusterComposition
public final class ClusterComposition
{
private static final long MAX_TTL = Long.MAX_VALUE / 1000L;
private static final Function<Value,BoltServerAddress> OF_BoltServerAddress =
Expand All @@ -53,7 +53,7 @@ private ClusterComposition( long expirationTimestamp )
}

/** For testing */
ClusterComposition(
public ClusterComposition(
long expirationTimestamp,
Set<BoltServerAddress> readers,
Set<BoltServerAddress> writers,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,9 +36,9 @@ public class ClusterRoutingTable implements RoutingTable

private final Clock clock;
private volatile long expirationTimeout;
private final RoundRobinAddressSet readers;
private final RoundRobinAddressSet writers;
private final RoundRobinAddressSet routers;
private final AddressSet readers;
private final AddressSet writers;
private final AddressSet routers;

public ClusterRoutingTable( Clock clock, BoltServerAddress... routingAddresses )
{
Expand All @@ -51,9 +51,9 @@ private ClusterRoutingTable( Clock clock )
this.clock = clock;
this.expirationTimeout = clock.millis() - 1;

this.readers = new RoundRobinAddressSet();
this.writers = new RoundRobinAddressSet();
this.routers = new RoundRobinAddressSet();
this.readers = new AddressSet();
this.writers = new AddressSet();
this.routers = new AddressSet();
}

@Override
Expand Down Expand Up @@ -85,27 +85,21 @@ public synchronized void forget( BoltServerAddress address )
}

@Override
public RoundRobinAddressSet readers()
public AddressSet readers()
{
return readers;
}

@Override
public RoundRobinAddressSet writers()
public AddressSet writers()
{
return writers;
}

@Override
public BoltServerAddress nextRouter()
public AddressSet routers()
{
return routers.next();
}

@Override
public int routerSize()
{
return routers.size();
return routers;
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -136,15 +136,10 @@ private ClusterComposition lookupOnInitialRouterThenOnKnownRouters( RoutingTable
private ClusterComposition lookupOnKnownRouters( RoutingTable routingTable, ConnectionPool connections,
Set<BoltServerAddress> seenServers )
{
int size = routingTable.routerSize();
for ( int i = 0; i < size; i++ )
{
BoltServerAddress address = routingTable.nextRouter();
if ( address == null )
{
break;
}
BoltServerAddress[] addresses = routingTable.routers().toArray();

for ( BoltServerAddress address : addresses )
{
ClusterComposition composition = lookupOnRouter( address, routingTable, connections );
if ( composition != null )
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,13 +35,13 @@

import static java.lang.String.format;

class RoutingPooledConnection implements PooledConnection
public class RoutingPooledConnection implements PooledConnection
{
private final PooledConnection delegate;
private final RoutingErrorHandler errorHandler;
private final AccessMode accessMode;

RoutingPooledConnection( PooledConnection delegate, RoutingErrorHandler errorHandler, AccessMode accessMode )
public RoutingPooledConnection( PooledConnection delegate, RoutingErrorHandler errorHandler, AccessMode accessMode )
{
this.delegate = delegate;
this.errorHandler = errorHandler;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,13 +31,11 @@ public interface RoutingTable

void forget( BoltServerAddress address );

RoundRobinAddressSet readers();
AddressSet readers();

RoundRobinAddressSet writers();
AddressSet writers();

BoltServerAddress nextRouter();

int routerSize();
AddressSet routers();

void removeWriter( BoltServerAddress toRemove );
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,94 @@
/*
* Copyright (c) 2002-2017 "Neo Technology,"
* Network Engine for Objects in Lund AB [http://neotechnology.com]
*
* This file is part of Neo4j.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.neo4j.driver.internal.cluster.loadbalancing;

import org.neo4j.driver.internal.net.BoltServerAddress;
import org.neo4j.driver.internal.spi.ConnectionPool;

/**
* Load balancing strategy that finds server with least amount of active (checked out of the pool) connections from
* given readers or writers. It finds a start index for iteration in a round-robin fashion. This is done to prevent
* choosing same first address over and over when all addresses have same amount of active connections.
*/
public class LeastConnectedLoadBalancingStrategy implements LoadBalancingStrategy
{
private final RoundRobinArrayIndex readersIndex = new RoundRobinArrayIndex();
private final RoundRobinArrayIndex writersIndex = new RoundRobinArrayIndex();

private final ConnectionPool connectionPool;

public LeastConnectedLoadBalancingStrategy( ConnectionPool connectionPool )
{
this.connectionPool = connectionPool;
}

@Override
public BoltServerAddress selectReader( BoltServerAddress[] knownReaders )
{
return select( knownReaders, readersIndex );
}

@Override
public BoltServerAddress selectWriter( BoltServerAddress[] knownWriters )
{
return select( knownWriters, writersIndex );
}

private BoltServerAddress select( BoltServerAddress[] addresses, RoundRobinArrayIndex addressesIndex )
{
int size = addresses.length;
if ( size == 0 )
{
return null;
}

// choose start index for iteration in round-rodin fashion
int startIndex = addressesIndex.next( size );
int index = startIndex;

BoltServerAddress leastConnectedAddress = null;
int leastActiveConnections = Integer.MAX_VALUE;

// iterate over the array to find least connected address
do
{
BoltServerAddress address = addresses[index];
int activeConnections = connectionPool.activeConnections( address );

if ( activeConnections < leastActiveConnections )
{
leastConnectedAddress = address;
leastActiveConnections = activeConnections;
}

// loop over to the start of the array when end is reached
if ( index == size - 1 )
{
index = 0;
}
else
{
index++;
}
}
while ( index != startIndex );

return leastConnectedAddress;
}
}
Loading