diff --git a/jetty-core/jetty-client/src/main/java/org/eclipse/jetty/client/transport/HttpReceiver.java b/jetty-core/jetty-client/src/main/java/org/eclipse/jetty/client/transport/HttpReceiver.java index 174643b7189b..93859d695d7c 100644 --- a/jetty-core/jetty-client/src/main/java/org/eclipse/jetty/client/transport/HttpReceiver.java +++ b/jetty-core/jetty-client/src/main/java/org/eclipse/jetty/client/transport/HttpReceiver.java @@ -315,13 +315,35 @@ protected void responseHeaders(HttpExchange exchange) * Method to be invoked when response content is available to be read. *
* This method takes care of ensuring the {@link Content.Source} passed to - * {@link Response.ContentSourceListener#onContentSource(Response, Content.Source)} calls the - * demand callback. + * {@link Response.ContentSourceListener#onContentSource(Response, Content.Source)} + * calls the demand callback. + * The call to the demand callback is serialized with other events. */ - protected void responseContentAvailable() + protected void responseContentAvailable(HttpExchange exchange) { if (LOG.isDebugEnabled()) - LOG.debug("Response content available on {}", this); + LOG.debug("Invoking responseContentAvailable on {}", this); + + invoker.run(() -> + { + if (LOG.isDebugEnabled()) + LOG.debug("Executing responseContentAvailable on {}", this); + + if (exchange.isResponseCompleteOrTerminated()) + return; + + responseContentAvailable(); + }); + } + + /** + * Method to be invoked when response content is available to be read. + *
+ * This method directly invokes the demand callback, assuming the caller
+ * is already serialized with other events.
+ */
+ protected void responseContentAvailable()
+ {
contentSource.onDataAvailable();
}
@@ -344,7 +366,7 @@ protected void responseSuccess(HttpExchange exchange, Runnable afterSuccessTask)
if (!exchange.responseComplete(null))
return;
- invoker.run(() ->
+ Runnable successTask = () ->
{
if (LOG.isDebugEnabled())
LOG.debug("Executing responseSuccess on {}", this);
@@ -365,7 +387,12 @@ protected void responseSuccess(HttpExchange exchange, Runnable afterSuccessTask)
// Mark atomically the response as terminated, with
// respect to concurrency between request and response.
terminateResponse(exchange);
- }, afterSuccessTask);
+ };
+
+ if (afterSuccessTask == null)
+ invoker.run(successTask);
+ else
+ invoker.run(successTask, afterSuccessTask);
}
/**
@@ -712,9 +739,9 @@ public void onDataAvailable()
{
if (LOG.isDebugEnabled())
LOG.debug("onDataAvailable on {}", this);
- // The demandCallback will call read() that will itself call
- // HttpReceiver.read(boolean) so it must be called by the invoker.
- invokeDemandCallback(true);
+ // The onDataAvailable() method is only ever called
+ // by the invoker so avoid using the invoker again.
+ invokeDemandCallback(false);
}
@Override
@@ -760,8 +787,8 @@ private void processDemand()
}
}
- // The processDemand method is only ever called by the
- // invoker so there is no need to use the latter here.
+ // The processDemand() method is only ever called
+ // by the invoker so avoid using the invoker again.
invokeDemandCallback(false);
}
@@ -769,20 +796,19 @@ private void invokeDemandCallback(boolean invoke)
{
Runnable demandCallback = demandCallbackRef.getAndSet(null);
if (LOG.isDebugEnabled())
- LOG.debug("Invoking demand callback on {}", this);
- if (demandCallback != null)
+ LOG.debug("Invoking demand callback {} on {}", demandCallback, this);
+ if (demandCallback == null)
+ return;
+ try
{
- try
- {
- if (invoke)
- invoker.run(demandCallback);
- else
- demandCallback.run();
- }
- catch (Throwable x)
- {
- fail(x);
- }
+ if (invoke)
+ invoker.run(demandCallback);
+ else
+ demandCallback.run();
+ }
+ catch (Throwable x)
+ {
+ fail(x);
}
}
diff --git a/jetty-core/jetty-client/src/main/java/org/eclipse/jetty/client/transport/internal/HttpReceiverOverHTTP.java b/jetty-core/jetty-client/src/main/java/org/eclipse/jetty/client/transport/internal/HttpReceiverOverHTTP.java
index c292b23a79ab..de6ff4973a3d 100644
--- a/jetty-core/jetty-client/src/main/java/org/eclipse/jetty/client/transport/internal/HttpReceiverOverHTTP.java
+++ b/jetty-core/jetty-client/src/main/java/org/eclipse/jetty/client/transport/internal/HttpReceiverOverHTTP.java
@@ -79,7 +79,9 @@ void receive()
}
else
{
- responseContentAvailable();
+ HttpExchange exchange = getHttpExchange();
+ if (exchange != null)
+ responseContentAvailable(exchange);
}
}
diff --git a/jetty-core/jetty-fcgi/jetty-fcgi-client/src/main/java/org/eclipse/jetty/fcgi/client/transport/internal/HttpReceiverOverFCGI.java b/jetty-core/jetty-fcgi/jetty-fcgi-client/src/main/java/org/eclipse/jetty/fcgi/client/transport/internal/HttpReceiverOverFCGI.java
index b904a8347c16..8c810e215acd 100644
--- a/jetty-core/jetty-fcgi/jetty-fcgi-client/src/main/java/org/eclipse/jetty/fcgi/client/transport/internal/HttpReceiverOverFCGI.java
+++ b/jetty-core/jetty-fcgi/jetty-fcgi-client/src/main/java/org/eclipse/jetty/fcgi/client/transport/internal/HttpReceiverOverFCGI.java
@@ -40,7 +40,9 @@ void receive()
}
else
{
- responseContentAvailable();
+ HttpExchange exchange = getHttpExchange();
+ if (exchange != null)
+ responseContentAvailable(exchange);
}
}
@@ -107,6 +109,9 @@ public void failAndClose(Throwable failure)
void content(Content.Chunk chunk)
{
+ HttpExchange exchange = getHttpExchange();
+ if (exchange == null)
+ return;
if (this.chunk != null)
throw new IllegalStateException();
// Retain the chunk because it is stored for later reads.
diff --git a/jetty-core/jetty-http2/jetty-http2-client-transport/src/main/java/org/eclipse/jetty/http2/client/transport/internal/HttpReceiverOverHTTP2.java b/jetty-core/jetty-http2/jetty-http2-client-transport/src/main/java/org/eclipse/jetty/http2/client/transport/internal/HttpReceiverOverHTTP2.java
index 45f34c30674f..bddb5d7396f0 100644
--- a/jetty-core/jetty-http2/jetty-http2-client-transport/src/main/java/org/eclipse/jetty/http2/client/transport/internal/HttpReceiverOverHTTP2.java
+++ b/jetty-core/jetty-http2/jetty-http2-client-transport/src/main/java/org/eclipse/jetty/http2/client/transport/internal/HttpReceiverOverHTTP2.java
@@ -50,8 +50,6 @@ public class HttpReceiverOverHTTP2 extends HttpReceiver implements HTTP2Channel.
{
private static final Logger LOG = LoggerFactory.getLogger(HttpReceiverOverHTTP2.class);
- private final Runnable onDataAvailableTask = new Invocable.ReadyTask(Invocable.InvocationType.NON_BLOCKING, this::responseContentAvailable);
-
public HttpReceiverOverHTTP2(HttpChannel channel)
{
super(channel);
@@ -213,7 +211,7 @@ public Runnable onDataAvailable()
HttpExchange exchange = getHttpExchange();
if (exchange == null)
return null;
- return onDataAvailableTask;
+ return new Invocable.ReadyTask(Invocable.InvocationType.NON_BLOCKING, () -> responseContentAvailable(exchange));
}
@Override
diff --git a/jetty-core/jetty-http2/jetty-http2-tests/src/test/java/org/eclipse/jetty/http2/tests/HttpClientTransportOverHTTP2Test.java b/jetty-core/jetty-http2/jetty-http2-tests/src/test/java/org/eclipse/jetty/http2/tests/HttpClientTransportOverHTTP2Test.java
index 86ae453cee64..0173430314a3 100644
--- a/jetty-core/jetty-http2/jetty-http2-tests/src/test/java/org/eclipse/jetty/http2/tests/HttpClientTransportOverHTTP2Test.java
+++ b/jetty-core/jetty-http2/jetty-http2-tests/src/test/java/org/eclipse/jetty/http2/tests/HttpClientTransportOverHTTP2Test.java
@@ -24,6 +24,7 @@
import java.util.HashMap;
import java.util.List;
import java.util.Map;
+import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executor;
@@ -33,6 +34,8 @@
import java.util.concurrent.atomic.AtomicReference;
import java.util.stream.IntStream;
+import org.eclipse.jetty.client.AsyncRequestContent;
+import org.eclipse.jetty.client.CompletableResponseListener;
import org.eclipse.jetty.client.Connection;
import org.eclipse.jetty.client.ContentResponse;
import org.eclipse.jetty.client.Destination;
@@ -793,6 +796,37 @@ public void onComplete(Result result)
assertThat(onContentSourceErrorRef.get(), is(nullValue()));
}
+ @Test
+ public void testRequestContentResponseContent() throws Exception
+ {
+ start(new Handler.Abstract()
+ {
+ @Override
+ public boolean handle(Request request, org.eclipse.jetty.server.Response response, Callback callback)
+ {
+ Content.copy(request, response, callback);
+ return true;
+ }
+ });
+
+ AsyncRequestContent content = new AsyncRequestContent();
+ var request = httpClient.newRequest("localhost", connector.getLocalPort())
+ .method(HttpMethod.POST)
+ .body(content);
+ CompletableFuture