Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Issue #5147 - HTTP2 RoundRobinConnectionPool with maxUsage #5187

Merged
merged 1 commit into from
Aug 25, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -440,9 +440,14 @@ public void release(Connection connection)
{
// Trigger the next request after releasing the connection.
if (connectionPool.release(connection))
{
send(false);
}
else
{
connection.close();
send(true);
}
}
else
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,22 +18,22 @@

package org.eclipse.jetty.client;

import java.util.concurrent.atomic.AtomicInteger;

import org.eclipse.jetty.client.api.Connection;
import org.eclipse.jetty.util.Callback;
import org.eclipse.jetty.util.Pool;
import org.eclipse.jetty.util.annotation.ManagedObject;
import org.eclipse.jetty.util.log.Log;
import org.eclipse.jetty.util.log.Logger;
import org.eclipse.jetty.util.thread.Locker;

@ManagedObject
public class RoundRobinConnectionPool extends MultiplexConnectionPool
{
private static final Logger LOG = Log.getLogger(RoundRobinConnectionPool.class);

private final AtomicInteger offset = new AtomicInteger();
private final Locker lock = new Locker();
private final Pool<Connection> pool;
private int offset;

public RoundRobinConnectionPool(HttpDestination destination, int maxConnections, Callback requester)
{
Expand All @@ -47,26 +47,32 @@ public RoundRobinConnectionPool(HttpDestination destination, int maxConnections,
}

@Override
protected Connection activate()
protected Connection acquire(boolean create)
{
int offset = this.offset.get();
Connection connection = activate(offset);
if (connection != null)
this.offset.getAndIncrement();
return connection;
// If there are queued requests and connections get
// closed due to idle timeout or overuse, we want to
// aggressively try to open new connections to replace
// those that were closed to process queued requests.
return super.acquire(true);
}

private Connection activate(int offset)
@Override
protected Connection activate()
{
Pool<Connection>.Entry entry = pool.acquireAt(Math.abs(offset % pool.getMaxEntries()));
if (LOG.isDebugEnabled())
LOG.debug("activated '{}'", entry);
if (entry != null)
Pool<Connection>.Entry entry;
try (Locker.Lock l = lock.lock())
{
Connection connection = entry.getPooled();
acquired(connection);
return connection;
int index = Math.abs(offset % pool.getMaxEntries());
entry = pool.acquireAt(index);
if (LOG.isDebugEnabled())
LOG.debug("activated at index={} entry={}", index, entry);
if (entry != null)
++offset;
}
return null;
if (entry == null)
return null;
Connection connection = entry.getPooled();
acquired(connection);
return connection;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,6 @@
import org.eclipse.jetty.client.HttpSender;
import org.eclipse.jetty.client.api.Result;
import org.eclipse.jetty.http2.ErrorCode;
import org.eclipse.jetty.http2.IStream;
import org.eclipse.jetty.http2.api.Session;
import org.eclipse.jetty.http2.api.Stream;
import org.eclipse.jetty.http2.frames.ResetFrame;
Expand Down Expand Up @@ -99,12 +98,9 @@ public void send(HttpExchange exchange)
@Override
public void release()
{
setStream(null);
connection.release(this);
}

void onStreamClosed(IStream stream)
{
connection.onStreamClosed(stream, this);
getHttpDestination().release(getHttpConnection());
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,6 @@
import org.eclipse.jetty.client.SendFailure;
import org.eclipse.jetty.http.HttpVersion;
import org.eclipse.jetty.http2.ErrorCode;
import org.eclipse.jetty.http2.IStream;
import org.eclipse.jetty.http2.api.Session;
import org.eclipse.jetty.util.Callback;
import org.eclipse.jetty.util.log.Log;
Expand Down Expand Up @@ -119,16 +118,6 @@ else if (isRecycleHttpChannels())
}
}

void onStreamClosed(IStream stream, HttpChannelOverHTTP2 channel)
{
if (LOG.isDebugEnabled())
LOG.debug("{} closed for {}", stream, channel);
channel.setStream(null);
// Only non-push channels are released.
if (stream.isLocal())
getHttpDestination().release(this);
}

@Override
public boolean onIdleTimeout(long idleTimeout)
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,6 @@
import org.eclipse.jetty.http.HttpStatus;
import org.eclipse.jetty.http.MetaData;
import org.eclipse.jetty.http2.ErrorCode;
import org.eclipse.jetty.http2.IStream;
import org.eclipse.jetty.http2.api.Stream;
import org.eclipse.jetty.http2.frames.DataFrame;
import org.eclipse.jetty.http2.frames.HeadersFrame;
Expand Down Expand Up @@ -201,12 +200,6 @@ public void onFailure(Stream stream, int error, String reason, Throwable failure
callback.succeeded();
}

@Override
public void onClosed(Stream stream)
{
getHttpChannel().onStreamClosed((IStream)stream);
}

private void notifyContent(HttpExchange exchange, DataFrame frame, Callback callback)
{
contentNotifier.offer(exchange, frame, callback);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,8 @@
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Function;
import java.util.stream.Collectors;
import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;

Expand Down Expand Up @@ -180,4 +182,56 @@ protected void service(String target, Request jettyRequest, HttpServletRequest r
assertThat(remotePorts.get(i - 1), Matchers.not(Matchers.equalTo(candidate)));
}
}

@ParameterizedTest
@ArgumentsSource(TransportProvider.class)
public void testMultiplexWithMaxUsage(Transport transport) throws Exception
{
init(transport);

int multiplex = 1;
if (scenario.transport.isHttp2Based())
multiplex = 2;
int maxMultiplex = multiplex;

int maxUsage = 2;
int maxConnections = 2;
int count = maxConnections * maxMultiplex * maxUsage;

List<Integer> remotePorts = new CopyOnWriteArrayList<>();
scenario.start(new EmptyServerHandler()
{
@Override
protected void service(String target, Request jettyRequest, HttpServletRequest request, HttpServletResponse response)
{
remotePorts.add(request.getRemotePort());
}
});
scenario.client.getTransport().setConnectionPoolFactory(destination ->
{
RoundRobinConnectionPool pool = new RoundRobinConnectionPool(destination, maxConnections, destination, maxMultiplex);
pool.setMaxUsageCount(maxUsage);
return pool;
});

CountDownLatch clientLatch = new CountDownLatch(count);
for (int i = 0; i < count; ++i)
{
scenario.client.newRequest(scenario.newURI())
.path("/" + i)
.timeout(5, TimeUnit.SECONDS)
.send(result ->
{
if (result.getResponse().getStatus() == HttpStatus.OK_200)
clientLatch.countDown();
});
}
assertTrue(clientLatch.await(count, TimeUnit.SECONDS));
assertEquals(count, remotePorts.size());

Map<Integer, Long> results = remotePorts.stream()
.collect(Collectors.groupingBy(Function.identity(), Collectors.counting()));
assertEquals(count / maxUsage, results.size(), remotePorts.toString());
assertEquals(1, results.values().stream().distinct().count(), remotePorts.toString());
}
}