Skip to content

Commit

Permalink
Issue jetty#12185 implementation/test of max suspended requests in Qo…
Browse files Browse the repository at this point in the history
…SHandler
  • Loading branch information
LarsKrogJensen committed Aug 22, 2024
1 parent 0644aaf commit 497da2d
Show file tree
Hide file tree
Showing 2 changed files with 97 additions and 6 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,10 @@
* If more requests are received, they are suspended (that is, not
* forwarded to the child {@code Handler}) and stored in a priority
* queue.
* Priorities are determined via {@link #getPriority(Request)},
* Maximum number of suspended request can be set {@link #setMaxSuspendedRequestCount(int)} to avoid
* out of memory error. When this limit is reached, the request will fail fast
* with status code {@code 503} (not available).</p>
* <p>Priorities are determined via {@link #getPriority(Request)},
* that should return values between {@code 0} (the lowest priority)
* and positive numbers, typically in the range {@code 0-10}.</p>
* <p>When a request that is being processed completes, the suspended
Expand Down Expand Up @@ -82,6 +85,7 @@ public class QoSHandler extends ConditionalHandler.Abstract
private final Set<Integer> priorities = new ConcurrentSkipListSet<>(Comparator.reverseOrder());
private CyclicTimeouts<Entry> timeouts;
private int maxRequests;
private int maxSuspendedRequests = Integer.MAX_VALUE;
private Duration maxSuspend = Duration.ZERO;

public QoSHandler()
Expand Down Expand Up @@ -119,6 +123,29 @@ public void setMaxRequestCount(int maxRequests)
this.maxRequests = maxRequests;
}

/**
* @return the max number of suspended requests
*/
@ManagedAttribute(value = "The maximum number of suspended requests", readonly = true)
public int getMaxSuspendedRequestCount()
{
return maxSuspendedRequests;
}

/**
* <p>Sets the max number of suspended requests.</p>
* <p>Once the max suspended request limit is reached, the request is failed with a HTTP
* status of {@code 503 Service unavailable}.</p>
*
* @param maxSuspendedRequests the max number of suspended requests
*/
public void setMaxSuspendedRequestCount(int maxSuspendedRequests)
{
if (isStarted())
throw new IllegalStateException("Cannot change maxSuspendedRequests: " + this);
this.maxSuspendedRequests = maxSuspendedRequests;
}

/**
* Get the max duration of time a request may stay suspended.
* @return the max duration of time a request may stay suspended
Expand Down Expand Up @@ -194,6 +221,7 @@ private boolean process(Request request, Response response, Callback callback) t
LOG.debug("{} processing {}", this, request);

boolean expired = false;
boolean tooManyRequests = false;

// The read lock allows concurrency with resume(),
// which is the common case, but not with expire().
Expand All @@ -203,7 +231,14 @@ private boolean process(Request request, Response response, Callback callback) t
int permits = state.decrementAndGet();
if (permits < 0)
{
if (request.getAttribute(EXPIRED_ATTRIBUTE_NAME) == null)
if (Math.abs(permits) > getMaxSuspendedRequestCount())
{
// Reached the limit of number of suspended requests,
// complete request with 503, service unavailable
state.incrementAndGet();
tooManyRequests = true;
}
else if (request.getAttribute(EXPIRED_ATTRIBUTE_NAME) == null)
{
// Cover this race condition:
// T1 in this method may find no permits, so it will suspend the request.
Expand All @@ -228,11 +263,13 @@ private boolean process(Request request, Response response, Callback callback) t
lock.readLock().unlock();
}

if (!expired)
return handleWithPermit(request, response, callback);
if (expired || tooManyRequests)
{
notAvailable(response, callback);
return true;
}

notAvailable(response, callback);
return true;
return handleWithPermit(request, response, callback);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
import java.time.Duration;
import java.util.ArrayList;
import java.util.List;
import java.util.Vector;
import java.util.concurrent.TimeUnit;
import java.util.stream.IntStream;

Expand Down Expand Up @@ -407,4 +408,57 @@ public boolean handle(Request request, Response response, Callback callback)
assertEquals(HttpStatus.OK_200, response.getStatus());
}

@Test
public void testMaxSuspendedRequests() throws Exception
{
int delay = 100;
QoSHandler qosHandler = new QoSHandler();
qosHandler.setMaxRequestCount(2);
qosHandler.setMaxSuspendedRequestCount(2);
qosHandler.setHandler(new Handler.Abstract()
{
@Override
public boolean handle(Request request, Response response, Callback callback)
{
try
{
Thread.sleep(delay);
callback.succeeded();
}
catch (Throwable x) {
callback.failed(x);
}
return true;
}
});
start(qosHandler);

int parallelism = 8;
Vector<Integer> statusCodes = new Vector<>(); // due to synchronized List
IntStream.range(0, parallelism).parallel().forEach(i ->
{
try (LocalConnector.LocalEndPoint endPoint = connector.executeRequest("""
GET /%d HTTP/1.1
Host: localhost
""".formatted(i))) {
String text = endPoint.getResponse(false, parallelism * delay * 5, TimeUnit.MILLISECONDS);
HttpTester.Response response = HttpTester.parseResponse(text);
statusCodes.add(response.getStatus());
}
catch (Exception x)
{
fail(x);
}
});

await().atMost(5, TimeUnit.SECONDS).until(statusCodes::size, is(8));
// expectation is that
// 2 requests will be handled straight away
// 2 will be suspended and eventually handled
// 4 will hit the max suspended request limit
assertEquals(4, statusCodes.stream().filter(sc -> sc == HttpStatus.OK_200).count());
assertEquals(4, statusCodes.stream().filter(sc -> sc == HttpStatus.SERVICE_UNAVAILABLE_503).count());
}

}

0 comments on commit 497da2d

Please sign in to comment.