From 57089ec471779142918961369492c1ca0837d95c Mon Sep 17 00:00:00 2001 From: Daniel Kec Date: Thu, 21 Sep 2023 18:53:38 +0200 Subject: [PATCH 1/4] Server side trailers #7647 --- .../webserver/http2/Http2ServerResponse.java | 110 ++++++++--- .../webserver/tests/http2/HeadersTest.java | 127 ++++++++++-- .../helidon/webserver/tests/HeadersTest.java | 183 ++++++++++++++++++ .../webserver/http/ServerResponse.java | 8 + .../webserver/http/ServerResponseBase.java | 18 ++ .../webserver/http1/Http1ServerResponse.java | 60 ++++-- 6 files changed, 449 insertions(+), 57 deletions(-) create mode 100644 webserver/tests/webserver/src/test/java/io/helidon/webserver/tests/HeadersTest.java diff --git a/webserver/http2/src/main/java/io/helidon/webserver/http2/Http2ServerResponse.java b/webserver/http2/src/main/java/io/helidon/webserver/http2/Http2ServerResponse.java index 2f2b248a502..185b463c762 100644 --- a/webserver/http2/src/main/java/io/helidon/webserver/http2/Http2ServerResponse.java +++ b/webserver/http2/src/main/java/io/helidon/webserver/http2/Http2ServerResponse.java @@ -47,13 +47,16 @@ class Http2ServerResponse extends ServerResponseBase { private final Http2StreamWriter writer; private final int streamId; private final ServerResponseHeaders headers; + private final ServerResponseHeaders trailers; private final FlowControl.Outbound flowControl; + private final Http2ServerRequest request; private boolean isSent; private boolean streamingEntity; private long bytesWritten; private BlockingOutputStream outputStream; private UnaryOperator outputStreamFilter; + private String streamResult = null; Http2ServerResponse(ConnectionContext ctx, Http2ServerRequest request, @@ -62,10 +65,12 @@ class Http2ServerResponse extends ServerResponseBase { FlowControl.Outbound flowControl) { super(ctx, request); this.ctx = ctx; + this.request = request; this.writer = writer; this.streamId = streamId; this.flowControl = flowControl; this.headers = ServerResponseHeaders.create(); + this.trailers = ServerResponseHeaders.create(); } @Override @@ -111,11 +116,14 @@ public void send(byte[] entityBytes) { "Status must be configured on response, " + "do not set HTTP/2 pseudo headers")); - Http2FrameData frameData = new Http2FrameData(Http2FrameHeader.create(bytes.length, - Http2FrameTypes.DATA, - DataFlags.create(Http2Flag.END_OF_STREAM), - streamId), - BufferData.create(bytes)); + boolean sendTrailers = request.headers().contains(HeaderValues.TE_TRAILERS) || headers.contains(HeaderNames.TRAILER); + + Http2FrameData frameData = + new Http2FrameData(Http2FrameHeader.create(bytes.length, + Http2FrameTypes.DATA, + DataFlags.create(sendTrailers ? 0 : Http2Flag.END_OF_STREAM), + streamId), + BufferData.create(bytes)); http2Headers.validateResponse(); bytesWritten = writer.writeHeaders(http2Headers, @@ -123,6 +131,15 @@ public void send(byte[] entityBytes) { Http2Flag.HeaderFlags.create(Http2Flag.END_OF_HEADERS), frameData, flowControl); + if (sendTrailers) { + Http2Headers http2trailers = Http2Headers.create(trailers); + int written = writer.writeHeaders(http2trailers, + streamId, + Http2Flag.HeaderFlags.create(Http2Flag.END_OF_HEADERS | Http2Flag.END_OF_STREAM), + flowControl); + bytesWritten += written; + } + afterSend(); } @@ -141,7 +158,11 @@ public OutputStream outputStream() { } streamingEntity = true; - outputStream = new BlockingOutputStream(headers, writer, streamId, flowControl, status(), () -> { + if (request.headers().contains(HeaderValues.TE_TRAILERS)) { + headers.add(STREAM_TRAILERS); + } + + outputStream = new BlockingOutputStream(request, this, () -> { this.isSent = true; afterSend(); }); @@ -161,10 +182,19 @@ public long bytesWritten() { public ServerResponseHeaders headers() { return headers; } + @Override + public ServerResponseHeaders trailers() { + if (request.headers().contains(HeaderValues.TE_TRAILERS) || headers.contains(HeaderNames.TRAILER)) { + return trailers; + } + throw new IllegalStateException( + "Trailers are supported only when request came with 'TE: trailers' header or " + + "response headers have trailer names definition 'Trailer: '"); + } @Override public void streamResult(String result) { - // TODO use this when closing the stream + this.streamResult = result; } @Override @@ -210,30 +240,32 @@ public void streamFilter(UnaryOperator filterFunction) { private static class BlockingOutputStream extends OutputStream { + private final Http2ServerRequest request; private final ServerResponseHeaders headers; + private final ServerResponseHeaders trailers; private final Http2StreamWriter writer; private final int streamId; private final FlowControl.Outbound flowControl; private final Status status; private final Runnable responseCloseRunnable; + private final Http2ServerResponse response; private BufferData firstBuffer; private boolean closed; private boolean firstByte = true; private long bytesWritten; - private BlockingOutputStream(ServerResponseHeaders headers, - Http2StreamWriter writer, - int streamId, - FlowControl.Outbound flowControl, - Status status, + private BlockingOutputStream(Http2ServerRequest request, + Http2ServerResponse response, Runnable responseCloseRunnable) { - - this.headers = headers; - this.writer = writer; - this.streamId = streamId; - this.flowControl = flowControl; - this.status = status; + this.request = request; + this.response = response; + this.headers = response.headers; + this.trailers = response.trailers; + this.writer = response.writer; + this.streamId = response.streamId; + this.flowControl = response.flowControl; + this.status = response.status(); this.responseCloseRunnable = responseCloseRunnable; } @@ -269,8 +301,12 @@ void commit() { return; } this.closed = true; + boolean sendTrailers = + request.headers().contains(HeaderValues.TE_TRAILERS) || headers.contains(HeaderNames.TRAILER); if (firstByte) { - sendFirstChunkOnly(); + sendFirstChunkOnly(sendTrailers); + } else if (sendTrailers) { + sendTrailers(); } else { sendEndOfStream(); } @@ -301,7 +337,7 @@ private void write(BufferData buffer) throws IOException { } } - private void sendFirstChunkOnly() { + private void sendFirstChunkOnly(boolean sendTrailers) { int contentLength; if (firstBuffer == null) { headers.set(HeaderValues.CONTENT_LENGTH_ZERO); @@ -323,16 +359,21 @@ private void sendFirstChunkOnly() { if (contentLength == 0) { int written = writer.writeHeaders(http2Headers, streamId, - Http2Flag.HeaderFlags.create(Http2Flag.END_OF_HEADERS - | Http2Flag.END_OF_STREAM), + Http2Flag.HeaderFlags.create( + sendTrailers + ? Http2Flag.END_OF_HEADERS + : Http2Flag.END_OF_HEADERS | Http2Flag.END_OF_STREAM), flowControl); bytesWritten += written; } else { - Http2FrameData frameData = new Http2FrameData(Http2FrameHeader.create(contentLength, - Http2FrameTypes.DATA, - DataFlags.create(Http2Flag.END_OF_STREAM), - streamId), - firstBuffer); + Http2FrameData frameData = + new Http2FrameData(Http2FrameHeader.create(contentLength, + Http2FrameTypes.DATA, + DataFlags.create(sendTrailers + ? 0 + : Http2Flag.END_OF_STREAM), + streamId), + firstBuffer); int written = writer.writeHeaders(http2Headers, streamId, Http2Flag.HeaderFlags.create(Http2Flag.END_OF_HEADERS), @@ -379,5 +420,20 @@ private void sendEndOfStream() { bytesWritten += Http2FrameHeader.LENGTH; writer.writeData(frameData, flowControl); } + + private void sendTrailers(){ + if (response.streamResult != null) { + trailers.set(STREAM_RESULT_NAME, response.streamResult); + } + trailers.set(STREAM_STATUS_NAME, status.code()); + + Http2Headers http2Headers = Http2Headers.create(trailers); + int written = writer.writeHeaders(http2Headers, + streamId, + Http2Flag.HeaderFlags.create(Http2Flag.END_OF_HEADERS + | Http2Flag.END_OF_STREAM), + flowControl); + bytesWritten += written; + } } } diff --git a/webserver/tests/http2/src/test/java/io/helidon/webserver/tests/http2/HeadersTest.java b/webserver/tests/http2/src/test/java/io/helidon/webserver/tests/http2/HeadersTest.java index ddf036726d4..5389188b60a 100644 --- a/webserver/tests/http2/src/test/java/io/helidon/webserver/tests/http2/HeadersTest.java +++ b/webserver/tests/http2/src/test/java/io/helidon/webserver/tests/http2/HeadersTest.java @@ -17,6 +17,7 @@ package io.helidon.webserver.tests.http2; import java.io.IOException; +import java.io.InputStream; import java.net.URI; import java.net.http.HttpClient; import java.net.http.HttpRequest; @@ -27,6 +28,13 @@ import java.util.Set; import java.util.stream.Collectors; +import io.helidon.http.Header; +import io.helidon.http.HeaderNames; +import io.helidon.http.HeaderValues; +import io.helidon.http.Status; +import io.helidon.webclient.api.ClientResponseTyped; +import io.helidon.webclient.http2.Http2Client; +import io.helidon.webclient.http2.Http2ClientProtocolConfig; import io.helidon.webserver.WebServer; import io.helidon.webserver.WebServerConfig; import io.helidon.webserver.http.HttpRouting; @@ -44,6 +52,7 @@ import org.junit.jupiter.api.Assertions; import org.junit.jupiter.api.Test; +import static io.helidon.common.testing.http.junit5.HttpHeaderMatcher.hasHeader; import static io.helidon.http.Method.GET; import static org.hamcrest.CoreMatchers.is; import static org.hamcrest.MatcherAssert.assertThat; @@ -53,6 +62,8 @@ public class HeadersTest { private static final Duration TIMEOUT = Duration.ofSeconds(10); private static final String DATA = "Helidon!!!".repeat(10); + private static final Header TEST_TRAILER_HEADER = HeaderValues.create("test-trailer", "trailer-value"); + private final Http2Client client; @SetUpServer static void setUpServer(WebServerConfig.Builder serverBuilder) { @@ -76,25 +87,67 @@ static void setUpServer(WebServerConfig.Builder serverBuilder) { @SetUpRoute static void router(HttpRouting.Builder router) { + router.error(IllegalStateException.class, (req, res, t) -> res.status(500).send(t.getMessage())); router.route(Http2Route.route(GET, "/ping", (req, res) -> res.send("pong"))); router.route(Http2Route.route(GET, "/cont-out", - (req, res) -> { - for (int i = 0; i < 500; i++) { - res.header("test-header-" + i, DATA + i); - } - res.send(); - } + (req, res) -> { + for (int i = 0; i < 500; i++) { + res.header("test-header-" + i, DATA + i); + } + res.send(); + } )); router.route(Http2Route.route(GET, "/cont-in", - (req, res) -> { - String joinedHeaders = req.headers() - .stream() - .filter(h -> h.name().startsWith("test-header-")) - .map(h -> h.name() + "=" + h.get()) - .collect(Collectors.joining("\n")); - res.send(joinedHeaders); - } + (req, res) -> { + String joinedHeaders = req.headers() + .stream() + .filter(h -> h.name().startsWith("test-header-")) + .map(h -> h.name() + "=" + h.get()) + .collect(Collectors.joining("\n")); + res.send(joinedHeaders); + } )); + router.route(Http2Route.route(GET, "/trailers-stream", + (req, res) -> { + res.header(HeaderNames.TRAILER, TEST_TRAILER_HEADER.name()); + try (var os = res.outputStream()) { + os.write(DATA.getBytes()); + os.write(DATA.getBytes()); + os.write(DATA.getBytes()); + res.trailers().add(TEST_TRAILER_HEADER); + } + } + )); + router.route(Http2Route.route(GET, "/trailers-stream-result", + (req, res) -> { + try (var os = res.outputStream()) { + os.write(DATA.getBytes()); + os.write(DATA.getBytes()); + os.write(DATA.getBytes()); + res.streamResult("Kaboom!"); + } + } + )); + router.route(Http2Route.route(GET, "/trailers", + (req, res) -> { + res.header(HeaderNames.TRAILER, TEST_TRAILER_HEADER.name()); + res.trailers().add(TEST_TRAILER_HEADER); + res.send(DATA.repeat(3)); + } + )); + router.route(Http2Route.route(GET, "/trailers-no-trailers", + (req, res) -> { + res.trailers().add(TEST_TRAILER_HEADER); + res.send(DATA); + } + )); + } + + HeadersTest(WebServer server) { + client = Http2Client.builder() + .baseUri("http://localhost:" + server.port()) + .protocolConfig(Http2ClientProtocolConfig.builder().priorKnowledge(true).build()) + .build(); } @Test @@ -166,6 +219,52 @@ void serverInboundTooLarge(WebServer server) throws IOException, InterruptedExce HttpResponse.BodyHandlers.ofString())); } + @Test + void trailersEntity() throws IOException { + ClientResponseTyped res = client + .get("/trailers") + .request(InputStream.class); + try (var is = res.entity()) { + is.readAllBytes(); + } + assertThat(res.trailers(), hasHeader(TEST_TRAILER_HEADER)); + } + + @Test + void trailersStream() throws IOException { + ClientResponseTyped res = client + .get("/trailers-stream") + .request(InputStream.class); + try (var is = res.entity()) { + is.readAllBytes(); + } + assertThat(res.trailers(), hasHeader(TEST_TRAILER_HEADER)); + } + + @Test + void trailersStreamResult() throws IOException { + ClientResponseTyped res = client + .get("/trailers-stream-result") + .header(HeaderValues.TE_TRAILERS) + .request(InputStream.class); + try (var is = res.entity()) { + is.readAllBytes(); + } + assertThat(res.trailers(), hasHeader(HeaderValues.create("stream-result", "Kaboom!"))); + } + + @Test + void trailersNoTrailers() { + ClientResponseTyped res = client + .get("/trailers-no-trailers") + .request(String.class); + + assertThat(res.status(), is(Status.INTERNAL_SERVER_ERROR_500)); + assertThat(res.entity(), is( + "Trailers are supported only when request came with 'TE: trailers' header or " + + "response headers have trailer names definition 'Trailer: '")); + } + private HttpClient http2Client(URI base) throws IOException, InterruptedException { HttpClient client = HttpClient.newBuilder() .version(HttpClient.Version.HTTP_2) diff --git a/webserver/tests/webserver/src/test/java/io/helidon/webserver/tests/HeadersTest.java b/webserver/tests/webserver/src/test/java/io/helidon/webserver/tests/HeadersTest.java new file mode 100644 index 00000000000..320a23653fd --- /dev/null +++ b/webserver/tests/webserver/src/test/java/io/helidon/webserver/tests/HeadersTest.java @@ -0,0 +1,183 @@ +/* + * Copyright (c) 2023 Oracle and/or its affiliates. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.helidon.webserver.tests; + +import java.io.IOException; +import java.io.InputStream; + +import io.helidon.http.ClientResponseHeaders; +import io.helidon.http.Header; +import io.helidon.http.HeaderName; +import io.helidon.http.HeaderNames; +import io.helidon.http.HeaderValues; +import io.helidon.http.Status; +import io.helidon.webclient.api.ClientResponseTyped; +import io.helidon.webclient.api.WebClient; +import io.helidon.webserver.http.HttpRouting; +import io.helidon.webserver.testing.junit5.ServerTest; +import io.helidon.webserver.testing.junit5.SetUpRoute; + +import org.hamcrest.CoreMatchers; +import org.junit.jupiter.api.Test; + +import static io.helidon.common.testing.http.junit5.HttpHeaderMatcher.hasHeader; +import static io.helidon.http.Method.GET; +import static org.hamcrest.MatcherAssert.assertThat; +import static org.hamcrest.Matchers.is; + +@ServerTest +class HeadersTest { + + private static final String DATA = "Helidon!!!".repeat(10); + private static final Header TEST_TRAILER_HEADER = HeaderValues.create("test-trailer", "trailer-value"); + private static final HeaderName CLIENT_PORT_HEADER_NAME = HeaderNames.create("client-port"); + + private int clientPort = -1; + + @SetUpRoute + static void router(HttpRouting.Builder router) { + router.error(IllegalStateException.class, (req, res, t) -> res.status(500).send(t.getMessage())); + router.any((req, res) -> res + .header(CLIENT_PORT_HEADER_NAME, String.valueOf(req.remotePeer().port())) + .next()); + router.route(GET, "/trailers", + (req, res) -> { + res.header(HeaderNames.TRAILER, TEST_TRAILER_HEADER.name()); + try (var os = res.outputStream()) { + os.write(DATA.getBytes()); + os.write(DATA.getBytes()); + os.write(DATA.getBytes()); + res.trailers().add(TEST_TRAILER_HEADER); + } + } + ); + router.route(GET, "/stream-result", + (req, res) -> { + res.header(HeaderNames.TRAILER, TEST_TRAILER_HEADER.name()); + try (var os = res.outputStream()) { + os.write(DATA.getBytes()); + os.write(DATA.getBytes()); + os.write(DATA.getBytes()); + res.streamResult("Kaboom!"); + } + } + ); + router.route(GET, "/stream-status", + (req, res) -> { + res.header(HeaderNames.TRAILER, TEST_TRAILER_HEADER.name()); + try (var os = res.outputStream()) { + os.write(DATA.getBytes()); + os.write(DATA.getBytes()); + os.flush(); + res.status(Status.I_AM_A_TEAPOT_418); + os.write(DATA.getBytes()); + } + } + ); + router.route(GET, "/trailers-forced", + (req, res) -> { + res.header(HeaderNames.TRAILER, TEST_TRAILER_HEADER.name()); + res.trailers().add(TEST_TRAILER_HEADER); + res.send(DATA.repeat(3)); + } + ); + router.route(GET, "/trailers-no-trailers", + (req, res) -> { + res.trailers().add(TEST_TRAILER_HEADER); + res.send(DATA); + } + ); + } + + @Test + void trailersTE(WebClient client) throws IOException { + ClientResponseTyped res = client + .get("/trailers") + .header(HeaderValues.create("TE", "trailers")) + .request(InputStream.class); + try (var ins = res.entity()) { + assertThat(ins.readAllBytes(), is(DATA.repeat(3).getBytes())); + } + assertThat(res.trailers(), hasHeader(TEST_TRAILER_HEADER)); + checkCachedConnection(res.headers()); + } + + @Test + void trailers(WebClient client) { + ClientResponseTyped res = client + .get("/trailers") + .request(String.class); + assertThat(res.entity(), is(DATA.repeat(3))); + assertThat(res.trailers(), hasHeader(TEST_TRAILER_HEADER)); + checkCachedConnection(res.headers()); + } + + @Test + void trailersForced(WebClient client) { + ClientResponseTyped res = client + .get("/trailers-forced") + .request(String.class); + assertThat(res.entity(), is(DATA.repeat(3))); + assertThat(res.trailers(), hasHeader(TEST_TRAILER_HEADER)); + checkCachedConnection(res.headers()); + } + + @Test + void streamResult(WebClient client) throws IOException { + ClientResponseTyped res = client + .get("/stream-result") + .header(HeaderValues.TE_TRAILERS) + .request(InputStream.class); + try (var ins = res.entity()) { + assertThat(ins.readAllBytes(), is(DATA.repeat(3).getBytes())); + } + assertThat(res.trailers(), hasHeader(HeaderValues.create("Stream-Status", 200))); + assertThat(res.trailers(), hasHeader(HeaderValues.create("Stream-Result", "Kaboom!"))); + checkCachedConnection(res.headers()); + } + + @Test + void streamStatus(WebClient client) throws IOException { + ClientResponseTyped res = client + .get("/stream-status") + .header(HeaderValues.create("TE", "trailers")) + .request(InputStream.class); + try (var ins = res.entity()) { + ins.readAllBytes(); + } + assertThat(res.trailers(), hasHeader(HeaderValues.create("Stream-Status", Status.I_AM_A_TEAPOT_418.code()))); + checkCachedConnection(res.headers()); + } + + @Test + void trailersNoTrailers(WebClient client) { + ClientResponseTyped res = client + .get("/trailers-no-trailers") + .request(String.class); + + assertThat(res.status(), CoreMatchers.is(Status.INTERNAL_SERVER_ERROR_500)); + assertThat(res.entity(), CoreMatchers.is( + "Trailers are supported only when request came with 'TE: trailers' header or " + + "response headers have trailer names definition 'Trailer: '")); + } + + private void checkCachedConnection(ClientResponseHeaders h) { + if (clientPort == -1) { + clientPort = h.get(CLIENT_PORT_HEADER_NAME).asInt().get(); + } + } +} diff --git a/webserver/webserver/src/main/java/io/helidon/webserver/http/ServerResponse.java b/webserver/webserver/src/main/java/io/helidon/webserver/http/ServerResponse.java index a2974057982..5c294227b03 100644 --- a/webserver/webserver/src/main/java/io/helidon/webserver/http/ServerResponse.java +++ b/webserver/webserver/src/main/java/io/helidon/webserver/http/ServerResponse.java @@ -190,6 +190,14 @@ default void send(Optional entity) { */ ServerResponseHeaders headers(); + /** + * Response trailers (mutable). + * @return trailers + * @throws java.lang.IllegalStateException if client didn't ask for trailers with {@code TE: trailers} header in request + * or response doesn't contain trailer declaration headers {@code Trailer: } + */ + ServerResponseHeaders trailers(); + /** * Description of the result of output stream processing. * In case an output stream was used, calling this method will immediately close the stream and return this diff --git a/webserver/webserver/src/main/java/io/helidon/webserver/http/ServerResponseBase.java b/webserver/webserver/src/main/java/io/helidon/webserver/http/ServerResponseBase.java index 65e46e209c1..1465cdc2cc9 100644 --- a/webserver/webserver/src/main/java/io/helidon/webserver/http/ServerResponseBase.java +++ b/webserver/webserver/src/main/java/io/helidon/webserver/http/ServerResponseBase.java @@ -27,6 +27,10 @@ import io.helidon.common.buffers.BufferData; import io.helidon.common.uri.UriPath; import io.helidon.common.uri.UriQuery; +import io.helidon.http.Header; +import io.helidon.http.HeaderName; +import io.helidon.http.HeaderNames; +import io.helidon.http.HeaderValues; import io.helidon.http.HttpException; import io.helidon.http.HttpPrologue; import io.helidon.http.ServerRequestHeaders; @@ -47,6 +51,20 @@ @SuppressWarnings("unchecked") public abstract class ServerResponseBase> implements RoutingResponse { + /** + * Stream status trailer name. + */ + protected static final HeaderName STREAM_STATUS_NAME = HeaderNames.create("stream-status"); + /** + * Stream result trailer name. + */ + protected static final HeaderName STREAM_RESULT_NAME = HeaderNames.create("stream-result"); + /** + * Stream status trailers. + */ + protected static final Header STREAM_TRAILERS = + HeaderValues.create(HeaderNames.TRAILER, STREAM_STATUS_NAME.defaultCase() + + "," + STREAM_RESULT_NAME.defaultCase()); private final ContentEncodingContext contentEncodingContext; private final MediaContext mediaContext; private final ServerRequestHeaders requestHeaders; diff --git a/webserver/webserver/src/main/java/io/helidon/webserver/http1/Http1ServerResponse.java b/webserver/webserver/src/main/java/io/helidon/webserver/http1/Http1ServerResponse.java index fa01f439c0b..b3e0c591451 100644 --- a/webserver/webserver/src/main/java/io/helidon/webserver/http1/Http1ServerResponse.java +++ b/webserver/webserver/src/main/java/io/helidon/webserver/http1/Http1ServerResponse.java @@ -35,7 +35,6 @@ import io.helidon.common.media.type.MediaTypes; import io.helidon.http.DateTime; import io.helidon.http.Header; -import io.helidon.http.HeaderName; import io.helidon.http.HeaderNames; import io.helidon.http.HeaderValues; import io.helidon.http.HttpException; @@ -57,11 +56,7 @@ class Http1ServerResponse extends ServerResponseBase { private static final byte[] OK_200 = "HTTP/1.1 200 OK\r\n".getBytes(StandardCharsets.UTF_8); private static final byte[] DATE = "Date: ".getBytes(StandardCharsets.UTF_8); private static final byte[] TERMINATING_CHUNK = "0\r\n\r\n".getBytes(StandardCharsets.UTF_8); - private static final HeaderName STREAM_STATUS_NAME = HeaderNames.create("stream-status"); - private static final HeaderName STREAM_RESULT_NAME = HeaderNames.create("stream-result"); - private static final Header STREAM_TRAILERS = - HeaderValues.create(HeaderNames.TRAILER, STREAM_STATUS_NAME.defaultCase() - + "," + STREAM_RESULT_NAME.defaultCase()); + private static final byte[] TERMINATING_CHUNK_TRAILERS = "0\r\n".getBytes(StandardCharsets.UTF_8); @SuppressWarnings("rawtypes") private static final List SINK_PROVIDERS @@ -73,7 +68,7 @@ class Http1ServerResponse extends ServerResponseBase { private final DataWriter dataWriter; private final Http1ServerRequest request; private final ServerResponseHeaders headers; - private final WritableHeaders trailers = WritableHeaders.create(); + private final ServerResponseHeaders trailers; private final boolean keepAlive; private boolean streamingEntity; @@ -98,6 +93,7 @@ class Http1ServerResponse extends ServerResponseBase { this.dataWriter = dataWriter; this.request = request; this.headers = ServerResponseHeaders.create(); + this.trailers = ServerResponseHeaders.create(); this.keepAlive = keepAlive; this.validateHeaders = validateHeaders; } @@ -150,7 +146,7 @@ public Http1ServerResponse header(Header header) { // actually send the response over the wire @Override public void send(byte[] bytes) { - if (outputStreamFilter == null) { + if (outputStreamFilter == null && !headers.contains(HeaderNames.TRAILER)) { byte[] entity = entityBytes(bytes); BufferData bufferData = responseBuffer(entity); bytesWritten = bufferData.available(); @@ -221,6 +217,16 @@ public ServerResponseHeaders headers() { return headers; } + @Override + public ServerResponseHeaders trailers() { + if (request.headers().contains(HeaderValues.TE_TRAILERS) || headers.contains(HeaderNames.TRAILER)) { + return trailers; + } + throw new IllegalStateException( + "Trailers are supported only when request came with 'TE: trailers' header or " + + "response headers have trailer names definition 'Trailer: '"); + } + @Override public void streamResult(String result) { this.streamResult = result; @@ -411,7 +417,8 @@ private BlockingOutputStream(ServerResponseHeaders headers, this.contentLength = headers.contentLength().orElse(-1); this.request = request; this.keepAlive = keepAlive; - this.forcedChunked = headers.contains(HeaderValues.TRANSFER_ENCODING_CHUNKED); + this.forcedChunked = headers.contains(HeaderValues.TRANSFER_ENCODING_CHUNKED) + || headers.contains(HeaderNames.TRAILER); this.validateHeaders = validateHeaders; } @@ -468,21 +475,25 @@ void commit() { return; } this.closed = true; + boolean sendTrailers = + (isChunked || forcedChunked) + && (request.headers().contains(HeaderValues.TE_TRAILERS) + || headers.contains(HeaderNames.TRAILER)); + if (firstByte) { if (forcedChunked && firstBuffer != null) { // no sense in sending no data, only do this if chunked requested through a header sendHeadersAndPrepare(); writeChunked(firstBuffer); - terminatingChunk(); + terminatingChunk(sendTrailers); } else { sendFirstChunkOnly(); } } else if (isChunked) { - terminatingChunk(); + terminatingChunk(sendTrailers); } - if (isChunked || forcedChunked) { - if (request.headers().contains(HeaderValues.TE_TRAILERS)) { + if (sendTrailers) { // not optimized, trailers enabled: we need to write trailers trailers.set(STREAM_STATUS_NAME, String.valueOf(status.get().code())); trailers.set(STREAM_RESULT_NAME, streamResult.get()); @@ -491,7 +502,6 @@ void commit() { buffer.write('\r'); // "\r\n" - empty line after headers buffer.write('\n'); dataWriter.write(buffer); - } } responseCloseRunnable.run(); @@ -506,8 +516,26 @@ long totalBytesWritten() { return responseBytesTotal; } - private void terminatingChunk() { - BufferData terminatingChunk = BufferData.create(TERMINATING_CHUNK); + /** + * Send terminating chunk without trailers {@code "0\r\n\r\n"} or when trailers are expected {@code "0\r\n"}. + * + *
{@code
+         *   chunked-body    = *chunk
+         *                     last-chunk
+         *                     trailer-section
+         *                     CRLF
+         *
+         *   chunk           = chunk-size [ chunk-ext ] CRLF
+         *                     chunk-data CRLF
+         *   last-chunk      = 1*("0") [ chunk-ext ] CRLF
+         *   trailer-section = *( field-line CRLF )
+         *   }
+ * + * @param trailers whether trailers are expected or not + * @see rfc9112 ยง7.1 + */ + private void terminatingChunk(boolean trailers) { + BufferData terminatingChunk = BufferData.create(trailers ? TERMINATING_CHUNK_TRAILERS : TERMINATING_CHUNK); sendListener.data(ctx, terminatingChunk); dataWriter.write(terminatingChunk); } From 875ae82a70f0228824640097b8048fc3872d5f8c Mon Sep 17 00:00:00 2001 From: Daniel Kec Date: Fri, 22 Sep 2023 16:28:51 +0200 Subject: [PATCH 2/4] Review issues 1 --- .../helidon/http/ServerResponseTrailers.java | 42 +++++++ .../http/ServerResponseTrailersImpl.java | 107 ++++++++++++++++++ ...eadersTest.java => HeadersClientTest.java} | 4 +- ...eadersTest.java => HeadersClientTest.java} | 4 +- .../webserver/http2/Http2ServerResponse.java | 11 +- .../webserver/http2/Http2ServerStream.java | 1 - .../tests/http2/Continue100Test.java | 14 +-- ...eadersTest.java => HeadersServerTest.java} | 4 +- ...eadersTest.java => HeadersServerTest.java} | 2 +- .../webserver/http/ServerResponse.java | 3 +- .../webserver/http/ServerResponseBase.java | 3 + .../webserver/http1/Http1ServerResponse.java | 7 +- 12 files changed, 178 insertions(+), 24 deletions(-) create mode 100644 http/http/src/main/java/io/helidon/http/ServerResponseTrailers.java create mode 100644 http/http/src/main/java/io/helidon/http/ServerResponseTrailersImpl.java rename webclient/tests/http1/src/test/java/io/helidon/webclient/tests/{HeadersTest.java => HeadersClientTest.java} (99%) rename webclient/tests/http2/src/test/java/io/helidon/webclient/tests/http2/{HeadersTest.java => HeadersClientTest.java} (99%) rename webserver/tests/http2/src/test/java/io/helidon/webserver/tests/http2/{HeadersTest.java => HeadersServerTest.java} (99%) rename webserver/tests/webserver/src/test/java/io/helidon/webserver/tests/{HeadersTest.java => HeadersServerTest.java} (99%) diff --git a/http/http/src/main/java/io/helidon/http/ServerResponseTrailers.java b/http/http/src/main/java/io/helidon/http/ServerResponseTrailers.java new file mode 100644 index 00000000000..ec2005dfb4e --- /dev/null +++ b/http/http/src/main/java/io/helidon/http/ServerResponseTrailers.java @@ -0,0 +1,42 @@ +/* + * Copyright (c) 2023 Oracle and/or its affiliates. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.helidon.http; + +/** + * Mutable trailers of a server response. + */ +public interface ServerResponseTrailers extends WritableHeaders { + + /** + * Create a new instance of mutable server response trailers. + * + * @return new server response trailers + */ + static ServerResponseTrailers create() { + return new ServerResponseTrailersImpl(WritableHeaders.create()); + } + + /** + * Create a new instance of mutable server response trailers. + * + * @param existing trailers to add to these response trailers + * @return new server response trailers + */ + static ServerResponseTrailers create(Headers existing) { + return new ServerResponseTrailersImpl(WritableHeaders.create(existing)); + } +} diff --git a/http/http/src/main/java/io/helidon/http/ServerResponseTrailersImpl.java b/http/http/src/main/java/io/helidon/http/ServerResponseTrailersImpl.java new file mode 100644 index 00000000000..0f60d651ebf --- /dev/null +++ b/http/http/src/main/java/io/helidon/http/ServerResponseTrailersImpl.java @@ -0,0 +1,107 @@ +/* + * Copyright (c) 2023 Oracle and/or its affiliates. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.helidon.http; + +import java.util.Iterator; +import java.util.List; +import java.util.function.Consumer; +import java.util.function.Supplier; + +class ServerResponseTrailersImpl implements ServerResponseTrailers { + private final WritableHeaders delegate; + + ServerResponseTrailersImpl(WritableHeaders delegate) { + this.delegate = delegate; + } + + @Override + public List all(HeaderName name, Supplier> defaultSupplier) { + return delegate.all(name, defaultSupplier); + } + + @Override + public boolean contains(HeaderName name) { + return delegate.contains(name); + } + + @Override + public boolean contains(Header value) { + return delegate.contains(value); + } + + @Override + public Header get(HeaderName name) { + return delegate.get(name); + } + + @Override + public int size() { + return delegate.size(); + } + + @Override + public List acceptedTypes() { + return delegate.acceptedTypes(); + } + + @Override + public ServerResponseTrailers setIfAbsent(Header header) { + delegate.setIfAbsent(header); + return this; + } + + @Override + public ServerResponseTrailers add(Header header) { + delegate.add(header); + return this; + } + + @Override + public ServerResponseTrailers remove(HeaderName name) { + delegate.remove(name); + return this; + } + + @Override + public ServerResponseTrailers remove(HeaderName name, Consumer
removedConsumer) { + delegate.remove(name, removedConsumer); + return this; + } + + @Override + public ServerResponseTrailers set(Header header) { + delegate.set(header); + return this; + } + + @Override + public ServerResponseTrailers clear() { + delegate.clear(); + return this; + } + + @Override + public ServerResponseTrailers from(Headers headers) { + delegate.from(headers); + return this; + } + + @Override + public Iterator
iterator() { + return delegate.iterator(); + } +} diff --git a/webclient/tests/http1/src/test/java/io/helidon/webclient/tests/HeadersTest.java b/webclient/tests/http1/src/test/java/io/helidon/webclient/tests/HeadersClientTest.java similarity index 99% rename from webclient/tests/http1/src/test/java/io/helidon/webclient/tests/HeadersTest.java rename to webclient/tests/http1/src/test/java/io/helidon/webclient/tests/HeadersClientTest.java index 6f9e8d2ba96..c9a9a6be8dc 100644 --- a/webclient/tests/http1/src/test/java/io/helidon/webclient/tests/HeadersTest.java +++ b/webclient/tests/http1/src/test/java/io/helidon/webclient/tests/HeadersClientTest.java @@ -53,7 +53,7 @@ import static org.hamcrest.Matchers.is; import static org.junit.jupiter.api.Assertions.assertThrows; -class HeadersTest { +class HeadersClientTest { private static final Duration TIMEOUT = Duration.ofSeconds(10); private static final Header INVALID_CONTENT_TYPE_VALUE = @@ -168,7 +168,7 @@ public void testInvalidTextContentTypeStrict() { @Test public void testInvalidTextContentTypeRelaxed() { WebClient client = WebClient.builder() - .from(HeadersTest.CLIENT.prototype()) + .from(HeadersClientTest.CLIENT.prototype()) .mediaTypeParserMode(ParserMode.RELAXED) .build(); try (HttpClientResponse res = client.method(Method.GET) diff --git a/webclient/tests/http2/src/test/java/io/helidon/webclient/tests/http2/HeadersTest.java b/webclient/tests/http2/src/test/java/io/helidon/webclient/tests/http2/HeadersClientTest.java similarity index 99% rename from webclient/tests/http2/src/test/java/io/helidon/webclient/tests/http2/HeadersTest.java rename to webclient/tests/http2/src/test/java/io/helidon/webclient/tests/http2/HeadersClientTest.java index 163d4a406d4..f828b437ab8 100644 --- a/webclient/tests/http2/src/test/java/io/helidon/webclient/tests/http2/HeadersTest.java +++ b/webclient/tests/http2/src/test/java/io/helidon/webclient/tests/http2/HeadersClientTest.java @@ -61,9 +61,9 @@ import static org.hamcrest.Matchers.is; import static org.junit.jupiter.api.Assertions.assertThrows; -class HeadersTest { +class HeadersClientTest { - private static final System.Logger LOGGER = System.getLogger(HeadersTest.class.getName()); + private static final System.Logger LOGGER = System.getLogger(HeadersClientTest.class.getName()); private static final Header BEFORE_HEADER = HeaderValues.create("test", "before"); private static final Header TRAILER_HEADER = HeaderValues.create("Trailer-header", "trailer-test"); private static final Duration TIMEOUT = Duration.ofSeconds(10); diff --git a/webserver/http2/src/main/java/io/helidon/webserver/http2/Http2ServerResponse.java b/webserver/http2/src/main/java/io/helidon/webserver/http2/Http2ServerResponse.java index 185b463c762..aaa708e927e 100644 --- a/webserver/http2/src/main/java/io/helidon/webserver/http2/Http2ServerResponse.java +++ b/webserver/http2/src/main/java/io/helidon/webserver/http2/Http2ServerResponse.java @@ -28,6 +28,7 @@ import io.helidon.http.HeaderNames; import io.helidon.http.HeaderValues; import io.helidon.http.ServerResponseHeaders; +import io.helidon.http.ServerResponseTrailers; import io.helidon.http.Status; import io.helidon.http.http2.FlowControl; import io.helidon.http.http2.Http2Flag; @@ -47,7 +48,7 @@ class Http2ServerResponse extends ServerResponseBase { private final Http2StreamWriter writer; private final int streamId; private final ServerResponseHeaders headers; - private final ServerResponseHeaders trailers; + private final ServerResponseTrailers trailers; private final FlowControl.Outbound flowControl; private final Http2ServerRequest request; @@ -70,7 +71,7 @@ class Http2ServerResponse extends ServerResponseBase { this.streamId = streamId; this.flowControl = flowControl; this.headers = ServerResponseHeaders.create(); - this.trailers = ServerResponseHeaders.create(); + this.trailers = ServerResponseTrailers.create(); } @Override @@ -183,7 +184,7 @@ public ServerResponseHeaders headers() { return headers; } @Override - public ServerResponseHeaders trailers() { + public ServerResponseTrailers trailers() { if (request.headers().contains(HeaderValues.TE_TRAILERS) || headers.contains(HeaderNames.TRAILER)) { return trailers; } @@ -242,7 +243,7 @@ private static class BlockingOutputStream extends OutputStream { private final Http2ServerRequest request; private final ServerResponseHeaders headers; - private final ServerResponseHeaders trailers; + private final ServerResponseTrailers trailers; private final Http2StreamWriter writer; private final int streamId; private final FlowControl.Outbound flowControl; @@ -425,7 +426,7 @@ private void sendTrailers(){ if (response.streamResult != null) { trailers.set(STREAM_RESULT_NAME, response.streamResult); } - trailers.set(STREAM_STATUS_NAME, status.code()); + trailers.set(STREAM_STATUS_NAME, response.status().code()); Http2Headers http2Headers = Http2Headers.create(trailers); int written = writer.writeHeaders(http2Headers, diff --git a/webserver/http2/src/main/java/io/helidon/webserver/http2/Http2ServerStream.java b/webserver/http2/src/main/java/io/helidon/webserver/http2/Http2ServerStream.java index 356419d9565..739af65a914 100644 --- a/webserver/http2/src/main/java/io/helidon/webserver/http2/Http2ServerStream.java +++ b/webserver/http2/src/main/java/io/helidon/webserver/http2/Http2ServerStream.java @@ -438,7 +438,6 @@ private void handle() { response.commit(); } } finally { - request.content().consume(); this.state = Http2StreamState.CLOSED; } } else { diff --git a/webserver/tests/http2/src/test/java/io/helidon/webserver/tests/http2/Continue100Test.java b/webserver/tests/http2/src/test/java/io/helidon/webserver/tests/http2/Continue100Test.java index 9ae129cb8bf..8ade233e7e4 100644 --- a/webserver/tests/http2/src/test/java/io/helidon/webserver/tests/http2/Continue100Test.java +++ b/webserver/tests/http2/src/test/java/io/helidon/webserver/tests/http2/Continue100Test.java @@ -62,7 +62,7 @@ class Continue100Test { .setHttp2ClearTextUpgrade(false) .setSsl(false)); - private static final AtomicBoolean CLIEN_SENT_DATA = new AtomicBoolean(false); + private static final AtomicBoolean CLIENT_SENT_DATA = new AtomicBoolean(false); private final Http2Client webClient; Continue100Test(WebServer server) { @@ -83,7 +83,7 @@ static void router(HttpRouting.Builder router) { } catch (InterruptedException e) { LOGGER.log(System.Logger.Level.INFO, "100 test interrupted", e); } - if (CLIEN_SENT_DATA.get()) { + if (CLIENT_SENT_DATA.get()) { res.send("Client didn't wait for server's 100 continue!"); return; } @@ -97,7 +97,7 @@ static void router(HttpRouting.Builder router) { @BeforeEach void beforeEach() { - CLIEN_SENT_DATA.set(false); + CLIENT_SENT_DATA.set(false); } @Test @@ -115,7 +115,7 @@ void vertxClient(WebServer server) throws ExecutionException, InterruptedExcepti request.continueHandler(v -> { // OK to send rest of body request.putHeader(HeaderNames.CONTENT_LENGTH.defaultCase(), String.valueOf(DATA.length())); - CLIEN_SENT_DATA.set(true); + CLIENT_SENT_DATA.set(true); request.write(DATA); request.end(); }); @@ -142,7 +142,7 @@ void vertxClientTeapot(WebServer server) throws ExecutionException, InterruptedE request.continueHandler(v -> { // OK to send rest of body request.putHeader(HeaderNames.CONTENT_LENGTH.defaultCase(), String.valueOf(DATA.length())); - CLIEN_SENT_DATA.set(true); + CLIENT_SENT_DATA.set(true); request.write(DATA); request.end(); }); @@ -159,7 +159,7 @@ void webclientExpect() { .post("/100-continue") .header(HeaderValues.EXPECT_100) .outputStream(out -> { - CLIEN_SENT_DATA.set(true); + CLIENT_SENT_DATA.set(true); out.write(DATA.getBytes()); out.close(); })) { @@ -187,7 +187,7 @@ void webclientTeapot() { .post("/100-continue-not") .header(HeaderValues.EXPECT_100) .outputStream(out -> { - CLIEN_SENT_DATA.set(true); + CLIENT_SENT_DATA.set(true); out.write(DATA.getBytes()); out.close(); })) { diff --git a/webserver/tests/http2/src/test/java/io/helidon/webserver/tests/http2/HeadersTest.java b/webserver/tests/http2/src/test/java/io/helidon/webserver/tests/http2/HeadersServerTest.java similarity index 99% rename from webserver/tests/http2/src/test/java/io/helidon/webserver/tests/http2/HeadersTest.java rename to webserver/tests/http2/src/test/java/io/helidon/webserver/tests/http2/HeadersServerTest.java index 5389188b60a..57c35c2bbb8 100644 --- a/webserver/tests/http2/src/test/java/io/helidon/webserver/tests/http2/HeadersTest.java +++ b/webserver/tests/http2/src/test/java/io/helidon/webserver/tests/http2/HeadersServerTest.java @@ -58,7 +58,7 @@ import static org.hamcrest.MatcherAssert.assertThat; @ServerTest -public class HeadersTest { +public class HeadersServerTest { private static final Duration TIMEOUT = Duration.ofSeconds(10); private static final String DATA = "Helidon!!!".repeat(10); @@ -143,7 +143,7 @@ static void router(HttpRouting.Builder router) { )); } - HeadersTest(WebServer server) { + HeadersServerTest(WebServer server) { client = Http2Client.builder() .baseUri("http://localhost:" + server.port()) .protocolConfig(Http2ClientProtocolConfig.builder().priorKnowledge(true).build()) diff --git a/webserver/tests/webserver/src/test/java/io/helidon/webserver/tests/HeadersTest.java b/webserver/tests/webserver/src/test/java/io/helidon/webserver/tests/HeadersServerTest.java similarity index 99% rename from webserver/tests/webserver/src/test/java/io/helidon/webserver/tests/HeadersTest.java rename to webserver/tests/webserver/src/test/java/io/helidon/webserver/tests/HeadersServerTest.java index 320a23653fd..cded84d1577 100644 --- a/webserver/tests/webserver/src/test/java/io/helidon/webserver/tests/HeadersTest.java +++ b/webserver/tests/webserver/src/test/java/io/helidon/webserver/tests/HeadersServerTest.java @@ -40,7 +40,7 @@ import static org.hamcrest.Matchers.is; @ServerTest -class HeadersTest { +class HeadersServerTest { private static final String DATA = "Helidon!!!".repeat(10); private static final Header TEST_TRAILER_HEADER = HeaderValues.create("test-trailer", "trailer-value"); diff --git a/webserver/webserver/src/main/java/io/helidon/webserver/http/ServerResponse.java b/webserver/webserver/src/main/java/io/helidon/webserver/http/ServerResponse.java index 5c294227b03..53e9602afa7 100644 --- a/webserver/webserver/src/main/java/io/helidon/webserver/http/ServerResponse.java +++ b/webserver/webserver/src/main/java/io/helidon/webserver/http/ServerResponse.java @@ -28,6 +28,7 @@ import io.helidon.http.HeaderValues; import io.helidon.http.NotFoundException; import io.helidon.http.ServerResponseHeaders; +import io.helidon.http.ServerResponseTrailers; import io.helidon.http.Status; import io.helidon.webserver.http.spi.Sink; @@ -196,7 +197,7 @@ default void send(Optional entity) { * @throws java.lang.IllegalStateException if client didn't ask for trailers with {@code TE: trailers} header in request * or response doesn't contain trailer declaration headers {@code Trailer: } */ - ServerResponseHeaders trailers(); + ServerResponseTrailers trailers(); /** * Description of the result of output stream processing. diff --git a/webserver/webserver/src/main/java/io/helidon/webserver/http/ServerResponseBase.java b/webserver/webserver/src/main/java/io/helidon/webserver/http/ServerResponseBase.java index 1465cdc2cc9..333dcfe61cf 100644 --- a/webserver/webserver/src/main/java/io/helidon/webserver/http/ServerResponseBase.java +++ b/webserver/webserver/src/main/java/io/helidon/webserver/http/ServerResponseBase.java @@ -92,6 +92,9 @@ protected ServerResponseBase(ConnectionContext ctx, ServerRequest request) { @Override public T status(Status status) { + if (isSent()) { + throw new IllegalStateException("Response already sent"); + } this.status = status; return (T) this; } diff --git a/webserver/webserver/src/main/java/io/helidon/webserver/http1/Http1ServerResponse.java b/webserver/webserver/src/main/java/io/helidon/webserver/http1/Http1ServerResponse.java index b3e0c591451..8a6fb5e89f1 100644 --- a/webserver/webserver/src/main/java/io/helidon/webserver/http1/Http1ServerResponse.java +++ b/webserver/webserver/src/main/java/io/helidon/webserver/http1/Http1ServerResponse.java @@ -39,6 +39,7 @@ import io.helidon.http.HeaderValues; import io.helidon.http.HttpException; import io.helidon.http.ServerResponseHeaders; +import io.helidon.http.ServerResponseTrailers; import io.helidon.http.Status; import io.helidon.http.WritableHeaders; import io.helidon.http.media.EntityWriter; @@ -68,7 +69,7 @@ class Http1ServerResponse extends ServerResponseBase { private final DataWriter dataWriter; private final Http1ServerRequest request; private final ServerResponseHeaders headers; - private final ServerResponseHeaders trailers; + private final ServerResponseTrailers trailers; private final boolean keepAlive; private boolean streamingEntity; @@ -93,7 +94,7 @@ class Http1ServerResponse extends ServerResponseBase { this.dataWriter = dataWriter; this.request = request; this.headers = ServerResponseHeaders.create(); - this.trailers = ServerResponseHeaders.create(); + this.trailers = ServerResponseTrailers.create(); this.keepAlive = keepAlive; this.validateHeaders = validateHeaders; } @@ -218,7 +219,7 @@ public ServerResponseHeaders headers() { } @Override - public ServerResponseHeaders trailers() { + public ServerResponseTrailers trailers() { if (request.headers().contains(HeaderValues.TE_TRAILERS) || headers.contains(HeaderNames.TRAILER)) { return trailers; } From 3c5eb1acddf14c0b23fed75079eddb9afdd1fdf4 Mon Sep 17 00:00:00 2001 From: Daniel Kec Date: Fri, 22 Sep 2023 23:32:04 +0200 Subject: [PATCH 3/4] Review issues 2 --- .../webclient/http2/Http2CallEntityChain.java | 2 +- .../http2/Http2CallOutputStreamChain.java | 2 +- .../webclient/http2/Http2ClientStream.java | 4 +- .../webserver/http2/Http2ServerResponse.java | 120 +++--------------- .../webserver/http2/Http2ServerStream.java | 94 +++++++++++++- 5 files changed, 108 insertions(+), 114 deletions(-) diff --git a/webclient/http2/src/main/java/io/helidon/webclient/http2/Http2CallEntityChain.java b/webclient/http2/src/main/java/io/helidon/webclient/http2/Http2CallEntityChain.java index f93ca4ef7f7..6cdc7c0ccc0 100644 --- a/webclient/http2/src/main/java/io/helidon/webclient/http2/Http2CallEntityChain.java +++ b/webclient/http2/src/main/java/io/helidon/webclient/http2/Http2CallEntityChain.java @@ -62,7 +62,7 @@ protected WebClientServiceResponse doProceed(WebClientServiceRequest serviceRequ Http2Headers http2Headers = prepareHeaders(serviceRequest.method(), headers, uri); - stream.write(http2Headers, entityBytes.length == 0); + stream.writeHeaders(http2Headers, entityBytes.length == 0); stream.flowControl().inbound().incrementWindowSize(clientRequest().requestPrefetch()); whenSent.complete(serviceRequest); diff --git a/webclient/http2/src/main/java/io/helidon/webclient/http2/Http2CallOutputStreamChain.java b/webclient/http2/src/main/java/io/helidon/webclient/http2/Http2CallOutputStreamChain.java index 060ada9db5a..af716d59598 100644 --- a/webclient/http2/src/main/java/io/helidon/webclient/http2/Http2CallOutputStreamChain.java +++ b/webclient/http2/src/main/java/io/helidon/webclient/http2/Http2CallOutputStreamChain.java @@ -53,7 +53,7 @@ protected WebClientServiceResponse doProceed(WebClientServiceRequest serviceRequ ClientUri uri = serviceRequest.uri(); Http2Headers http2Headers = prepareHeaders(serviceRequest.method(), headers, uri); - stream.write(http2Headers, false); + stream.writeHeaders(http2Headers, false); whenSent.complete(serviceRequest); stream.waitFor100Continue(); diff --git a/webclient/http2/src/main/java/io/helidon/webclient/http2/Http2ClientStream.java b/webclient/http2/src/main/java/io/helidon/webclient/http2/Http2ClientStream.java index fe0d4523273..63cec5feca7 100644 --- a/webclient/http2/src/main/java/io/helidon/webclient/http2/Http2ClientStream.java +++ b/webclient/http2/src/main/java/io/helidon/webclient/http2/Http2ClientStream.java @@ -69,6 +69,7 @@ class Http2ClientStream implements Http2Stream, ReleasableResource { private final Http2FrameListener recvListener = new Http2LoggingFrameListener("cl-recv"); private final Http2Settings settings = Http2Settings.create(); private final List continuationData = new ArrayList<>(); + private final CompletableFuture trailers = new CompletableFuture<>(); private Http2StreamState state = Http2StreamState.IDLE; private ReadState readState = ReadState.INIT; @@ -80,7 +81,6 @@ class Http2ClientStream implements Http2Stream, ReleasableResource { // streamId and buffer can only be created when we are locked in the stream id sequence private int streamId; private StreamBuffer buffer; - private final CompletableFuture trailers = new CompletableFuture<>(); Http2ClientStream(Http2ClientConnection connection, Http2Settings serverSettings, @@ -253,7 +253,7 @@ void waitFor100Continue() { } } - void write(Http2Headers http2Headers, boolean endOfStream) { + void writeHeaders(Http2Headers http2Headers, boolean endOfStream) { this.state = Http2StreamState.checkAndGetState(this.state, Http2FrameType.HEADERS, true, endOfStream, true); this.readState = readState.check(http2Headers.httpHeaders().contains(HeaderValues.EXPECT_100) ? ReadState.CONTINUE_100_HEADERS diff --git a/webserver/http2/src/main/java/io/helidon/webserver/http2/Http2ServerResponse.java b/webserver/http2/src/main/java/io/helidon/webserver/http2/Http2ServerResponse.java index aaa708e927e..ff5dbd0e9dd 100644 --- a/webserver/http2/src/main/java/io/helidon/webserver/http2/Http2ServerResponse.java +++ b/webserver/http2/src/main/java/io/helidon/webserver/http2/Http2ServerResponse.java @@ -30,14 +30,7 @@ import io.helidon.http.ServerResponseHeaders; import io.helidon.http.ServerResponseTrailers; import io.helidon.http.Status; -import io.helidon.http.http2.FlowControl; -import io.helidon.http.http2.Http2Flag; -import io.helidon.http.http2.Http2Flag.DataFlags; -import io.helidon.http.http2.Http2FrameData; -import io.helidon.http.http2.Http2FrameHeader; -import io.helidon.http.http2.Http2FrameTypes; import io.helidon.http.http2.Http2Headers; -import io.helidon.http.http2.Http2StreamWriter; import io.helidon.webserver.ConnectionContext; import io.helidon.webserver.http.ServerResponseBase; @@ -45,12 +38,10 @@ class Http2ServerResponse extends ServerResponseBase { private static final System.Logger LOGGER = System.getLogger(Http2ServerResponse.class.getName()); private final ConnectionContext ctx; - private final Http2StreamWriter writer; - private final int streamId; private final ServerResponseHeaders headers; private final ServerResponseTrailers trailers; - private final FlowControl.Outbound flowControl; private final Http2ServerRequest request; + private final Http2ServerStream stream; private boolean isSent; private boolean streamingEntity; @@ -59,17 +50,12 @@ class Http2ServerResponse extends ServerResponseBase { private UnaryOperator outputStreamFilter; private String streamResult = null; - Http2ServerResponse(ConnectionContext ctx, - Http2ServerRequest request, - Http2StreamWriter writer, - int streamId, - FlowControl.Outbound flowControl) { - super(ctx, request); - this.ctx = ctx; + Http2ServerResponse(Http2ServerStream stream, + Http2ServerRequest request) { + super(stream.connectionContext(), request); + this.ctx = stream.connectionContext(); this.request = request; - this.writer = writer; - this.streamId = streamId; - this.flowControl = flowControl; + this.stream = stream; this.headers = ServerResponseHeaders.create(); this.trailers = ServerResponseTrailers.create(); } @@ -119,26 +105,11 @@ public void send(byte[] entityBytes) { boolean sendTrailers = request.headers().contains(HeaderValues.TE_TRAILERS) || headers.contains(HeaderNames.TRAILER); - Http2FrameData frameData = - new Http2FrameData(Http2FrameHeader.create(bytes.length, - Http2FrameTypes.DATA, - DataFlags.create(sendTrailers ? 0 : Http2Flag.END_OF_STREAM), - streamId), - BufferData.create(bytes)); - http2Headers.validateResponse(); - bytesWritten = writer.writeHeaders(http2Headers, - streamId, - Http2Flag.HeaderFlags.create(Http2Flag.END_OF_HEADERS), - frameData, flowControl); + bytesWritten += stream.writeHeadersWithData(http2Headers, bytes.length, BufferData.create(bytes), !sendTrailers); if (sendTrailers) { - Http2Headers http2trailers = Http2Headers.create(trailers); - int written = writer.writeHeaders(http2trailers, - streamId, - Http2Flag.HeaderFlags.create(Http2Flag.END_OF_HEADERS | Http2Flag.END_OF_STREAM), - flowControl); - bytesWritten += written; + bytesWritten += stream.writeTrailers(Http2Headers.create(trailers)); } afterSend(); @@ -244,12 +215,10 @@ private static class BlockingOutputStream extends OutputStream { private final Http2ServerRequest request; private final ServerResponseHeaders headers; private final ServerResponseTrailers trailers; - private final Http2StreamWriter writer; - private final int streamId; - private final FlowControl.Outbound flowControl; private final Status status; private final Runnable responseCloseRunnable; private final Http2ServerResponse response; + private final Http2ServerStream stream; private BufferData firstBuffer; private boolean closed; @@ -263,9 +232,7 @@ private BlockingOutputStream(Http2ServerRequest request, this.response = response; this.headers = response.headers; this.trailers = response.trailers; - this.writer = response.writer; - this.streamId = response.streamId; - this.flowControl = response.flowControl; + this.stream = response.stream; this.status = response.status(); this.responseCloseRunnable = responseCloseRunnable; } @@ -309,7 +276,7 @@ void commit() { } else if (sendTrailers) { sendTrailers(); } else { - sendEndOfStream(); + bytesWritten += stream.writeData(BufferData.empty(), true); } responseCloseRunnable.run(); try { @@ -332,9 +299,9 @@ private void write(BufferData buffer) throws IOException { if (firstByte) { sendHeadersAndPrepare(); firstByte = false; - writeChunk(BufferData.create(firstBuffer, buffer)); + bytesWritten += stream.writeData(BufferData.create(firstBuffer, buffer), false); } else { - writeChunk(buffer); + bytesWritten += stream.writeData(buffer, false); } } @@ -358,29 +325,9 @@ private void sendFirstChunkOnly(boolean sendTrailers) { // at this moment, we must send headers if (contentLength == 0) { - int written = writer.writeHeaders(http2Headers, - streamId, - Http2Flag.HeaderFlags.create( - sendTrailers - ? Http2Flag.END_OF_HEADERS - : Http2Flag.END_OF_HEADERS | Http2Flag.END_OF_STREAM), - flowControl); - bytesWritten += written; + bytesWritten += stream.writeHeaders(http2Headers, !sendTrailers); } else { - Http2FrameData frameData = - new Http2FrameData(Http2FrameHeader.create(contentLength, - Http2FrameTypes.DATA, - DataFlags.create(sendTrailers - ? 0 - : Http2Flag.END_OF_STREAM), - streamId), - firstBuffer); - int written = writer.writeHeaders(http2Headers, - streamId, - Http2Flag.HeaderFlags.create(Http2Flag.END_OF_HEADERS), - frameData, flowControl); - - bytesWritten += written; + bytesWritten += stream.writeHeadersWithData(http2Headers, contentLength, firstBuffer, !sendTrailers); } } @@ -390,36 +337,8 @@ private void sendHeadersAndPrepare() { Http2Headers http2Headers = Http2Headers.create(headers); http2Headers.status(status); http2Headers.validateResponse(); - int written = writer.writeHeaders(http2Headers, - streamId, - Http2Flag.HeaderFlags.create(Http2Flag.END_OF_HEADERS), - flowControl); - - bytesWritten += written; - } - - private void writeChunk(BufferData buffer) { - Http2FrameData frameData = new Http2FrameData(Http2FrameHeader.create(buffer.available(), - Http2FrameTypes.DATA, - DataFlags.create(0), - streamId), - buffer); - bytesWritten += frameData.header().length(); - bytesWritten += Http2FrameHeader.LENGTH; - - writer.writeData(frameData, flowControl); - } - - private void sendEndOfStream() { - Http2FrameData frameData = new Http2FrameData(Http2FrameHeader.create(0, - Http2FrameTypes.DATA, - DataFlags.create(Http2Flag.END_OF_STREAM), - streamId), - BufferData.empty()); - bytesWritten += frameData.header().length(); - bytesWritten += Http2FrameHeader.LENGTH; - writer.writeData(frameData, flowControl); + bytesWritten += stream.writeHeaders(http2Headers, false); } private void sendTrailers(){ @@ -429,12 +348,7 @@ private void sendTrailers(){ trailers.set(STREAM_STATUS_NAME, response.status().code()); Http2Headers http2Headers = Http2Headers.create(trailers); - int written = writer.writeHeaders(http2Headers, - streamId, - Http2Flag.HeaderFlags.create(Http2Flag.END_OF_HEADERS - | Http2Flag.END_OF_STREAM), - flowControl); - bytesWritten += written; + bytesWritten += stream.writeTrailers(http2Headers); } } } diff --git a/webserver/http2/src/main/java/io/helidon/webserver/http2/Http2ServerStream.java b/webserver/http2/src/main/java/io/helidon/webserver/http2/Http2ServerStream.java index 739af65a914..6621b18b242 100644 --- a/webserver/http2/src/main/java/io/helidon/webserver/http2/Http2ServerStream.java +++ b/webserver/http2/src/main/java/io/helidon/webserver/http2/Http2ServerStream.java @@ -21,7 +21,6 @@ import java.util.Set; import java.util.concurrent.ArrayBlockingQueue; import java.util.concurrent.Semaphore; -import java.util.concurrent.atomic.AtomicBoolean; import io.helidon.common.buffers.BufferData; import io.helidon.common.socket.SocketWriterException; @@ -86,13 +85,13 @@ public class Http2ServerStream implements Runnable, Http2Stream { private final Router router; private final ArrayBlockingQueue inboundData = new ArrayBlockingQueue<>(32); private final StreamFlowControl flowControl; - private final AtomicBoolean send100Continue = new AtomicBoolean(false); private boolean wasLastDataFrame = false; private volatile Http2Headers headers; private volatile Http2Priority priority; // used from this instance and from connection private volatile Http2StreamState state = Http2StreamState.IDLE; + private WriteState writeState = WriteState.INIT; private Http2SubProtocolSelector.SubProtocolHandler subProtocolHandler; private long expectedLength = -1; private HttpRouting routing; @@ -329,8 +328,61 @@ public void run() { } } - void send100Continue() { - if (send100Continue.getAndSet(false)) { + int writeHeaders(Http2Headers http2Headers, boolean endOfStream) { + writeState = writeState.check(endOfStream ? WriteState.END : WriteState.HEADERS_SENT); + + Http2Flag.HeaderFlags flags; + if (endOfStream) { + flags = Http2Flag.HeaderFlags.create(Http2Flag.END_OF_HEADERS | Http2Flag.END_OF_STREAM); + } else { + flags = Http2Flag.HeaderFlags.create(Http2Flag.END_OF_HEADERS); + } + return writer.writeHeaders(http2Headers, streamId, flags, flowControl.outbound()); + } + + int writeHeadersWithData(Http2Headers http2Headers, int contentLength, BufferData bufferData, boolean endOfStream) { + writeState = writeState.check(WriteState.HEADERS_SENT); + writeState = writeState.check(endOfStream ? WriteState.END : WriteState.DATA_SENT); + + Http2FrameData frameData = + new Http2FrameData(Http2FrameHeader.create(contentLength, + Http2FrameTypes.DATA, + Http2Flag.DataFlags.create(endOfStream ? Http2Flag.END_OF_STREAM : 0), + streamId), + bufferData); + return writer.writeHeaders(http2Headers, streamId, + Http2Flag.HeaderFlags.create(Http2Flag.END_OF_HEADERS), + frameData, + flowControl.outbound()); + } + + int writeData(BufferData bufferData, boolean endOfStream) { + writeState = writeState.check(endOfStream ? WriteState.END : WriteState.DATA_SENT); + + Http2FrameData frameData = + new Http2FrameData(Http2FrameHeader.create(bufferData.available(), + Http2FrameTypes.DATA, + Http2Flag.DataFlags.create(endOfStream ? Http2Flag.END_OF_STREAM : 0), + streamId), + bufferData); + + writer.writeData(frameData, flowControl.outbound()); + return frameData.header().length() + Http2FrameHeader.LENGTH; + } + + int writeTrailers(Http2Headers http2trailers) { + writeState = writeState.check(WriteState.TRAILERS_SENT); + + return writer.writeHeaders(http2trailers, + streamId, + Http2Flag.HeaderFlags.create(Http2Flag.END_OF_HEADERS | Http2Flag.END_OF_STREAM), + flowControl.outbound()); + } + + void write100Continue() { + if (writeState == WriteState.EXPECTED_100) { + writeState = writeState.check(WriteState.CONTINUE_100_SENT); + Header status = HeaderValues.createCached(Http2Headers.STATUS_NAME, 100); Http2Headers http2Headers = Http2Headers.create(WritableHeaders.create().add(status)); writer.writeHeaders(http2Headers, @@ -348,8 +400,12 @@ void prologue(HttpPrologue prologue) { this.prologue = prologue; } + ConnectionContext connectionContext() { + return this.ctx; + } + private BufferData readEntityFromPipeline() { - send100Continue(); + write100Continue(); if (wasLastDataFrame) { return BufferData.empty(); } @@ -375,7 +431,7 @@ private void handle() { this.expectedLength = httpHeaders.get(HeaderNames.CONTENT_LENGTH).get(long.class); } if (headers.httpHeaders().contains(HeaderValues.EXPECT_100)) { - this.send100Continue.set(true); + writeState = writeState.check(WriteState.EXPECTED_100); } subProtocolHandler = null; @@ -426,7 +482,7 @@ private void handle() { decoder, streamId, this::readEntityFromPipeline); - Http2ServerResponse response = new Http2ServerResponse(ctx, request, writer, streamId, flowControl.outbound()); + Http2ServerResponse response = new Http2ServerResponse(this, request); semaphoreAcquired = requestSemaphore.tryAcquire(); try { if (semaphoreAcquired) { @@ -438,6 +494,7 @@ private void handle() { response.commit(); } } finally { + request.content().consume(); this.state = Http2StreamState.CLOSED; } } else { @@ -461,4 +518,27 @@ private void handle() { } private record DataFrame(Http2FrameHeader header, BufferData data) { } + + private enum WriteState { + END, + TRAILERS_SENT(END), + DATA_SENT(TRAILERS_SENT, END), + HEADERS_SENT(DATA_SENT, TRAILERS_SENT, END), + CONTINUE_100_SENT(HEADERS_SENT), + EXPECTED_100(CONTINUE_100_SENT, HEADERS_SENT), + INIT(EXPECTED_100, HEADERS_SENT); + + private final Set allowedTransitions; + + WriteState(WriteState... allowedTransitions){ + this.allowedTransitions = Set.of(allowedTransitions); + } + + WriteState check(WriteState newState) { + if (this == newState || allowedTransitions.contains(newState)) { + return newState; + } + throw new IllegalStateException("Transition from " + this + " to " + newState + " is not allowed!"); + } + } } From f70302a9c151d23e20100199f729465f7d55c791 Mon Sep 17 00:00:00 2001 From: Daniel Kec Date: Mon, 25 Sep 2023 16:13:44 +0200 Subject: [PATCH 4/4] Remove stream status --- .../webserver/http2/Http2ServerResponse.java | 1 - .../webserver/tests/HeadersServerTest.java | 26 ------------------- .../webserver/http/ServerResponseBase.java | 7 +---- .../webserver/http1/Http1ServerResponse.java | 1 - 4 files changed, 1 insertion(+), 34 deletions(-) diff --git a/webserver/http2/src/main/java/io/helidon/webserver/http2/Http2ServerResponse.java b/webserver/http2/src/main/java/io/helidon/webserver/http2/Http2ServerResponse.java index ff5dbd0e9dd..03ed6e48bd9 100644 --- a/webserver/http2/src/main/java/io/helidon/webserver/http2/Http2ServerResponse.java +++ b/webserver/http2/src/main/java/io/helidon/webserver/http2/Http2ServerResponse.java @@ -345,7 +345,6 @@ private void sendTrailers(){ if (response.streamResult != null) { trailers.set(STREAM_RESULT_NAME, response.streamResult); } - trailers.set(STREAM_STATUS_NAME, response.status().code()); Http2Headers http2Headers = Http2Headers.create(trailers); bytesWritten += stream.writeTrailers(http2Headers); diff --git a/webserver/tests/webserver/src/test/java/io/helidon/webserver/tests/HeadersServerTest.java b/webserver/tests/webserver/src/test/java/io/helidon/webserver/tests/HeadersServerTest.java index cded84d1577..ac708821857 100644 --- a/webserver/tests/webserver/src/test/java/io/helidon/webserver/tests/HeadersServerTest.java +++ b/webserver/tests/webserver/src/test/java/io/helidon/webserver/tests/HeadersServerTest.java @@ -76,18 +76,6 @@ static void router(HttpRouting.Builder router) { } } ); - router.route(GET, "/stream-status", - (req, res) -> { - res.header(HeaderNames.TRAILER, TEST_TRAILER_HEADER.name()); - try (var os = res.outputStream()) { - os.write(DATA.getBytes()); - os.write(DATA.getBytes()); - os.flush(); - res.status(Status.I_AM_A_TEAPOT_418); - os.write(DATA.getBytes()); - } - } - ); router.route(GET, "/trailers-forced", (req, res) -> { res.header(HeaderNames.TRAILER, TEST_TRAILER_HEADER.name()); @@ -145,24 +133,10 @@ void streamResult(WebClient client) throws IOException { try (var ins = res.entity()) { assertThat(ins.readAllBytes(), is(DATA.repeat(3).getBytes())); } - assertThat(res.trailers(), hasHeader(HeaderValues.create("Stream-Status", 200))); assertThat(res.trailers(), hasHeader(HeaderValues.create("Stream-Result", "Kaboom!"))); checkCachedConnection(res.headers()); } - @Test - void streamStatus(WebClient client) throws IOException { - ClientResponseTyped res = client - .get("/stream-status") - .header(HeaderValues.create("TE", "trailers")) - .request(InputStream.class); - try (var ins = res.entity()) { - ins.readAllBytes(); - } - assertThat(res.trailers(), hasHeader(HeaderValues.create("Stream-Status", Status.I_AM_A_TEAPOT_418.code()))); - checkCachedConnection(res.headers()); - } - @Test void trailersNoTrailers(WebClient client) { ClientResponseTyped res = client diff --git a/webserver/webserver/src/main/java/io/helidon/webserver/http/ServerResponseBase.java b/webserver/webserver/src/main/java/io/helidon/webserver/http/ServerResponseBase.java index 333dcfe61cf..2fde2eb24eb 100644 --- a/webserver/webserver/src/main/java/io/helidon/webserver/http/ServerResponseBase.java +++ b/webserver/webserver/src/main/java/io/helidon/webserver/http/ServerResponseBase.java @@ -51,10 +51,6 @@ @SuppressWarnings("unchecked") public abstract class ServerResponseBase> implements RoutingResponse { - /** - * Stream status trailer name. - */ - protected static final HeaderName STREAM_STATUS_NAME = HeaderNames.create("stream-status"); /** * Stream result trailer name. */ @@ -63,8 +59,7 @@ public abstract class ServerResponseBase> implem * Stream status trailers. */ protected static final Header STREAM_TRAILERS = - HeaderValues.create(HeaderNames.TRAILER, STREAM_STATUS_NAME.defaultCase() - + "," + STREAM_RESULT_NAME.defaultCase()); + HeaderValues.create(HeaderNames.TRAILER, STREAM_RESULT_NAME.defaultCase()); private final ContentEncodingContext contentEncodingContext; private final MediaContext mediaContext; private final ServerRequestHeaders requestHeaders; diff --git a/webserver/webserver/src/main/java/io/helidon/webserver/http1/Http1ServerResponse.java b/webserver/webserver/src/main/java/io/helidon/webserver/http1/Http1ServerResponse.java index 8a6fb5e89f1..5bf8ca2d18e 100644 --- a/webserver/webserver/src/main/java/io/helidon/webserver/http1/Http1ServerResponse.java +++ b/webserver/webserver/src/main/java/io/helidon/webserver/http1/Http1ServerResponse.java @@ -496,7 +496,6 @@ void commit() { if (sendTrailers) { // not optimized, trailers enabled: we need to write trailers - trailers.set(STREAM_STATUS_NAME, String.valueOf(status.get().code())); trailers.set(STREAM_RESULT_NAME, streamResult.get()); BufferData buffer = BufferData.growing(128); writeHeaders(trailers, buffer, this.validateHeaders);