Skip to content

Commit

Permalink
Issue #1919 scalable scheduler
Browse files Browse the repository at this point in the history
Used the CyclicTimeoutTask class in the clients HttpDestination and HttpConnection
as an alternative to direct scheduler usage.

Signed-off-by: Greg Wilkins <gregw@webtide.com>
  • Loading branch information
gregw committed Nov 2, 2017
1 parent 9aa694f commit 5408218
Show file tree
Hide file tree
Showing 4 changed files with 138 additions and 58 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@

package org.eclipse.jetty.client;

import static java.util.concurrent.TimeUnit.MILLISECONDS;

import java.net.CookieStore;
import java.net.HttpCookie;
import java.net.URI;
Expand All @@ -43,13 +45,15 @@ public abstract class HttpConnection implements Connection
private static final Logger LOG = Log.getLogger(HttpConnection.class);

private final HttpDestination destination;
private final TimeoutCompleteListener timeout;
private int idleTimeoutGuard;
private long idleTimeoutStamp;

protected HttpConnection(HttpDestination destination)
{
this.destination = destination;
this.idleTimeoutStamp = System.nanoTime();
this.timeout = new TimeoutCompleteListener(destination.getHttpClient().getScheduler());
}

public HttpClient getHttpClient()
Expand All @@ -68,12 +72,8 @@ public void send(Request request, Response.CompleteListener listener)
HttpRequest httpRequest = (HttpRequest)request;

ArrayList<Response.ResponseListener> listeners = new ArrayList<>(httpRequest.getResponseListeners());
if (httpRequest.getTimeout() > 0)
{
TimeoutCompleteListener timeoutListener = new TimeoutCompleteListener(httpRequest);
timeoutListener.schedule(getHttpClient().getScheduler());
listeners.add(timeoutListener);
}

httpRequest.sent();
if (listener != null)
listeners.add(listener);

Expand Down Expand Up @@ -179,6 +179,14 @@ private void applyAuthentication(Request request, URI uri)

protected SendFailure send(HttpChannel channel, HttpExchange exchange)
{
long nanoTime = System.nanoTime();
long timeoutInMs = exchange.getRequest().timeoutIn(nanoTime,MILLISECONDS);
if (timeoutInMs>=0)
{
exchange.getResponseListeners().add(timeout);
timeout.schedule(exchange.getRequest());
}

// Forbid idle timeouts for the time window where
// the request is associated to the channel and sent.
// Use a counter to support multiplexed requests.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,9 @@

package org.eclipse.jetty.client;

import static java.util.concurrent.TimeUnit.MILLISECONDS;
import static java.util.concurrent.TimeUnit.NANOSECONDS;

import java.io.Closeable;
import java.io.IOException;
import java.nio.channels.AsynchronousCloseException;
Expand All @@ -26,6 +29,8 @@
import java.util.List;
import java.util.Queue;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicLong;

import org.eclipse.jetty.client.api.Connection;
import org.eclipse.jetty.client.api.Destination;
Expand All @@ -34,6 +39,7 @@
import org.eclipse.jetty.http.HttpField;
import org.eclipse.jetty.http.HttpHeader;
import org.eclipse.jetty.io.ClientConnectionFactory;
import org.eclipse.jetty.io.CyclicTimeoutTask;
import org.eclipse.jetty.util.BlockingArrayQueue;
import org.eclipse.jetty.util.Callback;
import org.eclipse.jetty.util.HostPort;
Expand All @@ -45,6 +51,7 @@
import org.eclipse.jetty.util.component.DumpableCollection;
import org.eclipse.jetty.util.log.Log;
import org.eclipse.jetty.util.log.Logger;
import org.eclipse.jetty.util.thread.Scheduler;
import org.eclipse.jetty.util.thread.Sweeper;

@ManagedObject
Expand All @@ -60,6 +67,8 @@ public abstract class HttpDestination extends ContainerLifeCycle implements Dest
private final ProxyConfiguration.Proxy proxy;
private final ClientConnectionFactory connectionFactory;
private final HttpField hostField;
private final AtomicLong nextTimeout = new AtomicLong(Long.MAX_VALUE);
private final CyclicTimeoutTask timeout;
private ConnectionPool connectionPool;

public HttpDestination(HttpClient client, Origin origin)
Expand All @@ -71,6 +80,8 @@ public HttpDestination(HttpClient client, Origin origin)

this.requestNotifier = new RequestNotifier(client);
this.responseNotifier = new ResponseNotifier();

this.timeout = new TimeoutTask(client.getScheduler());

ProxyConfiguration proxyConfig = client.getProxyConfiguration();
proxy = proxyConfig.match(origin);
Expand Down Expand Up @@ -228,7 +239,7 @@ public void failed(Throwable x)
}

protected void send(HttpRequest request, List<Response.ResponseListener> listeners)
{
{
if (!getScheme().equalsIgnoreCase(request.getScheme()))
throw new IllegalArgumentException("Invalid request scheme " + request.getScheme() + " for destination " + this);
if (!getHost().equalsIgnoreCase(request.getHost()))
Expand All @@ -243,6 +254,13 @@ protected void send(HttpRequest request, List<Response.ResponseListener> listene
{
if (enqueue(exchanges, exchange))
{
long nanoTime = System.nanoTime();
long expiresInMs = request.timeoutIn(nanoTime,MILLISECONDS);
if (expiresInMs==0)
request.abort(new TimeoutException("Total timeout " + request.getTimeout() + " ms elapsed"));
else if (expiresInMs>0)
scheduleTimeout(nanoTime,expiresInMs);

if (!client.isRunning() && exchanges.remove(exchange))
{
request.abort(new RejectedExecutionException(client + " is stopping"));
Expand All @@ -268,6 +286,20 @@ protected void send(HttpRequest request, List<Response.ResponseListener> listene
}
}

private void scheduleTimeout(long nanoTime, long expiresInMs)
{
// Schedule a timeout for the soonest any known exchange can expire.
// If subsequently that exchange is removed from the queue, the timeout is not
// cancelled, instead the entire queue is swept for expired exchanges and a new
// timeout is set.
long expiresAtMs = NANOSECONDS.toMillis(nanoTime) + expiresInMs;
long lastExpiresAtMs = nextTimeout.getAndUpdate(e->Math.min(e,expiresAtMs));
if (lastExpiresAtMs==Long.MAX_VALUE)
timeout.schedule(expiresInMs,MILLISECONDS);
else if (lastExpiresAtMs!=expiresAtMs)
timeout.reschedule(expiresInMs,MILLISECONDS);
}

protected boolean enqueue(Queue<HttpExchange> queue, HttpExchange exchange)
{
return queue.offer(exchange);
Expand Down Expand Up @@ -470,4 +502,37 @@ public String toString()
exchanges.size(),
connectionPool);
}

// The TimeoutTask that expires when the next check of expiry is needed
private class TimeoutTask extends CyclicTimeoutTask
{
public TimeoutTask(Scheduler scheduler)
{
super(scheduler);
}

@Override
protected void onTimeoutExpired()
{
nextTimeout.set(Long.MAX_VALUE);
long nanoTime = System.nanoTime();
long nextExpiresInMs = Long.MAX_VALUE;

// Check all queued exchanges for those that have expired
// and to determine when the next check must be.
for (HttpExchange exchange : exchanges)
{
long expiresInMs = exchange.getRequest().timeoutIn(nanoTime,MILLISECONDS);
if (expiresInMs==0)
{
exchange.getRequest().abort(new TimeoutException("Total timeout " + exchange.getRequest().getTimeout() + " ms elapsed"));
}
else if (expiresInMs>0 && expiresInMs<nextExpiresInMs)
nextExpiresInMs = expiresInMs;
}

if (nextExpiresInMs<Long.MAX_VALUE && client.isRunning())
scheduleTimeout(nanoTime,nextExpiresInMs);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,7 @@ public class HttpRequest implements Request
private HttpVersion version = HttpVersion.HTTP_1_1;
private long idleTimeout;
private long timeout;
private long sentTimestampNanos;
private ContentProvider content;
private boolean followRedirects;
private List<HttpCookie> cookies;
Expand Down Expand Up @@ -695,34 +696,42 @@ public ContentResponse send() throws InterruptedException, TimeoutException, Exe
@Override
public void send(Response.CompleteListener listener)
{
TimeoutCompleteListener timeoutListener = null;
try
{
if (getTimeout() > 0)
{
timeoutListener = new TimeoutCompleteListener(this);
timeoutListener.schedule(client.getScheduler());
responseListeners.add(timeoutListener);
}
send(this, listener);
}
catch (Throwable x)
{
// Do not leak the scheduler task if we
// can't even start sending the request.
if (timeoutListener != null)
timeoutListener.cancel();
throw x;
}
send(this, listener);
}

void sent()
{
sentTimestampNanos = System.nanoTime();
}

private void send(HttpRequest request, Response.CompleteListener listener)
{
if (listener != null)
responseListeners.add(listener);
sent();
client.send(request, responseListeners);
}

/**
* Get the time until the current request timeout fires
* @param nanotime The current nanotime
* @param units The units of the time to return
* @return The time until the current request timeout fires;
* or 0 if the timeout has expired; or -1 if no timeout applies
*/
public long timeoutIn(long nanotime, TimeUnit units)
{
long timeoutMs = getTimeout();
if (timeoutMs<=0 || sentTimestampNanos==0)
return -1;
long now = TimeUnit.NANOSECONDS.toMillis(nanotime);
long expires = TimeUnit.NANOSECONDS.toMillis(sentTimestampNanos)+timeoutMs;
if (expires<=now)
return 0;

return TimeUnit.MILLISECONDS.convert(expires-now,units);
}

protected List<Response.ResponseListener> getResponseListeners()
{
return responseListeners;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,67 +18,65 @@

package org.eclipse.jetty.client;

import static java.util.concurrent.TimeUnit.MILLISECONDS;

import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicReference;

import org.eclipse.jetty.client.api.Request;
import org.eclipse.jetty.client.api.Response;
import org.eclipse.jetty.client.api.Result;
import org.eclipse.jetty.io.CyclicTimeoutTask;
import org.eclipse.jetty.util.log.Log;
import org.eclipse.jetty.util.log.Logger;
import org.eclipse.jetty.util.thread.Scheduler;

public class TimeoutCompleteListener implements Response.CompleteListener, Runnable
public class TimeoutCompleteListener extends CyclicTimeoutTask implements Response.CompleteListener
{
private static final Logger LOG = Log.getLogger(TimeoutCompleteListener.class);

private final AtomicReference<Scheduler.Task> task = new AtomicReference<>();
private final Request request;
private final AtomicReference<Request> request = new AtomicReference<Request>();

public TimeoutCompleteListener(Request request)
public TimeoutCompleteListener(Scheduler scheduler)
{
this.request = request;
super(scheduler);
}

@Override
public void onComplete(Result result)
protected void onTimeoutExpired()
{
cancel();
Request request = this.request.getAndSet(null);
if (LOG.isDebugEnabled())
LOG.debug("onTimeoutExpired for {}", request);
if (request!=null)
request.abort(new TimeoutException("Total timeout " + request.getTimeout() + " ms elapsed"));
}

public boolean schedule(Scheduler scheduler)

@Override
public void onComplete(Result result)
{
long timeout = request.getTimeout();
Scheduler.Task task = scheduler.schedule(this, timeout, TimeUnit.MILLISECONDS);
Scheduler.Task existing = this.task.getAndSet(task);
if (existing != null)
Request request = this.request.getAndSet(null);
if (cancel())
{
existing.cancel();
cancel();
throw new IllegalStateException();
if (LOG.isDebugEnabled())
LOG.debug("Cancelled timeout: {} for {}", result, request);
}
if (LOG.isDebugEnabled())
LOG.debug("Scheduled timeout task {} in {} ms for {}", task, timeout, request);
return true;
}

@Override
public void run()
public boolean schedule(HttpRequest request)
{
long timeoutInMs = request.timeoutIn(System.nanoTime(),MILLISECONDS);
if (LOG.isDebugEnabled())
LOG.debug("Executing timeout task {} for {}", task, request);
request.abort(new TimeoutException("Total timeout " + request.getTimeout() + " ms elapsed"));
}

public void cancel()
{
Scheduler.Task task = this.task.getAndSet(null);
if (task != null)
LOG.debug("Scheduled timeout {} ms for {}", timeoutInMs, request);
if (timeoutInMs>=0 && this.request.compareAndSet(null,request))
{
boolean cancelled = task.cancel();
if (LOG.isDebugEnabled())
LOG.debug("Cancelled (successfully: {}) timeout task {}", cancelled, task);
schedule(timeoutInMs, TimeUnit.MILLISECONDS);
return true;
}

return false;
}


}

0 comments on commit 5408218

Please sign in to comment.