Skip to content

Commit

Permalink
add max duration property
Browse files Browse the repository at this point in the history
  • Loading branch information
lorban committed Dec 11, 2020
1 parent 74bcdbe commit 44bc3ef
Show file tree
Hide file tree
Showing 3 changed files with 60 additions and 22 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
import org.eclipse.jetty.client.api.Destination;
import org.eclipse.jetty.util.Attachable;
import org.eclipse.jetty.util.Callback;
import org.eclipse.jetty.util.IO;
import org.eclipse.jetty.util.Pool;
import org.eclipse.jetty.util.Promise;
import org.eclipse.jetty.util.annotation.ManagedAttribute;
Expand All @@ -48,8 +49,26 @@ public abstract class AbstractConnectionPool extends ContainerLifeCycle implemen

private final HttpDestination destination;
private final Callback requester;
private final Pool<Connection> pool;
private final Pool<ConnectionHolder> pool;
private boolean maximizeConnections;
private volatile long maxDuration = 0L;

public static class ConnectionHolder
{
private final Connection connection;
private final long creationTimestamp;

ConnectionHolder(Connection connection)
{
this.connection = connection;
this.creationTimestamp = System.nanoTime();
}

boolean isExpired(long timeoutNanos)
{
return System.nanoTime() - creationTimestamp >= timeoutNanos;
}
}

/**
* @deprecated use {@link #AbstractConnectionPool(HttpDestination, int, boolean, Callback)} instead
Expand All @@ -65,7 +84,7 @@ protected AbstractConnectionPool(HttpDestination destination, int maxConnections
this(destination, new Pool<>(Pool.StrategyType.FIRST, maxConnections, cache), requester);
}

protected AbstractConnectionPool(HttpDestination destination, Pool<Connection> pool, Callback requester)
protected AbstractConnectionPool(HttpDestination destination, Pool<ConnectionHolder> pool, Callback requester)
{
this.destination = destination;
this.requester = requester;
Expand All @@ -90,6 +109,17 @@ public CompletableFuture<Void> preCreateConnections(int connectionCount)
return CompletableFuture.allOf(futures);
}

@ManagedAttribute(value = "The maximum duration in milliseconds a connection can be used for before it gets closed")
public long getMaxDuration()
{
return maxDuration;
}

public void setMaxDuration(long maxDuration)
{
this.maxDuration = maxDuration;
}

protected int getMaxMultiplex()
{
return pool.getMaxMultiplex();
Expand Down Expand Up @@ -232,7 +262,7 @@ private CompletableFuture<Void> tryCreateAsync(int maxPending)
if (LOG.isDebugEnabled())
LOG.debug("Try creating connection {}/{} with {}/{} pending", connectionCount, getMaxConnectionCount(), getPendingConnectionCount(), maxPending);

Pool<Connection>.Entry entry = pool.reserve(maxPending);
Pool<ConnectionHolder>.Entry entry = pool.reserve(maxPending);
if (entry == null)
return CompletableFuture.completedFuture(null);

Expand All @@ -254,7 +284,7 @@ public void succeeded(Connection connection)
}
((Attachable)connection).setAttachment(entry);
onCreated(connection);
entry.enable(connection, false);
entry.enable(new ConnectionHolder(connection), false);
idle(connection, false);
future.complete(null);
proceed();
Expand All @@ -281,16 +311,25 @@ protected void proceed()

protected Connection activate()
{
Pool<Connection>.Entry entry = pool.acquire();
if (entry != null)
while (true)
{
Pool<ConnectionHolder>.Entry entry = pool.acquire();
if (entry == null)
return null;
Connection connection = entry.getPooled().connection;

if (maxDuration > 0L && entry.getPooled().isExpired(maxDuration))
{
if (remove(connection))
IO.close(connection);
continue;
}

if (LOG.isDebugEnabled())
LOG.debug("Activated {} {}", entry, pool);
Connection connection = entry.getPooled();
acquired(connection);
return connection;
}
return null;
}

@Override
Expand All @@ -300,7 +339,7 @@ public boolean isActive(Connection connection)
throw new IllegalArgumentException("Invalid connection object: " + connection);
Attachable attachable = (Attachable)connection;
@SuppressWarnings("unchecked")
Pool<Connection>.Entry entry = (Pool<Connection>.Entry)attachable.getAttachment();
Pool<ConnectionHolder>.Entry entry = (Pool<ConnectionHolder>.Entry)attachable.getAttachment();
if (entry == null)
return false;
return !entry.isIdle();
Expand All @@ -321,7 +360,7 @@ protected boolean deactivate(Connection connection)
throw new IllegalArgumentException("Invalid connection object: " + connection);
Attachable attachable = (Attachable)connection;
@SuppressWarnings("unchecked")
Pool<Connection>.Entry entry = (Pool<Connection>.Entry)attachable.getAttachment();
Pool<ConnectionHolder>.Entry entry = (Pool<ConnectionHolder>.Entry)attachable.getAttachment();
if (entry == null)
return true;
boolean reusable = pool.release(entry);
Expand All @@ -345,11 +384,12 @@ protected boolean remove(Connection connection, boolean force)
throw new IllegalArgumentException("Invalid connection object: " + connection);
Attachable attachable = (Attachable)connection;
@SuppressWarnings("unchecked")
Pool<Connection>.Entry entry = (Pool<Connection>.Entry)attachable.getAttachment();
Pool<ConnectionHolder>.Entry entry = (Pool<ConnectionHolder>.Entry)attachable.getAttachment();
if (entry == null)
return false;
attachable.setAttachment(null);
boolean removed = pool.remove(entry);
if (removed)
attachable.setAttachment(null);
if (LOG.isDebugEnabled())
LOG.debug("Removed ({}) {} {}", removed, entry, pool);
if (removed || force)
Expand Down Expand Up @@ -391,7 +431,7 @@ public Queue<Connection> getIdleConnections()
return pool.values().stream()
.filter(Pool.Entry::isIdle)
.filter(entry -> !entry.isClosed())
.map(Pool.Entry::getPooled)
.map(entry -> entry.getPooled().connection)
.collect(toCollection(ArrayDeque::new));
}

Expand All @@ -405,7 +445,7 @@ public Collection<Connection> getActiveConnections()
return pool.values().stream()
.filter(entry -> !entry.isIdle())
.filter(entry -> !entry.isClosed())
.map(Pool.Entry::getPooled)
.map(entry -> entry.getPooled().connection)
.collect(Collectors.toList());
}

Expand All @@ -426,12 +466,12 @@ public boolean sweep()
{
pool.values().stream().filter(entry -> entry.getPooled() instanceof Sweeper.Sweepable).forEach(entry ->
{
Connection connection = entry.getPooled();
if (((Sweeper.Sweepable)connection).sweep())
ConnectionHolder holder = entry.getPooled();
if (((Sweeper.Sweepable)holder.connection).sweep())
{
boolean removed = remove(connection);
boolean removed = remove(holder.connection);
LOG.warn("Connection swept: {}{}{} from active connections{}{}",
connection,
holder.connection,
System.lineSeparator(),
removed ? "Removed" : "Not removed",
System.lineSeparator(),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@

package org.eclipse.jetty.client;

import org.eclipse.jetty.client.api.Connection;
import org.eclipse.jetty.util.Callback;
import org.eclipse.jetty.util.Pool;
import org.eclipse.jetty.util.annotation.ManagedAttribute;
Expand All @@ -37,7 +36,7 @@ public DuplexConnectionPool(HttpDestination destination, int maxConnections, boo
this(destination, new Pool<>(Pool.StrategyType.FIRST, maxConnections, cache), requester);
}

public DuplexConnectionPool(HttpDestination destination, Pool<Connection> pool, Callback requester)
public DuplexConnectionPool(HttpDestination destination, Pool<ConnectionHolder> pool, Callback requester)
{
super(destination, pool, requester);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@

package org.eclipse.jetty.client;

import org.eclipse.jetty.client.api.Connection;
import org.eclipse.jetty.util.Callback;
import org.eclipse.jetty.util.Pool;
import org.eclipse.jetty.util.annotation.ManagedAttribute;
Expand All @@ -37,7 +36,7 @@ public MultiplexConnectionPool(HttpDestination destination, int maxConnections,
this(destination, new Pool<>(Pool.StrategyType.FIRST, maxConnections, cache), requester, maxMultiplex);
}

public MultiplexConnectionPool(HttpDestination destination, Pool<Connection> pool, Callback requester, int maxMultiplex)
public MultiplexConnectionPool(HttpDestination destination, Pool<ConnectionHolder> pool, Callback requester, int maxMultiplex)
{
super(destination, pool, requester);
setMaxMultiplex(maxMultiplex);
Expand Down

0 comments on commit 44bc3ef

Please sign in to comment.