diff --git a/webclient/http1/src/main/java/io/helidon/webclient/http1/Http1CallChainBase.java b/webclient/http1/src/main/java/io/helidon/webclient/http1/Http1CallChainBase.java index 4916af47c88..3f616060f3d 100644 --- a/webclient/http1/src/main/java/io/helidon/webclient/http1/Http1CallChainBase.java +++ b/webclient/http1/src/main/java/io/helidon/webclient/http1/Http1CallChainBase.java @@ -411,15 +411,11 @@ public int read() { if (finished) { return -1; } - try { - ensureBuffer(512); - if (finished || currentBuffer == null) { - return -1; - } - return currentBuffer.read(); - } catch (DataReader.InsufficientDataAvailableException e) { + ensureBuffer(512); + if (finished || currentBuffer == null) { return -1; } + return currentBuffer.read(); } @Override @@ -427,15 +423,11 @@ public int read(byte[] b, int off, int len) { if (finished) { return -1; } - try { - ensureBuffer(len); - if (finished || currentBuffer == null) { - return -1; - } - return currentBuffer.read(b, off, len); - } catch (DataReader.InsufficientDataAvailableException e) { + ensureBuffer(len); + if (finished || currentBuffer == null) { return -1; } + return currentBuffer.read(b, off, len); } private void ensureBuffer(int estimate) { diff --git a/webclient/sse/src/main/java/io/helidon/webclient/sse/SseSourceHandlerProvider.java b/webclient/sse/src/main/java/io/helidon/webclient/sse/SseSourceHandlerProvider.java index 502bb1f8989..e8758e16977 100644 --- a/webclient/sse/src/main/java/io/helidon/webclient/sse/SseSourceHandlerProvider.java +++ b/webclient/sse/src/main/java/io/helidon/webclient/sse/SseSourceHandlerProvider.java @@ -25,6 +25,7 @@ import java.time.Duration; import io.helidon.common.GenericType; +import io.helidon.common.buffers.DataReader; import io.helidon.common.media.type.MediaTypes; import io.helidon.http.media.MediaContext; import io.helidon.http.sse.SseEvent; @@ -93,6 +94,9 @@ public > void handle(X source, HttpClientResponse res } } + source.onClose(); + } catch (DataReader.InsufficientDataAvailableException e) { + // normal SSE termination when connection closed by server source.onClose(); } catch (IOException e) { source.onError(e); diff --git a/webserver/sse/src/main/java/io/helidon/webserver/sse/DataWriterSseSink.java b/webserver/sse/src/main/java/io/helidon/webserver/sse/DataWriterSseSink.java new file mode 100644 index 00000000000..5efaa1d7c1b --- /dev/null +++ b/webserver/sse/src/main/java/io/helidon/webserver/sse/DataWriterSseSink.java @@ -0,0 +1,182 @@ +/* + * Copyright (c) 2024 Oracle and/or its affiliates. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.helidon.webserver.sse; + +import java.io.ByteArrayOutputStream; +import java.io.IOException; +import java.nio.charset.StandardCharsets; +import java.util.Optional; + +import io.helidon.common.GenericType; +import io.helidon.common.buffers.BufferData; +import io.helidon.common.media.type.MediaType; +import io.helidon.common.media.type.MediaTypes; +import io.helidon.http.DateTime; +import io.helidon.http.Header; +import io.helidon.http.HeaderNames; +import io.helidon.http.HttpMediaType; +import io.helidon.http.ServerResponseHeaders; +import io.helidon.http.Status; +import io.helidon.http.WritableHeaders; +import io.helidon.http.media.EntityWriter; +import io.helidon.http.media.MediaContext; +import io.helidon.http.sse.SseEvent; +import io.helidon.webserver.ConnectionContext; +import io.helidon.webserver.ServerConnectionException; +import io.helidon.webserver.http.ServerResponse; +import io.helidon.webserver.http.spi.SinkProviderContext; + +import static io.helidon.http.HeaderValues.CONTENT_TYPE_EVENT_STREAM; +import static io.helidon.http.HeaderValues.create; + +/** + * Implementation of an SSE sink. Emits {@link SseEvent}s. + */ +public class DataWriterSseSink implements SseSink { + + /** + * Type of SSE event sinks. + */ + public static final GenericType TYPE = GenericType.create(DataWriterSseSink.class); + + private static final Header CACHE_NO_CACHE_ONLY = create(HeaderNames.CACHE_CONTROL, "no-cache"); + private static final byte[] SSE_NL = "\n".getBytes(StandardCharsets.UTF_8); + private static final byte[] SSE_ID = "id:".getBytes(StandardCharsets.UTF_8); + private static final byte[] SSE_DATA = "data:".getBytes(StandardCharsets.UTF_8); + private static final byte[] SSE_EVENT = "event:".getBytes(StandardCharsets.UTF_8); + private static final byte[] SSE_COMMENT = ":".getBytes(StandardCharsets.UTF_8); + private static final byte[] OK_200 = "HTTP/1.1 200 OK\r\n".getBytes(StandardCharsets.UTF_8); + private static final byte[] DATE = "Date: ".getBytes(StandardCharsets.UTF_8); + private static final WritableHeaders EMPTY_HEADERS = WritableHeaders.create(); + + private final ServerResponse response; + private final ConnectionContext ctx; + private final MediaContext mediaContext; + private final Runnable closeRunnable; + + DataWriterSseSink(SinkProviderContext context) { + this.response = context.serverResponse(); + this.ctx = context.connectionContext(); + this.mediaContext = ctx.listenerContext().mediaContext(); + this.closeRunnable = context.closeRunnable(); + writeStatusAndHeaders(); + } + + @Override + public DataWriterSseSink emit(SseEvent sseEvent) { + BufferData bufferData = BufferData.growing(512); + + Optional comment = sseEvent.comment(); + if (comment.isPresent()) { + bufferData.write(SSE_COMMENT); + bufferData.write(comment.get().getBytes(StandardCharsets.UTF_8)); + bufferData.write(SSE_NL); + } + Optional id = sseEvent.id(); + if (id.isPresent()) { + bufferData.write(SSE_ID); + bufferData.write(id.get().getBytes(StandardCharsets.UTF_8)); + bufferData.write(SSE_NL); + } + Optional name = sseEvent.name(); + if (name.isPresent()) { + bufferData.write(SSE_EVENT); + bufferData.write(name.get().getBytes(StandardCharsets.UTF_8)); + bufferData.write(SSE_NL); + } + Object data = sseEvent.data(); + if (data != null) { + bufferData.write(SSE_DATA); + byte[] bytes = serializeData(data, sseEvent.mediaType().orElse(MediaTypes.TEXT_PLAIN)); + bufferData.write(bytes); + bufferData.write(SSE_NL); + } + bufferData.write(SSE_NL); + + // write event to the network + ctx.dataWriter().writeNow(bufferData); + return this; + } + + @Override + public void close() { + closeRunnable.run(); + ctx.serverSocket().close(); + } + + void writeStatusAndHeaders() { + ServerResponseHeaders headers = response.headers(); + + // verify response has no status or content type + HttpMediaType ct = headers.contentType().orElse(null); + if (response.status().code() != Status.OK_200.code() + || ct != null && !CONTENT_TYPE_EVENT_STREAM.values().equals(ct.mediaType().text())) { + throw new IllegalStateException("ServerResponse instance cannot be used to create SseSink"); + } + + // start writing status line + BufferData buffer = BufferData.growing(256); + buffer.write(OK_200); + + // serialize a date header if not included + if (!headers.contains(HeaderNames.DATE)) { + buffer.write(DATE); + byte[] dateBytes = DateTime.http1Bytes(); + buffer.write(dateBytes); + } + + // set up and write headers + if (ct == null) { + headers.add(CONTENT_TYPE_EVENT_STREAM); + } + headers.set(CACHE_NO_CACHE_ONLY); + for (Header header : headers) { + header.writeHttp1Header(buffer); + } + + // complete heading + buffer.write('\r'); // "\r\n" - empty line after headers + buffer.write('\n'); + + // write response heading to the network + ctx.dataWriter().writeNow(buffer); + } + + private byte[] serializeData(Object object, MediaType mediaType) { + if (object instanceof byte[] bytes) { + return bytes; + } else if (mediaContext != null) { + try (ByteArrayOutputStream baos = new ByteArrayOutputStream()) { + if (object instanceof String str && mediaType.equals(MediaTypes.TEXT_PLAIN)) { + EntityWriter writer = mediaContext.writer(GenericType.STRING, EMPTY_HEADERS, EMPTY_HEADERS); + writer.write(GenericType.STRING, str, baos, EMPTY_HEADERS, EMPTY_HEADERS); + } else { + GenericType type = GenericType.create(object); + WritableHeaders resHeaders = WritableHeaders.create(); + resHeaders.set(HeaderNames.CONTENT_TYPE, mediaType.text()); + EntityWriter writer = mediaContext.writer(type, EMPTY_HEADERS, resHeaders); + writer.write(type, object, baos, EMPTY_HEADERS, resHeaders); + } + return baos.toByteArray(); + } catch (IOException e) { + throw new ServerConnectionException("Failed to write SSE event", e); + + } + } + throw new IllegalStateException("Unable to serialize SSE event without a media context"); + } +} diff --git a/webserver/sse/src/main/java/io/helidon/webserver/sse/OutputStreamSseSink.java b/webserver/sse/src/main/java/io/helidon/webserver/sse/OutputStreamSseSink.java new file mode 100644 index 00000000000..67e1c867247 --- /dev/null +++ b/webserver/sse/src/main/java/io/helidon/webserver/sse/OutputStreamSseSink.java @@ -0,0 +1,116 @@ +/* + * Copyright (c) 2023, 2024 Oracle and/or its affiliates. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.helidon.webserver.sse; + +import java.io.IOException; +import java.io.OutputStream; +import java.io.UncheckedIOException; +import java.nio.charset.StandardCharsets; +import java.util.Optional; +import java.util.function.BiConsumer; + +import io.helidon.common.GenericType; +import io.helidon.common.media.type.MediaType; +import io.helidon.common.media.type.MediaTypes; +import io.helidon.http.HttpMediaType; +import io.helidon.http.Status; +import io.helidon.http.sse.SseEvent; +import io.helidon.webserver.http.ServerResponse; + +import static io.helidon.http.HeaderValues.CONTENT_TYPE_EVENT_STREAM; + +/** + * Deprecated implementation of an SSE sink. Emits {@link SseEvent}s. + * + * @deprecated Replaced by {@link io.helidon.webserver.sse.DataWriterSseSink}. + */ +@Deprecated(since = "4.1.2", forRemoval = true) +public class OutputStreamSseSink implements SseSink { + + /** + * Type of SSE event sinks. + */ + public static final GenericType TYPE = GenericType.create(DataWriterSseSink.class); + + private static final byte[] SSE_NL = "\n".getBytes(StandardCharsets.UTF_8); + private static final byte[] SSE_ID = "id:".getBytes(StandardCharsets.UTF_8); + private static final byte[] SSE_DATA = "data:".getBytes(StandardCharsets.UTF_8); + private static final byte[] SSE_EVENT = "event:".getBytes(StandardCharsets.UTF_8); + private static final byte[] SSE_COMMENT = ":".getBytes(StandardCharsets.UTF_8); + + private final BiConsumer eventConsumer; + private final Runnable closeRunnable; + private final OutputStream outputStream; + + OutputStreamSseSink(ServerResponse serverResponse, BiConsumer eventConsumer, Runnable closeRunnable) { + // Verify response has no status or content type + HttpMediaType ct = serverResponse.headers().contentType().orElse(null); + if (serverResponse.status().code() != Status.OK_200.code() + || ct != null && !CONTENT_TYPE_EVENT_STREAM.values().equals(ct.mediaType().text())) { + throw new IllegalStateException("ServerResponse instance cannot be used to create SseResponse"); + } + + // Ensure content type set for SSE + if (ct == null) { + serverResponse.headers().add(CONTENT_TYPE_EVENT_STREAM); + } + + this.outputStream = serverResponse.outputStream(); + this.eventConsumer = eventConsumer; + this.closeRunnable = closeRunnable; + } + + @Override + public OutputStreamSseSink emit(SseEvent sseEvent) { + try { + Optional comment = sseEvent.comment(); + if (comment.isPresent()) { + outputStream.write(SSE_COMMENT); + outputStream.write(comment.get().getBytes(StandardCharsets.UTF_8)); + outputStream.write(SSE_NL); + } + Optional id = sseEvent.id(); + if (id.isPresent()) { + outputStream.write(SSE_ID); + outputStream.write(id.get().getBytes(StandardCharsets.UTF_8)); + outputStream.write(SSE_NL); + } + Optional name = sseEvent.name(); + if (name.isPresent()) { + outputStream.write(SSE_EVENT); + outputStream.write(name.get().getBytes(StandardCharsets.UTF_8)); + outputStream.write(SSE_NL); + } + Object data = sseEvent.data(); + if (data != SseEvent.NO_DATA) { + outputStream.write(SSE_DATA); + eventConsumer.accept(data, sseEvent.mediaType().orElse(MediaTypes.TEXT_PLAIN)); + outputStream.write(SSE_NL); + } + outputStream.write(SSE_NL); + outputStream.flush(); + } catch (IOException e) { + throw new UncheckedIOException(e); + } + return this; + } + + @Override + public void close() { + closeRunnable.run(); + } +} diff --git a/webserver/sse/src/main/java/io/helidon/webserver/sse/SseSink.java b/webserver/sse/src/main/java/io/helidon/webserver/sse/SseSink.java index 32fbdbd2a39..aa89e5110c7 100644 --- a/webserver/sse/src/main/java/io/helidon/webserver/sse/SseSink.java +++ b/webserver/sse/src/main/java/io/helidon/webserver/sse/SseSink.java @@ -1,5 +1,5 @@ /* - * Copyright (c) 2023, 2024 Oracle and/or its affiliates. + * Copyright (c) 2024 Oracle and/or its affiliates. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -13,171 +13,34 @@ * See the License for the specific language governing permissions and * limitations under the License. */ - package io.helidon.webserver.sse; -import java.io.ByteArrayOutputStream; -import java.io.IOException; -import java.nio.charset.StandardCharsets; -import java.util.Optional; - import io.helidon.common.GenericType; -import io.helidon.common.buffers.BufferData; -import io.helidon.common.media.type.MediaType; -import io.helidon.common.media.type.MediaTypes; -import io.helidon.http.DateTime; -import io.helidon.http.Header; -import io.helidon.http.HeaderNames; -import io.helidon.http.HttpMediaType; -import io.helidon.http.ServerResponseHeaders; -import io.helidon.http.Status; -import io.helidon.http.WritableHeaders; -import io.helidon.http.media.EntityWriter; -import io.helidon.http.media.MediaContext; import io.helidon.http.sse.SseEvent; -import io.helidon.webserver.ConnectionContext; -import io.helidon.webserver.ServerConnectionException; -import io.helidon.webserver.http.ServerResponse; import io.helidon.webserver.http.spi.Sink; -import io.helidon.webserver.http.spi.SinkProviderContext; - -import static io.helidon.http.HeaderValues.CONTENT_TYPE_EVENT_STREAM; -import static io.helidon.http.HeaderValues.create; /** - * Implementation of an SSE sink. Emits {@link SseEvent}s. + * A sink for SSE events. */ -public class SseSink implements Sink { +public interface SseSink extends Sink { /** * Type of SSE event sinks. */ - public static final GenericType TYPE = GenericType.create(SseSink.class); - - private static final Header CACHE_NO_CACHE_ONLY = create(HeaderNames.CACHE_CONTROL, "no-cache"); - private static final byte[] SSE_NL = "\n".getBytes(StandardCharsets.UTF_8); - private static final byte[] SSE_ID = "id:".getBytes(StandardCharsets.UTF_8); - private static final byte[] SSE_DATA = "data:".getBytes(StandardCharsets.UTF_8); - private static final byte[] SSE_EVENT = "event:".getBytes(StandardCharsets.UTF_8); - private static final byte[] SSE_COMMENT = ":".getBytes(StandardCharsets.UTF_8); - private static final byte[] OK_200 = "HTTP/1.1 200 OK\r\n".getBytes(StandardCharsets.UTF_8); - private static final byte[] DATE = "Date: ".getBytes(StandardCharsets.UTF_8); - private static final WritableHeaders EMPTY_HEADERS = WritableHeaders.create(); - - private final ServerResponse response; - private final ConnectionContext ctx; - private final MediaContext mediaContext; - private final Runnable closeRunnable; - - SseSink(SinkProviderContext context) { - this.response = context.serverResponse(); - this.ctx = context.connectionContext(); - this.mediaContext = ctx.listenerContext().mediaContext(); - this.closeRunnable = context.closeRunnable(); - writeStatusAndHeaders(); - } + GenericType TYPE = GenericType.create(SseSink.class); + /** + * Emits an event using to the sink. + * + * @param event the event to emit + * @return this sink + */ @Override - public SseSink emit(SseEvent sseEvent) { - BufferData bufferData = BufferData.growing(512); - - Optional comment = sseEvent.comment(); - if (comment.isPresent()) { - bufferData.write(SSE_COMMENT); - bufferData.write(comment.get().getBytes(StandardCharsets.UTF_8)); - bufferData.write(SSE_NL); - } - Optional id = sseEvent.id(); - if (id.isPresent()) { - bufferData.write(SSE_ID); - bufferData.write(id.get().getBytes(StandardCharsets.UTF_8)); - bufferData.write(SSE_NL); - } - Optional name = sseEvent.name(); - if (name.isPresent()) { - bufferData.write(SSE_EVENT); - bufferData.write(name.get().getBytes(StandardCharsets.UTF_8)); - bufferData.write(SSE_NL); - } - Object data = sseEvent.data(); - if (data != null) { - bufferData.write(SSE_DATA); - byte[] bytes = serializeData(data, sseEvent.mediaType().orElse(MediaTypes.TEXT_PLAIN)); - bufferData.write(bytes); - bufferData.write(SSE_NL); - } - bufferData.write(SSE_NL); - - // write event to the network - ctx.dataWriter().writeNow(bufferData); - return this; - } + SseSink emit(SseEvent event); + /** + * Close SSE sink. + */ @Override - public void close() { - closeRunnable.run(); - ctx.serverSocket().close(); - } - - void writeStatusAndHeaders() { - ServerResponseHeaders headers = response.headers(); - - // verify response has no status or content type - HttpMediaType ct = headers.contentType().orElse(null); - if (response.status().code() != Status.OK_200.code() - || ct != null && !CONTENT_TYPE_EVENT_STREAM.values().equals(ct.mediaType().text())) { - throw new IllegalStateException("ServerResponse instance cannot be used to create SseSink"); - } - - // start writing status line - BufferData buffer = BufferData.growing(256); - buffer.write(OK_200); - - // serialize a date header if not included - if (!headers.contains(HeaderNames.DATE)) { - buffer.write(DATE); - byte[] dateBytes = DateTime.http1Bytes(); - buffer.write(dateBytes); - } - - // set up and write headers - if (ct == null) { - headers.add(CONTENT_TYPE_EVENT_STREAM); - } - headers.add(CACHE_NO_CACHE_ONLY); - for (Header header : headers) { - header.writeHttp1Header(buffer); - } - - // complete heading - buffer.write('\r'); // "\r\n" - empty line after headers - buffer.write('\n'); - - // write response heading to the network - ctx.dataWriter().writeNow(buffer); - } - - private byte[] serializeData(Object object, MediaType mediaType) { - if (object instanceof byte[] bytes) { - return bytes; - } else if (mediaContext != null) { - try (ByteArrayOutputStream baos = new ByteArrayOutputStream()) { - if (object instanceof String str && mediaType.equals(MediaTypes.TEXT_PLAIN)) { - EntityWriter writer = mediaContext.writer(GenericType.STRING, EMPTY_HEADERS, EMPTY_HEADERS); - writer.write(GenericType.STRING, str, baos, EMPTY_HEADERS, EMPTY_HEADERS); - } else { - GenericType type = GenericType.create(object); - WritableHeaders resHeaders = WritableHeaders.create(); - resHeaders.set(HeaderNames.CONTENT_TYPE, mediaType.text()); - EntityWriter writer = mediaContext.writer(type, EMPTY_HEADERS, resHeaders); - writer.write(type, object, baos, EMPTY_HEADERS, resHeaders); - } - return baos.toByteArray(); - } catch (IOException e) { - throw new ServerConnectionException("Failed to write SSE event", e); - - } - } - throw new IllegalStateException("Unable to serialize SSE event without a media context"); - } + void close(); } diff --git a/webserver/sse/src/main/java/io/helidon/webserver/sse/SseSinkProvider.java b/webserver/sse/src/main/java/io/helidon/webserver/sse/SseSinkProvider.java index e0b37e94208..f83625fc37f 100644 --- a/webserver/sse/src/main/java/io/helidon/webserver/sse/SseSinkProvider.java +++ b/webserver/sse/src/main/java/io/helidon/webserver/sse/SseSinkProvider.java @@ -50,7 +50,7 @@ public boolean supports(GenericType> type, ServerRequest reque @Override @SuppressWarnings("unchecked") public > X create(SinkProviderContext context) { - return (X) new SseSink(context); + return (X) new DataWriterSseSink(context); } /** @@ -64,10 +64,10 @@ public > X create(SinkProviderContext context) { * @deprecated replaced by {@link #create(SinkProviderContext)} */ @Override + @Deprecated(since = "4.1.2", forRemoval = true) public > X create(ServerResponse response, BiConsumer eventConsumer, Runnable closeRunnable) { - throw new UnsupportedOperationException("Deprecated, use other create method in class"); + return (X) new OutputStreamSseSink(response, eventConsumer, closeRunnable); } - } diff --git a/webserver/tests/sse/src/test/java/io/helidon/webserver/tests/sse/SseBaseTest.java b/webserver/tests/sse/src/test/java/io/helidon/webserver/tests/sse/SseBaseTest.java index c25552ecb46..8254f8e4e51 100644 --- a/webserver/tests/sse/src/test/java/io/helidon/webserver/tests/sse/SseBaseTest.java +++ b/webserver/tests/sse/src/test/java/io/helidon/webserver/tests/sse/SseBaseTest.java @@ -1,5 +1,5 @@ /* - * Copyright (c) 2023 Oracle and/or its affiliates. + * Copyright (c) 2023, 2024 Oracle and/or its affiliates. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -17,14 +17,34 @@ package io.helidon.webserver.tests.sse; import io.helidon.http.sse.SseEvent; -import io.helidon.webserver.sse.SseSink; +import io.helidon.webserver.WebServer; import io.helidon.webserver.http.ServerRequest; import io.helidon.webserver.http.ServerResponse; +import io.helidon.webserver.sse.SseSink; + import jakarta.json.Json; import jakarta.json.JsonObject; +import static org.hamcrest.CoreMatchers.is; +import static org.hamcrest.CoreMatchers.nullValue; +import static org.hamcrest.MatcherAssert.assertThat; + class SseBaseTest { + private final WebServer webServer; + + SseBaseTest() { + this.webServer = null; + } + + SseBaseTest(WebServer webServer) { + this.webServer = webServer; + } + + protected WebServer webServer() { + return webServer; + } + static void sseString1(ServerRequest req, ServerResponse res) { try (SseSink sseSink = res.sink(SseSink.TYPE)) { sseSink.emit(SseEvent.create("hello")) @@ -96,4 +116,14 @@ static void sseIdComment(ServerRequest req, ServerResponse res) { sseSink.emit(event); } } + + protected void testSse(String path, String... events) throws Exception { + assert webServer != null; + try (SimpleSseClient sseClient = SimpleSseClient.create(webServer.port(), path)) { + for (String e : events) { + assertThat(sseClient.nextEvent(), is(e)); + } + assertThat(sseClient.nextEvent(), is(nullValue())); + } + } } diff --git a/webserver/tests/sse/src/test/java/io/helidon/webserver/tests/sse/SseClientTest.java b/webserver/tests/sse/src/test/java/io/helidon/webserver/tests/sse/SseClientTest.java index a812b102841..f039353cca4 100644 --- a/webserver/tests/sse/src/test/java/io/helidon/webserver/tests/sse/SseClientTest.java +++ b/webserver/tests/sse/src/test/java/io/helidon/webserver/tests/sse/SseClientTest.java @@ -1,5 +1,5 @@ /* - * Copyright (c) 2023 Oracle and/or its affiliates. + * Copyright (c) 2023, 2024 Oracle and/or its affiliates. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -24,6 +24,7 @@ import io.helidon.webclient.http1.Http1Client; import io.helidon.webclient.http1.Http1ClientResponse; import io.helidon.webclient.sse.SseSource; +import io.helidon.webserver.WebServer; import io.helidon.webserver.http.HttpRules; import io.helidon.webserver.http.ServerRequest; import io.helidon.webserver.http.ServerResponse; @@ -44,7 +45,8 @@ class SseClientTest extends SseBaseTest { private final Http1Client client; - SseClientTest(Http1Client client) { + SseClientTest(WebServer webServer, Http1Client client) { + super(webServer); this.client = client; } diff --git a/webserver/tests/sse/src/test/java/io/helidon/webserver/tests/sse/SseServerMediaTest.java b/webserver/tests/sse/src/test/java/io/helidon/webserver/tests/sse/SseServerMediaTest.java index 0ee68a6bc8c..b22f446530d 100644 --- a/webserver/tests/sse/src/test/java/io/helidon/webserver/tests/sse/SseServerMediaTest.java +++ b/webserver/tests/sse/src/test/java/io/helidon/webserver/tests/sse/SseServerMediaTest.java @@ -1,5 +1,5 @@ /* - * Copyright (c) 2023 Oracle and/or its affiliates. + * Copyright (c) 2023, 2024 Oracle and/or its affiliates. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -26,7 +26,6 @@ import io.helidon.common.config.Config; import io.helidon.http.Headers; import io.helidon.http.HttpMediaType; -import io.helidon.http.Status; import io.helidon.http.WritableHeaders; import io.helidon.http.media.EntityWriter; import io.helidon.http.media.MediaSupport; @@ -34,7 +33,7 @@ import io.helidon.http.media.spi.MediaSupportProvider; import io.helidon.http.sse.SseEvent; import io.helidon.webclient.http1.Http1Client; -import io.helidon.webclient.http1.Http1ClientResponse; +import io.helidon.webserver.WebServer; import io.helidon.webserver.http.HttpRules; import io.helidon.webserver.http.ServerRequest; import io.helidon.webserver.http.ServerResponse; @@ -44,23 +43,20 @@ import org.junit.jupiter.api.Test; -import static io.helidon.http.HeaderValues.ACCEPT_EVENT_STREAM; -import static org.hamcrest.CoreMatchers.is; -import static org.hamcrest.MatcherAssert.assertThat; - /** * Test that shows how to serialize an individual SSE event using a custom * {@link io.helidon.http.media.spi.MediaSupportProvider} and a user-defined * media type. Each SSE event can be given a different media type. */ @ServerTest -class SseServerMediaTest { +class SseServerMediaTest extends SseBaseTest { private static final HttpMediaType MY_PLAIN_TEXT = HttpMediaType.create("text/my_plain"); private final Http1Client client; - SseServerMediaTest(Http1Client client) { + SseServerMediaTest(WebServer webServer, Http1Client client) { + super(webServer); this.client = client; } @@ -70,8 +66,8 @@ static void routing(HttpRules rules) { } @Test - void testSseJson() { - testSse("/sse", "data:HELLO\n\ndata:world\n\n"); + void testSseJson() throws Exception { + testSse("/sse", "data:HELLO", "data:world"); } private static void sse(ServerRequest req, ServerResponse res) { @@ -81,13 +77,6 @@ private static void sse(ServerRequest req, ServerResponse res) { } } - private void testSse(String path, String result) { - try (Http1ClientResponse response = client.get(path).header(ACCEPT_EVENT_STREAM).request()) { - assertThat(response.status(), is(Status.OK_200)); - assertThat(response.as(String.class), is(result)); - } - } - @SuppressWarnings("unchecked") public static class MyStringSupport extends StringSupport { @@ -143,6 +132,10 @@ private void write(String toWrite, } } + /** + * Provider for {@link io.helidon.webserver.tests.sse.SseServerMediaTest.MyStringSupport}, + * loaded as a service. + */ public static class MyStringSupportProvider implements MediaSupportProvider, Weighted { @Override diff --git a/webserver/tests/sse/src/test/java/io/helidon/webserver/tests/sse/SseServerTest.java b/webserver/tests/sse/src/test/java/io/helidon/webserver/tests/sse/SseServerTest.java index add41b56af0..81a71df14a4 100644 --- a/webserver/tests/sse/src/test/java/io/helidon/webserver/tests/sse/SseServerTest.java +++ b/webserver/tests/sse/src/test/java/io/helidon/webserver/tests/sse/SseServerTest.java @@ -28,16 +28,13 @@ import static io.helidon.http.HeaderValues.ACCEPT_JSON; import static org.hamcrest.CoreMatchers.is; -import static org.hamcrest.CoreMatchers.nullValue; import static org.hamcrest.MatcherAssert.assertThat; @ServerTest class SseServerTest extends SseBaseTest { - private final WebServer webServer; - SseServerTest(WebServer webServer) { - this.webServer = webServer; + super(webServer); } @SetUpRoute @@ -84,19 +81,10 @@ void testIdComment() throws Exception { @Test void testWrongAcceptType() { Http1Client client = Http1Client.builder() - .baseUri("http://localhost:" + webServer.port()) + .baseUri("http://localhost:" + webServer().port()) .build(); try (Http1ClientResponse response = client.get("/sseString1").header(ACCEPT_JSON).request()) { assertThat(response.status(), is(Status.NOT_ACCEPTABLE_406)); } } - - private void testSse(String path, String... events) throws Exception { - try (SimpleSseClient sseClient = SimpleSseClient.create(webServer.port(), path)) { - for (String e : events) { - assertThat(sseClient.nextEvent(), is(e)); - } - assertThat(sseClient.nextEvent(), is(nullValue())); - } - } } diff --git a/webserver/webserver/src/main/java/io/helidon/webserver/http/spi/SinkProvider.java b/webserver/webserver/src/main/java/io/helidon/webserver/http/spi/SinkProvider.java index 5def0ffa764..53b1c7dd150 100644 --- a/webserver/webserver/src/main/java/io/helidon/webserver/http/spi/SinkProvider.java +++ b/webserver/webserver/src/main/java/io/helidon/webserver/http/spi/SinkProvider.java @@ -60,7 +60,7 @@ default > X create(SinkProviderContext context) { * @return newly created sink * @deprecated replaced by {@link #create(SinkProviderContext)} */ - @Deprecated(forRemoval = true, since = "4.1.2") + @Deprecated(since = "4.1.2", forRemoval = true) > X create(ServerResponse response, BiConsumer eventConsumer, Runnable closeRunnable);