Skip to content

Commit

Permalink
Issue #5105 - fix StatisticsHandler bug with async dispatched requests
Browse files Browse the repository at this point in the history
If the request is async dispatched, the check state.isSuspended() is not
correct to determine if the request was async or not. The check
state.isAsyncStarted() should be used instead.

Signed-off-by: Lachlan Roberts <lachlan@webtide.com>
  • Loading branch information
lachlan-roberts committed Aug 19, 2020
1 parent a46ed5b commit 32358b1
Show file tree
Hide file tree
Showing 2 changed files with 99 additions and 36 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@

import java.io.IOException;
import java.util.concurrent.Future;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.LongAdder;
import javax.servlet.AsyncEvent;
Expand All @@ -29,7 +28,6 @@
import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;

import org.eclipse.jetty.http.HttpStatus;
import org.eclipse.jetty.server.AsyncContextEvent;
import org.eclipse.jetty.server.Handler;
import org.eclipse.jetty.server.HttpChannelState;
Expand Down Expand Up @@ -59,6 +57,7 @@ public class StatisticsHandler extends HandlerWrapper implements Graceful

private final LongAdder _asyncDispatches = new LongAdder();
private final LongAdder _expires = new LongAdder();
private final LongAdder _errors = new LongAdder();

private final LongAdder _responses1xx = new LongAdder();
private final LongAdder _responses2xx = new LongAdder();
Expand All @@ -76,30 +75,30 @@ protected FutureCallback newShutdownCallback()
}
};

private final AtomicBoolean _wrapWarning = new AtomicBoolean();

private final AsyncListener _onCompletion = new AsyncListener()
{
@Override
public void onTimeout(AsyncEvent event) throws IOException
public void onStartAsync(AsyncEvent event) throws IOException
{
_expires.increment();
event.getAsyncContext().addListener(this);
}

@Override
public void onStartAsync(AsyncEvent event) throws IOException
public void onTimeout(AsyncEvent event) throws IOException
{
event.getAsyncContext().addListener(this);
_expires.increment();
}

@Override
public void onError(AsyncEvent event) throws IOException
{
_errors.increment();
}

@Override
public void onComplete(AsyncEvent event) throws IOException
{
System.err.println("On Async Complete for " + event);
HttpChannelState state = ((AsyncContextEvent)event).getHttpChannelState();

Request request = state.getBaseRequest();
Expand Down Expand Up @@ -149,6 +148,10 @@ public void statsReset()
@Override
public void handle(String path, Request baseRequest, HttpServletRequest request, HttpServletResponse response) throws IOException, ServletException
{
Handler handler = getHandler();
if (handler == null || !isStarted() || isShutdown())
return;

_dispatchedStats.increment();

final long start;
Expand All @@ -168,51 +171,40 @@ public void handle(String path, Request baseRequest, HttpServletRequest request,

try
{
Handler handler = getHandler();
if (handler != null && !_shutdown.isShutdown() && isStarted())
handler.handle(path, baseRequest, request, response);
else
{
if (!baseRequest.isHandled())
baseRequest.setHandled(true);
else if (_wrapWarning.compareAndSet(false, true))
LOG.warn("Bad statistics configuration. Latencies will be incorrect in {}", this);
if (!baseRequest.getResponse().isCommitted())
response.sendError(HttpStatus.SERVICE_UNAVAILABLE_503);
}
handler.handle(path, baseRequest, request, response);
}
finally
{
final long now = System.currentTimeMillis();
final long dispatched = now - start;

// TODO: make dispatchedStats optional metric for shutdown
_dispatchedStats.decrement();
_dispatchedTimeStats.record(dispatched);

if (state.isSuspended() || state.isAsyncStarted())
if (state.isInitial())
{
if (state.isInitial())
if (state.isAsyncStarted())
{
state.addListener(_onCompletion);
_asyncWaitStats.increment();
}
}
else if (state.isInitial())
{
long d = _requestStats.decrement();
_requestTimeStats.record(dispatched);
updateResponse(baseRequest);

// If we have no more dispatches, should we signal shutdown?
FutureCallback shutdown = _shutdown.get();
if (shutdown != null)
else
{
response.flushBuffer();
if (d == 0)
shutdown.succeeded();
long d = _requestStats.decrement();
_requestTimeStats.record(dispatched);
updateResponse(baseRequest);

// If we have no more dispatches, should we signal shutdown?
FutureCallback shutdown = _shutdown.get();
if (shutdown != null)
{
response.flushBuffer();
if (d == 0)
shutdown.succeeded();
}
}
}
// else onCompletion will handle it.
}
}

Expand Down Expand Up @@ -251,6 +243,8 @@ protected void updateResponse(Request request)
@Override
protected void doStart() throws Exception
{
if (getHandler() == null)
throw new IllegalStateException("StatisticsHandler has no Wrapped Handler");
_shutdown.cancel();
super.doStart();
statsReset();
Expand Down Expand Up @@ -467,6 +461,16 @@ public int getExpires()
return _expires.intValue();
}

/**
* @return the number of async errors that occurred.
* @see #getAsyncDispatches()
*/
@ManagedAttribute("number of async errors that occurred")
public int getErrors()
{
return _errors.intValue();
}

/**
* @return the number of responses with a 1xx status returned by this context
* since {@link #statsReset()} was last called.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -363,6 +363,65 @@ public void onComplete(AsyncEvent event)
assertTrue(_statsHandler.getDispatchedTimeMax() + dispatchTime <= _statsHandler.getDispatchedTimeTotal());
}

@Test
public void asyncDispatchTest() throws Exception
{
final AtomicReference<AsyncContext> asyncHolder = new AtomicReference<>();
final CyclicBarrier[] barrier = {new CyclicBarrier(2), new CyclicBarrier(2), new CyclicBarrier(2), new CyclicBarrier(2)};
_statsHandler.setHandler(new AbstractHandler()
{
@Override
public void handle(String path, Request request, HttpServletRequest httpRequest, HttpServletResponse httpResponse) throws ServletException
{
request.setHandled(true);
try
{
if (asyncHolder.get() == null)
{
barrier[0].await();
barrier[1].await();
AsyncContext asyncContext = request.startAsync();
asyncHolder.set(asyncContext);
asyncContext.dispatch();
}
else
{
barrier[2].await();
barrier[3].await();
}
}
catch (Exception x)
{
throw new ServletException(x);
}
}
});
_server.start();

String request = "GET / HTTP/1.1\r\n" +
"Host: localhost\r\n" +
"\r\n";
_connector.executeRequest(request);

// Before we have started async we have one active request.
barrier[0].await();
assertEquals(1, _statistics.getConnections());
assertEquals(1, _statsHandler.getRequests());
assertEquals(1, _statsHandler.getRequestsActive());
assertEquals(1, _statsHandler.getDispatched());
assertEquals(1, _statsHandler.getDispatchedActive());
barrier[1].await();

// After we are async the same request should still be active even though we have async dispatched.
barrier[2].await();
assertEquals(1, _statistics.getConnections());
assertEquals(1, _statsHandler.getRequests());
assertEquals(1, _statsHandler.getRequestsActive());
assertEquals(2, _statsHandler.getDispatched());
assertEquals(1, _statsHandler.getDispatchedActive());
barrier[3].await();
}

@Test
public void testSuspendExpire() throws Exception
{
Expand Down

0 comments on commit 32358b1

Please sign in to comment.