Skip to content

Commit

Permalink
Fixes #12171 - QoSHandler does not resume on a virtual thread.
Browse files Browse the repository at this point in the history
Now QoSHandler attempts to resume using a virtual thread if the request was handled with a virtual thread and then suspended.

Removed warn() from VirtualThreads.isVirtualThread(), it was too verbose.

Signed-off-by: Simone Bordet <simone.bordet@gmail.com>
  • Loading branch information
sbordet committed Aug 19, 2024
1 parent 877aaa5 commit dc3828d
Show file tree
Hide file tree
Showing 3 changed files with 96 additions and 4 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ConcurrentSkipListSet;
import java.util.concurrent.Executor;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.ReadWriteLock;
Expand All @@ -35,6 +36,7 @@
import org.eclipse.jetty.util.Callback;
import org.eclipse.jetty.util.NanoTime;
import org.eclipse.jetty.util.ProcessorUtils;
import org.eclipse.jetty.util.VirtualThreads;
import org.eclipse.jetty.util.annotation.ManagedAttribute;
import org.eclipse.jetty.util.annotation.ManagedObject;
import org.eclipse.jetty.util.thread.Scheduler;
Expand Down Expand Up @@ -353,15 +355,31 @@ private boolean resumeSuspended()
if (LOG.isDebugEnabled())
LOG.debug("{} resuming {}", this, entry.request);
// Always dispatch to avoid StackOverflowError.
getServer().getThreadPool().execute(entry);
execute(entry, entry.useVirtualThreads);
return true;
}
}
return false;
}

private void execute(Runnable task, boolean useVirtualThreads)
{
ThreadPool executor = getServer().getThreadPool();
if (useVirtualThreads)
{
Executor virtualExecutor = VirtualThreads.getVirtualThreadsExecutor(executor);
if (virtualExecutor != null)
{
virtualExecutor.execute(task);
return;
}
}
executor.execute(task);
}

private class Entry implements CyclicTimeouts.Expirable, Runnable
{
private final boolean useVirtualThreads;
private final Request request;
private final Response response;
private final Callback callback;
Expand All @@ -370,6 +388,7 @@ private class Entry implements CyclicTimeouts.Expirable, Runnable

private Entry(Request request, Response response, Callback callback, int priority)
{
this.useVirtualThreads = VirtualThreads.isVirtualThread();
this.request = request;
this.response = response;
this.callback = callback;
Expand Down Expand Up @@ -414,7 +433,7 @@ private void expire()
}

if (removed)
failSuspended(request, response, callback, HttpStatus.SERVICE_UNAVAILABLE_503, new TimeoutException());
execute(() -> failSuspended(request, response, callback, HttpStatus.SERVICE_UNAVAILABLE_503, new TimeoutException()), useVirtualThreads);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,9 @@

package org.eclipse.jetty.server.handler;

import java.net.InetSocketAddress;
import java.nio.channels.SocketChannel;
import java.nio.charset.StandardCharsets;
import java.time.Duration;
import java.util.ArrayList;
import java.util.List;
Expand All @@ -27,10 +30,15 @@
import org.eclipse.jetty.server.Request;
import org.eclipse.jetty.server.Response;
import org.eclipse.jetty.server.Server;
import org.eclipse.jetty.server.ServerConnector;
import org.eclipse.jetty.util.Callback;
import org.eclipse.jetty.util.VirtualThreads;
import org.eclipse.jetty.util.component.LifeCycle;
import org.eclipse.jetty.util.thread.QueuedThreadPool;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.condition.DisabledForJreRange;
import org.junit.jupiter.api.condition.JRE;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.ValueSource;

Expand All @@ -49,7 +57,8 @@ public class QoSHandlerTest

private void start(QoSHandler qosHandler) throws Exception
{
server = new Server();
if (server == null)
server = new Server();
connector = new LocalConnector(server);
server.addConnector(connector);
server.setHandler(qosHandler);
Expand Down Expand Up @@ -407,4 +416,69 @@ public boolean handle(Request request, Response response, Callback callback)
assertEquals(HttpStatus.OK_200, response.getStatus());
}

@Test
@DisabledForJreRange(max = JRE.JAVA_20)
public void testRequestInVirtualThreadIsResumedInVirtualThread() throws Exception
{
QoSHandler qosHandler = new QoSHandler();
qosHandler.setMaxRequestCount(1);
List<Callback> callbacks = new ArrayList<>();
qosHandler.setHandler(new Handler.Abstract()
{
@Override
public boolean handle(Request request, Response response, Callback callback)
{
response.setStatus(VirtualThreads.isVirtualThread() ? HttpStatus.OK_200 : HttpStatus.NOT_ACCEPTABLE_406);
// Save the callback but do not succeed it yet.
callbacks.add(callback);
return true;
}
});
QueuedThreadPool serverThreads = new QueuedThreadPool();
serverThreads.setName("st");
serverThreads.setVirtualThreadsExecutor(VirtualThreads.getNamedVirtualThreadsExecutor("vst"));
server = new Server(serverThreads);
ServerConnector networkConnector = new ServerConnector(server, 1, 1);
server.addConnector(networkConnector);
start(qosHandler);

// Send the first request that will not be completed yet.
try (SocketChannel client1 = SocketChannel.open(new InetSocketAddress("localhost", networkConnector.getLocalPort())))
{
client1.write(StandardCharsets.UTF_8.encode("""
GET /first HTTP/1.1
Host: localhost
"""));
// Wait that the request arrives at the server.
await().atMost(5, TimeUnit.SECONDS).until(callbacks::size, is(1));

// Send the second request, it should be suspended by QoSHandler.
try (SocketChannel client2 = SocketChannel.open(new InetSocketAddress("localhost", networkConnector.getLocalPort())))
{
client2.write(StandardCharsets.UTF_8.encode("""
GET /second HTTP/1.1
Host: localhost
"""));
// Wait for the second request to be suspended.
await().atMost(5, TimeUnit.SECONDS).until(qosHandler::getSuspendedRequestCount, is(1L));

// Finish the first request, so that the second can be resumed.
callbacks.remove(0).succeeded();
client1.socket().setSoTimeout(5000);
HttpTester.Response response1 = HttpTester.parseResponse(client1);
assertEquals(HttpStatus.OK_200, response1.getStatus());

// Wait for the second request to arrive to the server.
await().atMost(5, TimeUnit.SECONDS).until(callbacks::size, is(1));

// Finish the second request.
callbacks.remove(0).succeeded();
client2.socket().setSoTimeout(5000);
HttpTester.Response response2 = HttpTester.parseResponse(client2);
assertEquals(HttpStatus.OK_200, response2.getStatus());
}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -101,7 +101,6 @@ public static boolean isVirtualThread()
}
catch (Throwable x)
{
warn();
return false;
}
}
Expand Down

0 comments on commit dc3828d

Please sign in to comment.