Skip to content

Commit

Permalink
add max duration property
Browse files Browse the repository at this point in the history
  • Loading branch information
lorban committed Dec 17, 2020
1 parent 797c06b commit 5b8246f
Show file tree
Hide file tree
Showing 4 changed files with 71 additions and 22 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -23,12 +23,14 @@
import java.util.Collection;
import java.util.Queue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;

import org.eclipse.jetty.client.api.Connection;
import org.eclipse.jetty.client.api.Destination;
import org.eclipse.jetty.util.Attachable;
import org.eclipse.jetty.util.Callback;
import org.eclipse.jetty.util.IO;
import org.eclipse.jetty.util.Pool;
import org.eclipse.jetty.util.Promise;
import org.eclipse.jetty.util.annotation.ManagedAttribute;
Expand All @@ -48,8 +50,25 @@ public abstract class AbstractConnectionPool extends ContainerLifeCycle implemen

private final HttpDestination destination;
private final Callback requester;
private final Pool<Connection> pool;
private final Pool<ConnectionHolder> pool;
private boolean maximizeConnections;
private volatile long maxDurationNanos = 0L;

public static class ConnectionHolder
{
private final Connection connection;
private final long creationTimestamp = System.nanoTime();

ConnectionHolder(Connection connection)
{
this.connection = connection;
}

private boolean isExpired(long timeoutNanos)
{
return System.nanoTime() - creationTimestamp >= timeoutNanos;
}
}

/**
* @deprecated use {@link #AbstractConnectionPool(HttpDestination, int, boolean, Callback)} instead
Expand All @@ -65,7 +84,7 @@ protected AbstractConnectionPool(HttpDestination destination, int maxConnections
this(destination, new Pool<>(Pool.StrategyType.FIRST, maxConnections, cache), requester);
}

protected AbstractConnectionPool(HttpDestination destination, Pool<Connection> pool, Callback requester)
protected AbstractConnectionPool(HttpDestination destination, Pool<ConnectionHolder> pool, Callback requester)
{
this.destination = destination;
this.requester = requester;
Expand All @@ -90,6 +109,22 @@ public CompletableFuture<Void> preCreateConnections(int connectionCount)
return CompletableFuture.allOf(futures);
}

@ManagedAttribute(value = "The maximum duration in milliseconds a connection can be used for before it gets closed")
public long getMaxDuration()
{
return TimeUnit.NANOSECONDS.toMillis(maxDurationNanos);
}

/**
* Set the max usage duration of the pool's entries in milliseconds.
* Values {@code 0} and negative mean that there is no limit.
* @param maxDuration the maximum duration in milliseconds.
*/
public void setMaxDuration(long maxDuration)
{
this.maxDurationNanos = TimeUnit.MILLISECONDS.toNanos(maxDuration);
}

protected int getMaxMultiplex()
{
return pool.getMaxMultiplex();
Expand Down Expand Up @@ -232,7 +267,7 @@ private CompletableFuture<Void> tryCreateAsync(int maxPending)
if (LOG.isDebugEnabled())
LOG.debug("Try creating connection {}/{} with {}/{} pending", connectionCount, getMaxConnectionCount(), getPendingConnectionCount(), maxPending);

Pool<Connection>.Entry entry = pool.reserve(maxPending);
Pool<ConnectionHolder>.Entry entry = pool.reserve(maxPending);
if (entry == null)
return CompletableFuture.completedFuture(null);

Expand All @@ -254,7 +289,7 @@ public void succeeded(Connection connection)
}
((Attachable)connection).setAttachment(entry);
onCreated(connection);
entry.enable(connection, false);
entry.enable(new ConnectionHolder(connection), false);
idle(connection, false);
future.complete(null);
proceed();
Expand All @@ -281,16 +316,25 @@ protected void proceed()

protected Connection activate()
{
Pool<Connection>.Entry entry = pool.acquire();
if (entry != null)
while (true)
{
Pool<ConnectionHolder>.Entry entry = pool.acquire();
if (entry == null)
return null;
Connection connection = entry.getPooled().connection;

long maxDurationNanos = this.maxDurationNanos;
if (maxDurationNanos > 0L && entry.getPooled().isExpired(maxDurationNanos))
{
IO.close(connection);
continue;
}

if (LOG.isDebugEnabled())
LOG.debug("Activated {} {}", entry, pool);
Connection connection = entry.getPooled();
acquired(connection);
return connection;
}
return null;
}

@Override
Expand All @@ -300,7 +344,7 @@ public boolean isActive(Connection connection)
throw new IllegalArgumentException("Invalid connection object: " + connection);
Attachable attachable = (Attachable)connection;
@SuppressWarnings("unchecked")
Pool<Connection>.Entry entry = (Pool<Connection>.Entry)attachable.getAttachment();
Pool<ConnectionHolder>.Entry entry = (Pool<ConnectionHolder>.Entry)attachable.getAttachment();
if (entry == null)
return false;
return !entry.isIdle();
Expand All @@ -321,7 +365,7 @@ protected boolean deactivate(Connection connection)
throw new IllegalArgumentException("Invalid connection object: " + connection);
Attachable attachable = (Attachable)connection;
@SuppressWarnings("unchecked")
Pool<Connection>.Entry entry = (Pool<Connection>.Entry)attachable.getAttachment();
Pool<ConnectionHolder>.Entry entry = (Pool<ConnectionHolder>.Entry)attachable.getAttachment();
if (entry == null)
return true;
boolean reusable = pool.release(entry);
Expand All @@ -345,11 +389,12 @@ protected boolean remove(Connection connection, boolean force)
throw new IllegalArgumentException("Invalid connection object: " + connection);
Attachable attachable = (Attachable)connection;
@SuppressWarnings("unchecked")
Pool<Connection>.Entry entry = (Pool<Connection>.Entry)attachable.getAttachment();
Pool<ConnectionHolder>.Entry entry = (Pool<ConnectionHolder>.Entry)attachable.getAttachment();
if (entry == null)
return false;
attachable.setAttachment(null);
boolean removed = pool.remove(entry);
if (removed)
attachable.setAttachment(null);
if (LOG.isDebugEnabled())
LOG.debug("Removed ({}) {} {}", removed, entry, pool);
if (removed || force)
Expand Down Expand Up @@ -391,7 +436,7 @@ public Queue<Connection> getIdleConnections()
return pool.values().stream()
.filter(Pool.Entry::isIdle)
.filter(entry -> !entry.isClosed())
.map(Pool.Entry::getPooled)
.map(entry -> entry.getPooled().connection)
.collect(toCollection(ArrayDeque::new));
}

Expand All @@ -405,7 +450,7 @@ public Collection<Connection> getActiveConnections()
return pool.values().stream()
.filter(entry -> !entry.isIdle())
.filter(entry -> !entry.isClosed())
.map(Pool.Entry::getPooled)
.map(entry -> entry.getPooled().connection)
.collect(Collectors.toList());
}

Expand All @@ -426,12 +471,12 @@ public boolean sweep()
{
pool.values().stream().filter(entry -> entry.getPooled() instanceof Sweeper.Sweepable).forEach(entry ->
{
Connection connection = entry.getPooled();
if (((Sweeper.Sweepable)connection).sweep())
ConnectionHolder holder = entry.getPooled();
if (((Sweeper.Sweepable)holder.connection).sweep())
{
boolean removed = remove(connection);
boolean removed = remove(holder.connection);
LOG.warn("Connection swept: {}{}{} from active connections{}{}",
connection,
holder.connection,
System.lineSeparator(),
removed ? "Removed" : "Not removed",
System.lineSeparator(),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@

package org.eclipse.jetty.client;

import org.eclipse.jetty.client.api.Connection;
import org.eclipse.jetty.util.Callback;
import org.eclipse.jetty.util.Pool;
import org.eclipse.jetty.util.annotation.ManagedAttribute;
Expand All @@ -37,7 +36,7 @@ public DuplexConnectionPool(HttpDestination destination, int maxConnections, boo
this(destination, new Pool<>(Pool.StrategyType.FIRST, maxConnections, cache), requester);
}

public DuplexConnectionPool(HttpDestination destination, Pool<Connection> pool, Callback requester)
public DuplexConnectionPool(HttpDestination destination, Pool<ConnectionHolder> pool, Callback requester)
{
super(destination, pool, requester);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@

package org.eclipse.jetty.client;

import org.eclipse.jetty.client.api.Connection;
import org.eclipse.jetty.util.Callback;
import org.eclipse.jetty.util.Pool;
import org.eclipse.jetty.util.annotation.ManagedAttribute;
Expand All @@ -37,7 +36,7 @@ public MultiplexConnectionPool(HttpDestination destination, int maxConnections,
this(destination, new Pool<>(Pool.StrategyType.FIRST, maxConnections, cache), requester, maxMultiplex);
}

public MultiplexConnectionPool(HttpDestination destination, Pool<Connection> pool, Callback requester, int maxMultiplex)
public MultiplexConnectionPool(HttpDestination destination, Pool<ConnectionHolder> pool, Callback requester, int maxMultiplex)
{
super(destination, pool, requester);
setMaxMultiplex(maxMultiplex);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,12 @@ public static Stream<ConnectionPoolFactory> poolsNoRoundRobin()
{
return Stream.of(
new ConnectionPoolFactory("duplex", destination -> new DuplexConnectionPool(destination, destination.getHttpClient().getMaxConnectionsPerDestination(), destination)),
new ConnectionPoolFactory("duplex-maxDuration", destination ->
{
DuplexConnectionPool pool = new DuplexConnectionPool(destination, destination.getHttpClient().getMaxConnectionsPerDestination(), destination);
pool.setMaxDuration(10);
return pool;
}),
new ConnectionPoolFactory("multiplex", destination -> new MultiplexConnectionPool(destination, destination.getHttpClient().getMaxConnectionsPerDestination(), destination, 1)),
new ConnectionPoolFactory("random", destination -> new RandomConnectionPool(destination, destination.getHttpClient().getMaxConnectionsPerDestination(), destination, 1))
);
Expand Down

0 comments on commit 5b8246f

Please sign in to comment.