Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fixes #5217 - Review RoundRobinConnectionPool #5219

Merged
merged 2 commits into from
Sep 9, 2020
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@ public abstract class AbstractConnectionPool implements ConnectionPool, Dumpable
private final HttpDestination destination;
private final Callback requester;
private final Pool<Connection> pool;
private boolean maximizeConnections;

/**
* @deprecated use {@link #AbstractConnectionPool(HttpDestination, int, boolean, Callback)} instead
Expand Down Expand Up @@ -151,11 +152,28 @@ public boolean isEmpty()
}

@Override
@ManagedAttribute("Whether this pool is closed")
public boolean isClosed()
{
return pool.isClosed();
}

@ManagedAttribute("Whether the pool tries to maximize the number of connections used")
public boolean isMaximizeConnections()
{
return maximizeConnections;
}

/**
* <p>Sets whether the number of connections should be maximized.</p>
*
* @param maximizeConnections whether the number of connections should be maximized
*/
public void setMaximizeConnections(boolean maximizeConnections)
{
this.maximizeConnections = maximizeConnections;
}

@Override
public Connection acquire()
{
Expand All @@ -164,7 +182,8 @@ public Connection acquire()

/**
* <p>Returns an idle connection, if available;
* if an idle connection is not available, and the given {@code create} parameter is {@code true},
* if an idle connection is not available, and the given {@code create} parameter is {@code true}
* or {@link #isMaximizeConnections()} is {@code true},
* then schedules the opening of a new connection, if possible within the configuration of this
* connection pool (for example, if it does not exceed the max connection count);
* otherwise returns {@code null}.</p>
Expand All @@ -178,7 +197,7 @@ protected Connection acquire(boolean create)
if (LOG.isDebugEnabled())
LOG.debug("Acquiring create={} on {}", create, this);
Connection connection = activate();
if (connection == null && create)
if (connection == null && (create || isMaximizeConnections()))
{
tryCreate(destination.getQueuedRequestCount());
connection = activate();
Expand Down Expand Up @@ -357,8 +376,8 @@ protected void removed(Connection connection)
}

/**
* @deprecated Relying on this method indicates a reliance on the implementation details.
* @return an unmodifiable queue working as a view of the idle connections.
* @deprecated Relying on this method indicates a reliance on the implementation details.
*/
@Deprecated
public Queue<Connection> getIdleConnections()
Expand All @@ -371,8 +390,8 @@ public Queue<Connection> getIdleConnections()
}

/**
* @deprecated Relying on this method indicates a reliance on the implementation details.
* @return an unmodifiable collection working as a view of the active connections.
* @deprecated Relying on this method indicates a reliance on the implementation details.
*/
@Deprecated
public Collection<Connection> getActiveConnections()
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,79 @@
//
// ========================================================================
// Copyright (c) 1995-2020 Mort Bay Consulting Pty Ltd and others.
// ------------------------------------------------------------------------
// All rights reserved. This program and the accompanying materials
// are made available under the terms of the Eclipse Public License v1.0
// and Apache License v2.0 which accompanies this distribution.
//
// The Eclipse Public License is available at
// http://www.eclipse.org/legal/epl-v10.html
//
// The Apache License v2.0 is available at
// http://www.opensource.org/licenses/apache2.0.php
//
// You may elect to redistribute this code under either of these licenses.
// ========================================================================
//

package org.eclipse.jetty.client;

import org.eclipse.jetty.client.api.Connection;
import org.eclipse.jetty.util.Callback;
import org.eclipse.jetty.util.Pool;
import org.eclipse.jetty.util.annotation.ManagedObject;
import org.eclipse.jetty.util.log.Log;
import org.eclipse.jetty.util.log.Logger;

/**
* <p>A {@link MultiplexConnectionPool} that picks connections at a particular
* index between {@code 0} and {@link #getMaxConnectionCount()}.</p>
* <p>The algorithm that decides the index value is decided by subclasses.</p>
* <p>To acquire a connection, this class obtains the index value and attempts
* to activate the pool entry at that index.
* If this activation fails, another attempt to activate an alternative pool
* entry is performed, to avoid stalling connection acquisition if there is
* an available entry at a different index.</p>
*/
@ManagedObject
public abstract class IndexedConnectionPool extends MultiplexConnectionPool
{
private static final Logger LOG = Log.getLogger(IndexedConnectionPool.class);

private final Pool<Connection> pool;

public IndexedConnectionPool(HttpDestination destination, int maxConnections, boolean cache, Callback requester, int maxMultiplex)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think the cache should be forced to false, otherwise you risk reaching a steady state where one thread continuously uses the connection at the same index.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think there shouldn't be a cache and it should be replaced by round robin: see #5218

{
super(destination, maxConnections, cache, requester, maxMultiplex);
pool = destination.getBean(Pool.class);
}

/**
* <p>Must return an index between 0 (inclusive) and {@code maxConnections} (exclusive)
* used to attempt to acquire the connection at that index in the pool.</p>
*
* @param maxConnections the upper bound of the index (exclusive)
* @return an index between 0 (inclusive) and {@code maxConnections} (exclusive)
*/
protected abstract int getIndex(int maxConnections);

@Override
protected Connection activate()
{
int index = getIndex(getMaxConnectionCount());
Pool<Connection>.Entry entry = pool.acquireAt(index);
if (LOG.isDebugEnabled())
LOG.debug("activating at index={} entry={}", index, entry);
if (entry == null)
{
entry = pool.acquire();
if (LOG.isDebugEnabled())
LOG.debug("activating alternative entry={}", entry);
}
if (entry == null)
return null;
Connection connection = entry.getPooled();
acquired(connection);
return connection;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
//
// ========================================================================
// Copyright (c) 1995-2020 Mort Bay Consulting Pty Ltd and others.
// ------------------------------------------------------------------------
// All rights reserved. This program and the accompanying materials
// are made available under the terms of the Eclipse Public License v1.0
// and Apache License v2.0 which accompanies this distribution.
//
// The Eclipse Public License is available at
// http://www.eclipse.org/legal/epl-v10.html
//
// The Apache License v2.0 is available at
// http://www.opensource.org/licenses/apache2.0.php
//
// You may elect to redistribute this code under either of these licenses.
// ========================================================================
//

package org.eclipse.jetty.client;

import java.util.concurrent.ThreadLocalRandom;

import org.eclipse.jetty.util.Callback;
import org.eclipse.jetty.util.annotation.ManagedObject;

/**
* <p>An indexed {@link ConnectionPool} that provides connections
* randomly among the ones that are available.</p>
*/
@ManagedObject
public class RandomConnectionPool extends IndexedConnectionPool
{
public RandomConnectionPool(HttpDestination destination, int maxConnections, boolean cache, Callback requester, int maxMultiplex)
{
super(destination, maxConnections, cache, requester, maxMultiplex);
}

@Override
protected int getIndex(int maxConnections)
{
return ThreadLocalRandom.current().nextInt(maxConnections);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,22 +18,34 @@

package org.eclipse.jetty.client;

import org.eclipse.jetty.client.api.Connection;
import java.util.concurrent.atomic.AtomicInteger;

import org.eclipse.jetty.util.Callback;
import org.eclipse.jetty.util.Pool;
import org.eclipse.jetty.util.annotation.ManagedObject;
import org.eclipse.jetty.util.log.Log;
import org.eclipse.jetty.util.log.Logger;
import org.eclipse.jetty.util.thread.Locker;

/**
* <p>A {@link ConnectionPool} that attempts to provide connections using a round-robin algorithm.</p>
* <p>The round-robin behavior is almost impossible to achieve for several reasons:</p>
* <ul>
* <li>the server takes different times to serve different requests; if a request takes a long
* time to be processed by the server, it would be a performance penalty to stall sending requests
* waiting for that connection to be available - better skip it and try another connection</li>
* <li>connections may be closed by the client or by the server, so it should be a performance
* penalty to stall sending requests waiting for a new connection to be opened</li>
* <li>thread scheduling on both client and server may temporarily penalize a connection</li>
* </ul>
* <p>Do not expect this class to provide connections in a perfect recurring sequence such as
* {@code c0, c1, ..., cN-1, c0, c1, ..., cN-1, c0, c1, ...} because that is impossible to
* achieve in a real environment.
* This class will just attempt a best-effort to provide the connections in a sequential order,
* but most likely the order will be quasi-random.</p>
*
* @see RandomConnectionPool
*/
@ManagedObject
public class RoundRobinConnectionPool extends MultiplexConnectionPool
public class RoundRobinConnectionPool extends IndexedConnectionPool
{
private static final Logger LOG = Log.getLogger(RoundRobinConnectionPool.class);

private final Locker lock = new Locker();
private final Pool<Connection> pool;
private int offset;
private final AtomicInteger offset = new AtomicInteger();

public RoundRobinConnectionPool(HttpDestination destination, int maxConnections, Callback requester)
{
Expand All @@ -43,36 +55,16 @@ public RoundRobinConnectionPool(HttpDestination destination, int maxConnections,
public RoundRobinConnectionPool(HttpDestination destination, int maxConnections, Callback requester, int maxMultiplex)
{
super(destination, maxConnections, false, requester, maxMultiplex);
pool = destination.getBean(Pool.class);
}

@Override
protected Connection acquire(boolean create)
{
// If there are queued requests and connections get
// closed due to idle timeout or overuse, we want to
// aggressively try to open new connections to replace
// those that were closed to process queued requests.
return super.acquire(true);
setMaximizeConnections(true);
}

@Override
protected Connection activate()
protected int getIndex(int maxConnections)
{
Pool<Connection>.Entry entry;
try (Locker.Lock l = lock.lock())
{
int index = Math.abs(offset % pool.getMaxEntries());
entry = pool.acquireAt(index);
if (LOG.isDebugEnabled())
LOG.debug("activated at index={} entry={}", index, entry);
if (entry != null)
++offset;
}
if (entry == null)
return null;
Connection connection = entry.getPooled();
acquired(connection);
return connection;
return Math.abs(offset.getAndIncrement() % maxConnections);
sbordet marked this conversation as resolved.
Show resolved Hide resolved
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,8 @@ public static Stream<ConnectionPoolFactory> poolsNoRoundRobin()
{
return Stream.of(
new ConnectionPoolFactory("duplex", destination -> new DuplexConnectionPool(destination, destination.getHttpClient().getMaxConnectionsPerDestination(), destination)),
new ConnectionPoolFactory("multiplex", destination -> new MultiplexConnectionPool(destination, destination.getHttpClient().getMaxConnectionsPerDestination(), destination, 1))
new ConnectionPoolFactory("multiplex", destination -> new MultiplexConnectionPool(destination, destination.getHttpClient().getMaxConnectionsPerDestination(), destination, 1)),
new ConnectionPoolFactory("random", destination -> new RandomConnectionPool(destination, destination.getHttpClient().getMaxConnectionsPerDestination(), true, destination, 1))
);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@
import org.junit.jupiter.params.provider.ArgumentsSource;

import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.lessThanOrEqualTo;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertTrue;

Expand Down Expand Up @@ -194,9 +195,9 @@ public void testMultiplexWithMaxUsage(Transport transport) throws Exception
multiplex = 2;
int maxMultiplex = multiplex;

int maxUsage = 2;
int maxConnections = 2;
int count = maxConnections * maxMultiplex * maxUsage;
int maxUsage = 3;
int maxConnections = 4;
int count = 2 * maxConnections * maxMultiplex * maxUsage;

List<Integer> remotePorts = new CopyOnWriteArrayList<>();
scenario.start(new EmptyServerHandler()
Expand Down Expand Up @@ -229,9 +230,14 @@ protected void service(String target, Request jettyRequest, HttpServletRequest r
assertTrue(clientLatch.await(count, TimeUnit.SECONDS));
assertEquals(count, remotePorts.size());

// Maps {remote_port -> number_of_times_port_was_used}.
Map<Integer, Long> results = remotePorts.stream()
.collect(Collectors.groupingBy(Function.identity(), Collectors.counting()));
assertEquals(count / maxUsage, results.size(), remotePorts.toString());
assertEquals(1, results.values().stream().distinct().count(), remotePorts.toString());
// RoundRobinConnectionPool may open more connections than expected.
// For example with maxUsage=2, requests could be sent to these ports:
// [p1, p2, p3 | p1, p2, p3 | p4, p4, p5 | p6, p5, p7]
// Opening p5 and p6 was delayed, so the opening of p7 was triggered
// to replace p4 while p5 and p6 were busy sending their requests.
assertThat(remotePorts.toString(), count / maxUsage, lessThanOrEqualTo(results.size()));
}
}