Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fixes #5217 - Review RoundRobinConnectionPool #5219

Merged
merged 2 commits into from
Sep 9, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@ public abstract class AbstractConnectionPool implements ConnectionPool, Dumpable
private final HttpDestination destination;
private final Callback requester;
private final Pool<Connection> pool;
private boolean maximizeConnections;

/**
* @deprecated use {@link #AbstractConnectionPool(HttpDestination, int, boolean, Callback)} instead
Expand Down Expand Up @@ -151,11 +152,28 @@ public boolean isEmpty()
}

@Override
@ManagedAttribute("Whether this pool is closed")
public boolean isClosed()
{
return pool.isClosed();
}

@ManagedAttribute("Whether the pool tries to maximize the number of connections used")
public boolean isMaximizeConnections()
{
return maximizeConnections;
}

/**
* <p>Sets whether the number of connections should be maximized.</p>
*
* @param maximizeConnections whether the number of connections should be maximized
*/
public void setMaximizeConnections(boolean maximizeConnections)
{
this.maximizeConnections = maximizeConnections;
}

@Override
public Connection acquire()
{
Expand All @@ -164,7 +182,8 @@ public Connection acquire()

/**
* <p>Returns an idle connection, if available;
* if an idle connection is not available, and the given {@code create} parameter is {@code true},
* if an idle connection is not available, and the given {@code create} parameter is {@code true}
* or {@link #isMaximizeConnections()} is {@code true},
* then schedules the opening of a new connection, if possible within the configuration of this
* connection pool (for example, if it does not exceed the max connection count);
* otherwise returns {@code null}.</p>
Expand All @@ -178,7 +197,7 @@ protected Connection acquire(boolean create)
if (LOG.isDebugEnabled())
LOG.debug("Acquiring create={} on {}", create, this);
Connection connection = activate();
if (connection == null && create)
if (connection == null && (create || isMaximizeConnections()))
{
tryCreate(destination.getQueuedRequestCount());
connection = activate();
Expand Down Expand Up @@ -357,8 +376,8 @@ protected void removed(Connection connection)
}

/**
* @deprecated Relying on this method indicates a reliance on the implementation details.
* @return an unmodifiable queue working as a view of the idle connections.
* @deprecated Relying on this method indicates a reliance on the implementation details.
*/
@Deprecated
public Queue<Connection> getIdleConnections()
Expand All @@ -371,8 +390,8 @@ public Queue<Connection> getIdleConnections()
}

/**
* @deprecated Relying on this method indicates a reliance on the implementation details.
* @return an unmodifiable collection working as a view of the active connections.
* @deprecated Relying on this method indicates a reliance on the implementation details.
*/
@Deprecated
public Collection<Connection> getActiveConnections()
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,79 @@
//
// ========================================================================
// Copyright (c) 1995-2020 Mort Bay Consulting Pty Ltd and others.
// ------------------------------------------------------------------------
// All rights reserved. This program and the accompanying materials
// are made available under the terms of the Eclipse Public License v1.0
// and Apache License v2.0 which accompanies this distribution.
//
// The Eclipse Public License is available at
// http://www.eclipse.org/legal/epl-v10.html
//
// The Apache License v2.0 is available at
// http://www.opensource.org/licenses/apache2.0.php
//
// You may elect to redistribute this code under either of these licenses.
// ========================================================================
//

package org.eclipse.jetty.client;

import org.eclipse.jetty.client.api.Connection;
import org.eclipse.jetty.util.Callback;
import org.eclipse.jetty.util.Pool;
import org.eclipse.jetty.util.annotation.ManagedObject;
import org.eclipse.jetty.util.log.Log;
import org.eclipse.jetty.util.log.Logger;

/**
* <p>A {@link MultiplexConnectionPool} that picks connections at a particular
* index between {@code 0} and {@link #getMaxConnectionCount()}.</p>
* <p>The algorithm that decides the index value is decided by subclasses.</p>
* <p>To acquire a connection, this class obtains the index value and attempts
* to activate the pool entry at that index.
* If this activation fails, another attempt to activate an alternative pool
* entry is performed, to avoid stalling connection acquisition if there is
* an available entry at a different index.</p>
*/
@ManagedObject
public abstract class IndexedConnectionPool extends MultiplexConnectionPool
{
private static final Logger LOG = Log.getLogger(IndexedConnectionPool.class);

private final Pool<Connection> pool;

public IndexedConnectionPool(HttpDestination destination, int maxConnections, Callback requester, int maxMultiplex)
{
super(destination, maxConnections, false, requester, maxMultiplex);
pool = destination.getBean(Pool.class);
}

/**
* <p>Must return an index between 0 (inclusive) and {@code maxConnections} (exclusive)
* used to attempt to acquire the connection at that index in the pool.</p>
*
* @param maxConnections the upper bound of the index (exclusive)
* @return an index between 0 (inclusive) and {@code maxConnections} (exclusive)
*/
protected abstract int getIndex(int maxConnections);

@Override
protected Connection activate()
{
int index = getIndex(getMaxConnectionCount());
Pool<Connection>.Entry entry = pool.acquireAt(index);
if (LOG.isDebugEnabled())
LOG.debug("activating at index={} entry={}", index, entry);
if (entry == null)
{
entry = pool.acquire();
if (LOG.isDebugEnabled())
LOG.debug("activating alternative entry={}", entry);
}
if (entry == null)
return null;
Connection connection = entry.getPooled();
acquired(connection);
return connection;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
//
// ========================================================================
// Copyright (c) 1995-2020 Mort Bay Consulting Pty Ltd and others.
// ------------------------------------------------------------------------
// All rights reserved. This program and the accompanying materials
// are made available under the terms of the Eclipse Public License v1.0
// and Apache License v2.0 which accompanies this distribution.
//
// The Eclipse Public License is available at
// http://www.eclipse.org/legal/epl-v10.html
//
// The Apache License v2.0 is available at
// http://www.opensource.org/licenses/apache2.0.php
//
// You may elect to redistribute this code under either of these licenses.
// ========================================================================
//

package org.eclipse.jetty.client;

import java.util.concurrent.ThreadLocalRandom;

import org.eclipse.jetty.util.Callback;
import org.eclipse.jetty.util.annotation.ManagedObject;

/**
* <p>An indexed {@link ConnectionPool} that provides connections
* randomly among the ones that are available.</p>
*/
@ManagedObject
public class RandomConnectionPool extends IndexedConnectionPool
{
public RandomConnectionPool(HttpDestination destination, int maxConnections, Callback requester, int maxMultiplex)
{
super(destination, maxConnections, requester, maxMultiplex);
}

@Override
protected int getIndex(int maxConnections)
{
return ThreadLocalRandom.current().nextInt(maxConnections);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,22 +18,39 @@

package org.eclipse.jetty.client;

import org.eclipse.jetty.client.api.Connection;
import java.util.concurrent.atomic.AtomicInteger;

import org.eclipse.jetty.util.Callback;
import org.eclipse.jetty.util.Pool;
import org.eclipse.jetty.util.annotation.ManagedObject;
import org.eclipse.jetty.util.log.Log;
import org.eclipse.jetty.util.log.Logger;
import org.eclipse.jetty.util.thread.Locker;

/**
* <p>A {@link ConnectionPool} that attempts to provide connections using a round-robin algorithm.</p>
* <p>The round-robin behavior is almost impossible to achieve for several reasons:</p>
* <ul>
* <li>the server takes different times to serve different requests; if a request takes a long
* time to be processed by the server, it would be a performance penalty to stall sending requests
* waiting for that connection to be available - better skip it and try another connection</li>
* <li>connections may be closed by the client or by the server, so it should be a performance
* penalty to stall sending requests waiting for a new connection to be opened</li>
* <li>thread scheduling on both client and server may temporarily penalize a connection</li>
* </ul>
* <p>Do not expect this class to provide connections in a perfect recurring sequence such as
* {@code c0, c1, ..., cN-1, c0, c1, ..., cN-1, c0, c1, ...} because that is impossible to
* achieve in a real environment.
* This class will just attempt a best-effort to provide the connections in a sequential order,
* but most likely the order will be quasi-random.</p>
* <p>Applications using this class should {@link #preCreateConnections(int) pre-create}
* the connections to ensure that they are already opened when the application starts to requests
* them, otherwise the first connection that is opened may be used multiple times before the others
* are opened, resulting in a behavior that is more random-like than more round-robin-like (and
* that confirms that round-robin behavior is almost impossible to achieve).</p>
*
* @see RandomConnectionPool
*/
@ManagedObject
public class RoundRobinConnectionPool extends MultiplexConnectionPool
public class RoundRobinConnectionPool extends IndexedConnectionPool
{
private static final Logger LOG = Log.getLogger(RoundRobinConnectionPool.class);

private final Locker lock = new Locker();
private final Pool<Connection> pool;
private int offset;
private final AtomicInteger offset = new AtomicInteger();

public RoundRobinConnectionPool(HttpDestination destination, int maxConnections, Callback requester)
{
Expand All @@ -42,37 +59,17 @@ public RoundRobinConnectionPool(HttpDestination destination, int maxConnections,

public RoundRobinConnectionPool(HttpDestination destination, int maxConnections, Callback requester, int maxMultiplex)
{
super(destination, maxConnections, false, requester, maxMultiplex);
pool = destination.getBean(Pool.class);
}

@Override
protected Connection acquire(boolean create)
{
super(destination, maxConnections, requester, maxMultiplex);
// If there are queued requests and connections get
// closed due to idle timeout or overuse, we want to
// aggressively try to open new connections to replace
// those that were closed to process queued requests.
return super.acquire(true);
setMaximizeConnections(true);
}

@Override
protected Connection activate()
protected int getIndex(int maxConnections)
{
Pool<Connection>.Entry entry;
try (Locker.Lock l = lock.lock())
{
int index = Math.abs(offset % pool.getMaxEntries());
entry = pool.acquireAt(index);
if (LOG.isDebugEnabled())
LOG.debug("activated at index={} entry={}", index, entry);
if (entry != null)
++offset;
}
if (entry == null)
return null;
Connection connection = entry.getPooled();
acquired(connection);
return connection;
return offset.getAndUpdate(v -> ++v == maxConnections ? 0 : v);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,8 @@ public static Stream<ConnectionPoolFactory> poolsNoRoundRobin()
{
return Stream.of(
new ConnectionPoolFactory("duplex", destination -> new DuplexConnectionPool(destination, destination.getHttpClient().getMaxConnectionsPerDestination(), destination)),
new ConnectionPoolFactory("multiplex", destination -> new MultiplexConnectionPool(destination, destination.getHttpClient().getMaxConnectionsPerDestination(), destination, 1))
new ConnectionPoolFactory("multiplex", destination -> new MultiplexConnectionPool(destination, destination.getHttpClient().getMaxConnectionsPerDestination(), destination, 1)),
new ConnectionPoolFactory("random", destination -> new RandomConnectionPool(destination, destination.getHttpClient().getMaxConnectionsPerDestination(), destination, 1))
);
}

Expand Down
Loading