diff --git a/rx-netty-contexts/src/main/java/io/reactivex/netty/contexts/ContextPipelineConfigurators.java b/rx-netty-contexts/src/main/java/io/reactivex/netty/contexts/ContextPipelineConfigurators.java index 91dbe480..835240b3 100644 --- a/rx-netty-contexts/src/main/java/io/reactivex/netty/contexts/ContextPipelineConfigurators.java +++ b/rx-netty-contexts/src/main/java/io/reactivex/netty/contexts/ContextPipelineConfigurators.java @@ -23,7 +23,7 @@ import io.reactivex.netty.protocol.http.client.HttpClientResponse; import io.reactivex.netty.protocol.http.server.HttpServerRequest; import io.reactivex.netty.protocol.http.server.HttpServerResponse; -import io.reactivex.netty.protocol.text.sse.ServerSentEvent; +import io.reactivex.netty.protocol.http.sse.ServerSentEvent; /** * A factory class for different {@link PipelineConfigurator} for the context module. @@ -61,12 +61,12 @@ private ContextPipelineConfigurators() { public static PipelineConfigurator, HttpClientRequest> sseClientConfigurator(RequestIdProvider requestIdProvider, RequestCorrelator correlator) { return new HttpClientContextConfigurator(requestIdProvider, correlator, - PipelineConfigurators.sseClientConfigurator()); + PipelineConfigurators.clientSseConfigurator()); } public static PipelineConfigurator, HttpServerResponse> sseServerConfigurator(RequestIdProvider requestIdProvider, RequestCorrelator correlator) { return new HttpServerContextConfigurator(requestIdProvider, correlator, - PipelineConfigurators.sseServerConfigurator()); + PipelineConfigurators.serveSseConfigurator()); } } diff --git a/rx-netty-examples/src/main/java/io/reactivex/netty/examples/http/logtail/LogAggregator.java b/rx-netty-examples/src/main/java/io/reactivex/netty/examples/http/logtail/LogAggregator.java index 2806cd91..a0e9c9f0 100644 --- a/rx-netty-examples/src/main/java/io/reactivex/netty/examples/http/logtail/LogAggregator.java +++ b/rx-netty-examples/src/main/java/io/reactivex/netty/examples/http/logtail/LogAggregator.java @@ -17,7 +17,7 @@ package io.reactivex.netty.examples.http.logtail; import io.netty.buffer.ByteBuf; -import io.netty.handler.codec.http.HttpResponseStatus; +import io.netty.handler.logging.LogLevel; import io.reactivex.netty.RxNetty; import io.reactivex.netty.pipeline.PipelineConfigurators; import io.reactivex.netty.protocol.http.client.HttpClient; @@ -27,8 +27,9 @@ import io.reactivex.netty.protocol.http.server.HttpServerRequest; import io.reactivex.netty.protocol.http.server.HttpServerResponse; import io.reactivex.netty.protocol.http.server.RequestHandler; -import io.reactivex.netty.protocol.text.sse.ServerSentEvent; +import io.reactivex.netty.protocol.http.sse.ServerSentEvent; import rx.Observable; +import rx.functions.Action1; import rx.functions.Func1; import java.util.ArrayList; @@ -53,7 +54,7 @@ public LogAggregator(int port, int producerPortFrom, int producerPortTo) { } public HttpServer createAggregationServer() { - server = RxNetty.createHttpServer(port, + server = RxNetty.newHttpServerBuilder(port, new RequestHandler() { @Override public Observable handle(final HttpServerRequest request, @@ -62,18 +63,11 @@ public Observable handle(final HttpServerRequest request, return connectToLogProducers().flatMap(new Func1>() { @Override public Observable call(ServerSentEvent sse) { - ServerSentEvent data = new ServerSentEvent(sse.getEventId(), "data", sse.getEventData()); - return response.writeAndFlush(data); - } - }).onErrorResumeNext(new Func1>() { - @Override - public Observable call(Throwable throwable) { - response.setStatus(HttpResponseStatus.INTERNAL_SERVER_ERROR); - return response.close(); + return response.writeAndFlush(sse); } }); } - }, PipelineConfigurators.sseServerConfigurator()); + }).enableWireLogging(LogLevel.ERROR).pipelineConfigurator(PipelineConfigurators.serveSseConfigurator()).build(); System.out.println("Logs aggregator server started..."); return server; } @@ -88,12 +82,18 @@ private Observable connectToLogProducers() { private static Observable connectToLogProducer(int port) { HttpClient client = - RxNetty.createHttpClient("localhost", port, PipelineConfigurators.sseClientConfigurator()); + RxNetty.createHttpClient("localhost", port, PipelineConfigurators.clientSseConfigurator()); return client.submit(HttpClientRequest.createGet("/logstream")).flatMap(new Func1, Observable>() { @Override public Observable call(HttpClientResponse response) { - return response.getContent(); + return response.getContent() + .doOnNext(new Action1() { + @Override + public void call(ServerSentEvent serverSentEvent) { + serverSentEvent.retain(); + } + }); } }); } diff --git a/rx-netty-examples/src/main/java/io/reactivex/netty/examples/http/logtail/LogProducer.java b/rx-netty-examples/src/main/java/io/reactivex/netty/examples/http/logtail/LogProducer.java index 8db52a19..62c48314 100644 --- a/rx-netty-examples/src/main/java/io/reactivex/netty/examples/http/logtail/LogProducer.java +++ b/rx-netty-examples/src/main/java/io/reactivex/netty/examples/http/logtail/LogProducer.java @@ -17,13 +17,14 @@ package io.reactivex.netty.examples.http.logtail; import io.netty.buffer.ByteBuf; +import io.netty.handler.logging.LogLevel; import io.reactivex.netty.RxNetty; import io.reactivex.netty.pipeline.PipelineConfigurators; import io.reactivex.netty.protocol.http.server.HttpServer; import io.reactivex.netty.protocol.http.server.HttpServerRequest; import io.reactivex.netty.protocol.http.server.HttpServerResponse; import io.reactivex.netty.protocol.http.server.RequestHandler; -import io.reactivex.netty.protocol.text.sse.ServerSentEvent; +import io.reactivex.netty.protocol.http.sse.ServerSentEvent; import rx.Observable; import rx.functions.Func1; @@ -45,14 +46,16 @@ public LogProducer(int port, int interval) { } public HttpServer createServer() { - HttpServer server = RxNetty.createHttpServer(port, + HttpServer server = RxNetty.newHttpServerBuilder(port, new RequestHandler() { @Override public Observable handle(HttpServerRequest request, HttpServerResponse response) { return createReplyHandlerObservable(response); } - }, PipelineConfigurators.sseServerConfigurator()); + }).pipelineConfigurator(PipelineConfigurators.serveSseConfigurator()) + .enableWireLogging(LogLevel.DEBUG) + .build(); System.out.println("Started log producer on port " + port); return server; } @@ -62,12 +65,10 @@ private Observable createReplyHandlerObservable(final HttpServerResponse>() { @Override public Observable call(Long interval) { - ServerSentEvent data = new ServerSentEvent( - Long.toString(interval), - "data", - LogEvent.randomLogEvent(source).toCSV() - ); - return response.writeAndFlush(data); + ByteBuf eventId = response.getAllocator().buffer().writeLong(interval); + ByteBuf data = response.getAllocator().buffer().writeBytes(LogEvent.randomLogEvent( + source).toCSV().getBytes()); + return response.writeAndFlush(ServerSentEvent.withEventId(eventId, data)); } }); } diff --git a/rx-netty-examples/src/main/java/io/reactivex/netty/examples/http/logtail/LogTailClient.java b/rx-netty-examples/src/main/java/io/reactivex/netty/examples/http/logtail/LogTailClient.java index 096027e3..43fccf2d 100644 --- a/rx-netty-examples/src/main/java/io/reactivex/netty/examples/http/logtail/LogTailClient.java +++ b/rx-netty-examples/src/main/java/io/reactivex/netty/examples/http/logtail/LogTailClient.java @@ -18,12 +18,13 @@ import io.netty.buffer.ByteBuf; import io.netty.handler.codec.http.HttpResponseStatus; +import io.netty.handler.logging.LogLevel; import io.reactivex.netty.RxNetty; import io.reactivex.netty.pipeline.PipelineConfigurators; import io.reactivex.netty.protocol.http.client.HttpClient; import io.reactivex.netty.protocol.http.client.HttpClientRequest; import io.reactivex.netty.protocol.http.client.HttpClientResponse; -import io.reactivex.netty.protocol.text.sse.ServerSentEvent; +import io.reactivex.netty.protocol.http.sse.ServerSentEvent; import rx.Observable; import rx.functions.Func1; @@ -49,7 +50,9 @@ public LogTailClient(int port, int tailSize) { public List collectEventLogs() { HttpClient client = - RxNetty.createHttpClient("localhost", port, PipelineConfigurators.sseClientConfigurator()); + RxNetty.newHttpClientBuilder("localhost", port) + .enableWireLogging(LogLevel.DEBUG) + .pipelineConfigurator(PipelineConfigurators.clientSseConfigurator()).build(); Iterable eventIterable = client.submit(HttpClientRequest.createGet("/logstream")) .flatMap(new Func1, Observable>() { @@ -63,7 +66,7 @@ public Observable call(HttpClientResponse resp }).map(new Func1() { @Override public LogEvent call(ServerSentEvent serverSentEvent) { - return LogEvent.fromCSV(serverSentEvent.getEventData()); + return LogEvent.fromCSV(serverSentEvent.contentAsString()); } } ).filter(new Func1() { diff --git a/rx-netty-examples/src/main/java/io/reactivex/netty/examples/http/sse/HttpSseClient.java b/rx-netty-examples/src/main/java/io/reactivex/netty/examples/http/sse/HttpSseClient.java index 5b91087b..fa99f18e 100644 --- a/rx-netty-examples/src/main/java/io/reactivex/netty/examples/http/sse/HttpSseClient.java +++ b/rx-netty-examples/src/main/java/io/reactivex/netty/examples/http/sse/HttpSseClient.java @@ -22,7 +22,7 @@ import io.reactivex.netty.protocol.http.client.HttpClient; import io.reactivex.netty.protocol.http.client.HttpClientRequest; import io.reactivex.netty.protocol.http.client.HttpClientResponse; -import io.reactivex.netty.protocol.text.sse.ServerSentEvent; +import io.reactivex.netty.protocol.http.sse.ServerSentEvent; import rx.Observable; import rx.functions.Func1; @@ -48,7 +48,7 @@ public HttpSseClient(int port, int noOfEvents) { public List readServerSideEvents() { HttpClient client = - RxNetty.createHttpClient("localhost", port, PipelineConfigurators.sseClientConfigurator()); + RxNetty.createHttpClient("localhost", port, PipelineConfigurators.clientSseConfigurator()); Iterable eventIterable = client.submit(HttpClientRequest.createGet("/hello")). flatMap(new Func1, Observable>() { diff --git a/rx-netty-examples/src/main/java/io/reactivex/netty/examples/http/sse/HttpSseServer.java b/rx-netty-examples/src/main/java/io/reactivex/netty/examples/http/sse/HttpSseServer.java index 85fca6d7..64715f09 100644 --- a/rx-netty-examples/src/main/java/io/reactivex/netty/examples/http/sse/HttpSseServer.java +++ b/rx-netty-examples/src/main/java/io/reactivex/netty/examples/http/sse/HttpSseServer.java @@ -23,7 +23,7 @@ import io.reactivex.netty.protocol.http.server.HttpServerRequest; import io.reactivex.netty.protocol.http.server.HttpServerResponse; import io.reactivex.netty.protocol.http.server.RequestHandler; -import io.reactivex.netty.protocol.text.sse.ServerSentEvent; +import io.reactivex.netty.protocol.http.sse.ServerSentEvent; import rx.Notification; import rx.Observable; import rx.functions.Func1; @@ -54,7 +54,7 @@ public Observable handle(HttpServerRequest request, HttpServerResponse response) { return getIntervalObservable(response); } - }, PipelineConfigurators.sseServerConfigurator()); + }, PipelineConfigurators.serveSseConfigurator()); System.out.println("HTTP Server Sent Events server started..."); return server; } @@ -65,7 +65,9 @@ private Observable getIntervalObservable(final HttpServerResponse call(Long interval) { System.out.println("Writing SSE event for interval: " + interval); - return response.writeAndFlush(new ServerSentEvent(String.valueOf(interval), "notification", "hello " + interval)); + ByteBuf data = response.getAllocator().buffer().writeBytes(("hello " + interval).getBytes()); + ServerSentEvent event = new ServerSentEvent(data); + return response.writeAndFlush(event); } }).materialize() .takeWhile(new Func1, Boolean>() { diff --git a/rx-netty-examples/src/test/java/io/reactivex/netty/examples/http/logtail/LogTailClientTest.java b/rx-netty-examples/src/test/java/io/reactivex/netty/examples/http/logtail/LogTailClientTest.java index 2dc4bbe3..1c297728 100644 --- a/rx-netty-examples/src/test/java/io/reactivex/netty/examples/http/logtail/LogTailClientTest.java +++ b/rx-netty-examples/src/test/java/io/reactivex/netty/examples/http/logtail/LogTailClientTest.java @@ -19,7 +19,7 @@ import io.netty.buffer.ByteBuf; import io.reactivex.netty.examples.ExamplesEnvironment; import io.reactivex.netty.protocol.http.server.HttpServer; -import io.reactivex.netty.protocol.text.sse.ServerSentEvent; +import io.reactivex.netty.protocol.http.sse.ServerSentEvent; import org.junit.After; import org.junit.Assert; import org.junit.Before; @@ -28,8 +28,8 @@ import java.util.ArrayList; import java.util.List; -import static io.reactivex.netty.examples.http.logtail.LogTailClient.DEFAULT_TAIL_SIZE; import static io.reactivex.netty.examples.http.logtail.LogAggregator.DEFAULT_AG_PORT; +import static io.reactivex.netty.examples.http.logtail.LogTailClient.DEFAULT_TAIL_SIZE; /** * @author Tomasz Bak @@ -41,7 +41,7 @@ public class LogTailClientTest extends ExamplesEnvironment { private static final int PR_INTERVAL = 50; private HttpServer aggregationServer; - private List> producerServers = new ArrayList>(); + private final List> producerServers = new ArrayList>(); @Before public void setupServers() { diff --git a/rx-netty-examples/src/test/java/io/reactivex/netty/examples/http/sse/HttpSseTest.java b/rx-netty-examples/src/test/java/io/reactivex/netty/examples/http/sse/HttpSseTest.java index 3d53cbef..3b6f2bfd 100644 --- a/rx-netty-examples/src/test/java/io/reactivex/netty/examples/http/sse/HttpSseTest.java +++ b/rx-netty-examples/src/test/java/io/reactivex/netty/examples/http/sse/HttpSseTest.java @@ -19,7 +19,7 @@ import io.netty.buffer.ByteBuf; import io.reactivex.netty.examples.ExamplesEnvironment; import io.reactivex.netty.protocol.http.server.HttpServer; -import io.reactivex.netty.protocol.text.sse.ServerSentEvent; +import io.reactivex.netty.protocol.http.sse.ServerSentEvent; import org.junit.After; import org.junit.Assert; import org.junit.Before; diff --git a/rx-netty/src/main/java/io/reactivex/netty/channel/ChannelWriter.java b/rx-netty/src/main/java/io/reactivex/netty/channel/ChannelWriter.java index 7816d884..eddc2f1a 100644 --- a/rx-netty/src/main/java/io/reactivex/netty/channel/ChannelWriter.java +++ b/rx-netty/src/main/java/io/reactivex/netty/channel/ChannelWriter.java @@ -16,6 +16,7 @@ package io.reactivex.netty.channel; +import io.netty.buffer.ByteBuf; import io.netty.buffer.ByteBufAllocator; import io.netty.channel.FileRegion; import rx.Observable; @@ -33,6 +34,8 @@ public interface ChannelWriter { void write(R msg, ContentTransformer transformer); + void writeBytes(ByteBuf msg); + void writeBytes(byte[] msg); void writeString(String msg); @@ -47,6 +50,8 @@ public interface ChannelWriter { Observable writeAndFlush(R msg, ContentTransformer transformer); + Observable writeBytesAndFlush(ByteBuf msg); + Observable writeBytesAndFlush(byte[] msg); Observable writeStringAndFlush(String msg); diff --git a/rx-netty/src/main/java/io/reactivex/netty/channel/DefaultChannelWriter.java b/rx-netty/src/main/java/io/reactivex/netty/channel/DefaultChannelWriter.java index 620cd3ee..f6c8b96e 100644 --- a/rx-netty/src/main/java/io/reactivex/netty/channel/DefaultChannelWriter.java +++ b/rx-netty/src/main/java/io/reactivex/netty/channel/DefaultChannelWriter.java @@ -72,6 +72,12 @@ public Observable writeAndFlush(final R msg, final ContentTransformer< return flush(); } + @Override + public Observable writeBytesAndFlush(ByteBuf msg) { + writeBytes(msg); + return flush(); + } + @Override public void write(O msg) { writeOnChannel(msg); @@ -83,9 +89,14 @@ public void write(R msg, ContentTransformer transformer) { writeOnChannel(contentBytes); } + @Override + public void writeBytes(ByteBuf msg) { + write(msg, IdentityTransformer.DEFAULT_INSTANCE); + } + @Override public void writeBytes(byte[] msg) { - write(msg, new ByteTransformer()); + write(msg, ByteTransformer.DEFAULT_INSTANCE); } @Override @@ -95,7 +106,7 @@ public void writeString(String msg) { @Override public Observable writeBytesAndFlush(byte[] msg) { - write(msg, new ByteTransformer()); + write(msg, ByteTransformer.DEFAULT_INSTANCE); return flush(); } diff --git a/rx-netty/src/main/java/io/reactivex/netty/channel/IdentityTransformer.java b/rx-netty/src/main/java/io/reactivex/netty/channel/IdentityTransformer.java new file mode 100644 index 00000000..41c03914 --- /dev/null +++ b/rx-netty/src/main/java/io/reactivex/netty/channel/IdentityTransformer.java @@ -0,0 +1,35 @@ +/* + * Copyright 2014 Netflix, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.reactivex.netty.channel; + +import io.netty.buffer.ByteBuf; +import io.netty.buffer.ByteBufAllocator; + +/** + * An identity transformer that returns the passed {@link ByteBuf} + * + * @author Nitesh Kant + */ +public class IdentityTransformer implements ContentTransformer { + + public static final IdentityTransformer DEFAULT_INSTANCE = new IdentityTransformer(); + + @Override + public ByteBuf call(ByteBuf toTransform, ByteBufAllocator allocator) { + return toTransform; + } +} diff --git a/rx-netty/src/main/java/io/reactivex/netty/channel/SingleNioLoopProvider.java b/rx-netty/src/main/java/io/reactivex/netty/channel/SingleNioLoopProvider.java index c9c56431..a6ef0fd8 100644 --- a/rx-netty/src/main/java/io/reactivex/netty/channel/SingleNioLoopProvider.java +++ b/rx-netty/src/main/java/io/reactivex/netty/channel/SingleNioLoopProvider.java @@ -110,7 +110,8 @@ private EpollEventLoopGroup getNativeParentEventLoop() { EpollEventLoopGroup eventLoopGroup = nativeParentEventLoop.get(); if (null == eventLoopGroup) { - EpollEventLoopGroup newEventLoopGroup = new EpollEventLoopGroup(parentEventLoopCount); + EpollEventLoopGroup newEventLoopGroup = new EpollEventLoopGroup(parentEventLoopCount, + new RxDefaultThreadFactory("rx-netty-epoll-eventloop")); if (!nativeParentEventLoop.compareAndSet(null, newEventLoopGroup)) { newEventLoopGroup.shutdownGracefully(); } @@ -121,7 +122,8 @@ private EpollEventLoopGroup getNativeParentEventLoop() { private EpollEventLoopGroup getNativeEventLoop() { EpollEventLoopGroup eventLoopGroup = nativeEventLoop.get(); if (null == eventLoopGroup) { - EpollEventLoopGroup newEventLoopGroup = new EpollEventLoopGroup(childEventLoopCount); + EpollEventLoopGroup newEventLoopGroup = new EpollEventLoopGroup(childEventLoopCount, + new RxDefaultThreadFactory("rx-netty-epoll-eventloop")); if (!nativeEventLoop.compareAndSet(null, newEventLoopGroup)) { newEventLoopGroup.shutdownGracefully(); } @@ -134,11 +136,11 @@ public static class SharedNioEventLoopGroup extends NioEventLoopGroup { private final AtomicInteger refCount = new AtomicInteger(); public SharedNioEventLoopGroup() { - super(0, new RxDefaultThreadFactory("rx-selector")); + super(0, new RxDefaultThreadFactory("rx-netty-nio-eventloop")); } public SharedNioEventLoopGroup(int threadCount) { - super(threadCount, new RxDefaultThreadFactory("rx-selector")); + super(threadCount, new RxDefaultThreadFactory("rx-netty-nio-eventloop")); } @Override diff --git a/rx-netty/src/main/java/io/reactivex/netty/pipeline/PipelineConfigurators.java b/rx-netty/src/main/java/io/reactivex/netty/pipeline/PipelineConfigurators.java index 89768f7e..bae3beb5 100644 --- a/rx-netty/src/main/java/io/reactivex/netty/pipeline/PipelineConfigurators.java +++ b/rx-netty/src/main/java/io/reactivex/netty/pipeline/PipelineConfigurators.java @@ -32,10 +32,12 @@ import io.reactivex.netty.protocol.http.server.HttpServerPipelineConfigurator; import io.reactivex.netty.protocol.http.server.HttpServerRequest; import io.reactivex.netty.protocol.http.server.HttpServerResponse; +import io.reactivex.netty.protocol.http.sse.ServerSentEvent; +import io.reactivex.netty.protocol.http.sse.SseClientPipelineConfigurator; import io.reactivex.netty.protocol.http.sse.SseOverHttpClientPipelineConfigurator; import io.reactivex.netty.protocol.http.sse.SseOverHttpServerPipelineConfigurator; +import io.reactivex.netty.protocol.http.sse.SseServerPipelineConfigurator; import io.reactivex.netty.protocol.text.SimpleTextProtocolConfigurator; -import io.reactivex.netty.protocol.text.sse.ServerSentEvent; import java.nio.charset.Charset; import java.util.concurrent.TimeUnit; @@ -83,11 +85,27 @@ public static PipelineConfigurator, HttpClientReque new HttpObjectAggregationConfigurator()); } - public static PipelineConfigurator, HttpClientRequest> sseClientConfigurator() { + public static PipelineConfigurator, HttpClientRequest> clientSseConfigurator() { + return new SseClientPipelineConfigurator(); + } + + public static PipelineConfigurator, HttpServerResponse> serveSseConfigurator() { + return new SseServerPipelineConfigurator(); + } + + /** + * @deprecated Use {@link #clientSseConfigurator()} instead. + */ + @Deprecated + public static PipelineConfigurator, HttpClientRequest> sseClientConfigurator() { return new SseOverHttpClientPipelineConfigurator(); } - public static PipelineConfigurator, HttpServerResponse> sseServerConfigurator() { + /** + * @deprecated Use {@link #serveSseConfigurator()} instead. + */ + @Deprecated + public static PipelineConfigurator, HttpServerResponse> sseServerConfigurator() { return new SseOverHttpServerPipelineConfigurator(); } diff --git a/rx-netty/src/main/java/io/reactivex/netty/protocol/http/sse/SSEInboundHandler.java b/rx-netty/src/main/java/io/reactivex/netty/protocol/http/sse/SSEInboundHandler.java index 8653d0a9..b8326c3e 100644 --- a/rx-netty/src/main/java/io/reactivex/netty/protocol/http/sse/SSEInboundHandler.java +++ b/rx-netty/src/main/java/io/reactivex/netty/protocol/http/sse/SSEInboundHandler.java @@ -48,7 +48,9 @@ *

No HTTP protocol

* In this case, the handler does not do anything, assuming that there is no special handling required. * + * @deprecated Use {@link SseChannelHandler} instead. */ +@Deprecated @ChannelHandler.Sharable public class SSEInboundHandler extends SimpleChannelInboundHandler { diff --git a/rx-netty/src/main/java/io/reactivex/netty/protocol/http/sse/ServerSentEvent.java b/rx-netty/src/main/java/io/reactivex/netty/protocol/http/sse/ServerSentEvent.java new file mode 100644 index 00000000..4b381786 --- /dev/null +++ b/rx-netty/src/main/java/io/reactivex/netty/protocol/http/sse/ServerSentEvent.java @@ -0,0 +1,239 @@ +/* + * Copyright 2014 Netflix, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.reactivex.netty.protocol.http.sse; + +import io.netty.buffer.ByteBuf; +import io.netty.buffer.ByteBufHolder; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.nio.charset.Charset; + +/** + * An object representing a server-sent-event following the SSE specifications + * + * A server sent event is composed of the following: + * + *
    +
  • Event id: This is the last event id seen on the stream this event was received. This can be null, if no id is received.
  • +
  • Event type: The last seen event type seen on the stream this event was received. This can be null, if no type is received.
  • +
  • Data: This is the actual event data.
  • +
+ * + *

Type

+ * + * A {@link ServerSentEvent} is of the type {@link Type#Data} unless it is explicitly passed on creation. + * + *

Memory management

+ * + * This is an implementation of {@link ByteBufHolder} so it is required to be explicitly released by calling + * {@link #release()} when this instance is no longer required. + * + * @author Nitesh Kant + */ +public class ServerSentEvent implements ByteBufHolder { + + private static final Logger logger = LoggerFactory.getLogger(ServerSentEvent.class); + + private static Charset sseEncodingCharset; + + static { + try { + sseEncodingCharset = Charset.forName("UTF-8"); + } catch (Exception e) { + logger.error("UTF-8 charset not available. Since SSE only contains UTF-8 data, we can not read SSE data."); + sseEncodingCharset = null; + } + } + + public enum Type { + Data, + Id, + EventType + } + + private final Type type; + private final ByteBuf data; + private final ByteBuf eventId; + private final ByteBuf eventType; + + public ServerSentEvent(Type type, ByteBuf data) { + this(type, null, null, data); + } + + public ServerSentEvent(ByteBuf data) { + this(Type.Data, data); + } + + public ServerSentEvent(ByteBuf eventId, ByteBuf eventType, ByteBuf data) { + this(Type.Data, eventId, eventType, data); + } + + protected ServerSentEvent(Type type, ByteBuf eventId, ByteBuf eventType, ByteBuf data) { + this.data = data; + this.type = type; + this.eventId = eventId; + this.eventType = eventType; + } + + /** + * The type of this event. For events which contain an event Id or event type along with data, the type is still + * {@link Type#Data}. The type will be {@link Type#Id} or {@link Type#EventType} only if the event just contains the + * event type or event id and no data. + * + * @return Type of this event. + */ + public Type getType() { + return type; + } + + public boolean hasEventId() { + return null != eventId; + } + + public boolean hasEventType() { + return null != eventType; + } + + public ByteBuf getEventId() { + return eventId; + } + + public String getEventIdAsString() { + return eventId.toString(getSseCharset()); + } + + public ByteBuf getEventType() { + return eventType; + } + + public String getEventTypeAsString() { + return eventType.toString(getSseCharset()); + } + + public String contentAsString() { + return data.toString(getSseCharset()); + } + + @Override + public ByteBuf content() { + return data; + } + + @Override + public ByteBufHolder copy() { + return new ServerSentEvent(type, null != eventId ? eventId.copy() : null, + null != eventType ? eventType.copy() : null, data.copy()); + } + + @Override + public ByteBufHolder duplicate() { + return new ServerSentEvent(type, null != eventId ? eventId.duplicate() : null, + null != eventType ? eventType.duplicate() : null, data.duplicate()); + } + + @Override + public int refCnt() { + return data.refCnt(); // Ref count is consistent across data, eventId and eventType + } + + @Override + public ByteBufHolder retain() { + if(hasEventId()) { + eventId.retain(); + } + if(hasEventType()) { + eventType.retain(); + } + data.retain(); + return this; + } + + @Override + public ByteBufHolder retain(int increment) { + if(hasEventId()) { + eventId.retain(increment); + } + if(hasEventType()) { + eventType.retain(increment); + } + data.retain(increment); + return this; + } + + @Override + public boolean release() { + if(hasEventId()) { + eventId.release(); + } + if(hasEventType()) { + eventType.release(); + } + return data.release(); + } + + @Override + public boolean release(int decrement) { + if(hasEventId()) { + eventId.release(decrement); + } + if(hasEventType()) { + eventType.release(decrement); + } + return data.release(decrement); + } + + /** + * Creates a {@link ServerSentEvent} instance with an event id. + * + * @param eventId Id for the event. + * @param data Data for the event. + * + * @return The {@link ServerSentEvent} instance. + */ + public static ServerSentEvent withEventId(ByteBuf eventId, ByteBuf data) { + return new ServerSentEvent(eventId, null, data); + } + + /** + * Creates a {@link ServerSentEvent} instance with an event type. + * + * @param eventType Type for the event. + * @param data Data for the event. + * + * @return The {@link ServerSentEvent} instance. + */ + public static ServerSentEvent withEventType(ByteBuf eventType, ByteBuf data) { + return new ServerSentEvent(null, eventType, data); + } + + /** + * Creates a {@link ServerSentEvent} instance with an event id and type. + * + * @param eventType Type for the event. + * @param eventId Id for the event. + * @param data Data for the event. + * + * @return The {@link ServerSentEvent} instance. + */ + public static ServerSentEvent withEventIdAndType(ByteBuf eventId, ByteBuf eventType, ByteBuf data) { + return new ServerSentEvent(eventId, eventType, data); + } + + protected Charset getSseCharset() { + return null == sseEncodingCharset ? Charset.forName("UTF-8") : sseEncodingCharset; + } +} diff --git a/rx-netty/src/main/java/io/reactivex/netty/protocol/http/sse/ServerSentEventDecoder.java b/rx-netty/src/main/java/io/reactivex/netty/protocol/http/sse/ServerSentEventDecoder.java new file mode 100644 index 00000000..1318feb6 --- /dev/null +++ b/rx-netty/src/main/java/io/reactivex/netty/protocol/http/sse/ServerSentEventDecoder.java @@ -0,0 +1,340 @@ +/* + * Copyright 2014 Netflix, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.reactivex.netty.protocol.http.sse; + +import io.netty.buffer.ByteBuf; +import io.netty.buffer.ByteBufProcessor; +import io.netty.channel.ChannelHandlerContext; +import io.netty.handler.codec.ByteToMessageDecoder; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.nio.charset.Charset; +import java.util.List; + +/** + * A decoder to decode Server sent events into {@link ServerSentEvent} + */ +public class ServerSentEventDecoder extends ByteToMessageDecoder { + + private static final Logger logger = LoggerFactory.getLogger(ServerSentEventDecoder.class); + + public static final int DEFAULT_MAX_FIELD_LENGTH = 100; + + private static final char[] EVENT_ID_FIELD_NAME = "event".toCharArray(); + private static final char[] DATA_FIELD_NAME = "data".toCharArray(); + private static final char[] ID_FIELD_NAME = "id".toCharArray(); + + protected static final ByteBufProcessor SKIP_LINE_DELIMITERS_AND_SPACES_PROCESSOR = new ByteBufProcessor() { + @Override + public boolean process(byte value) throws Exception { + return isLineDelimiter((char) value) || (char) value == ' '; + } + }; + + protected static final ByteBufProcessor SKIP_COLON_AND_WHITE_SPACE_PROCESSOR = new ByteBufProcessor() { + @Override + public boolean process(byte value) throws Exception { + char valueChar = (char) value; + return valueChar == ':' || valueChar == ' '; + } + }; + + protected static final ByteBufProcessor SCAN_COLON_PROCESSOR = new ByteBufProcessor() { + @Override + public boolean process(byte value) throws Exception { + return (char) value != ':'; + } + }; + + protected static final ByteBufProcessor SCAN_EOL_PROCESSOR = new ByteBufProcessor() { + @Override + public boolean process(byte value) throws Exception { + return !isLineDelimiter((char) value); + } + }; + + private static Charset sseEncodingCharset; + + static { + try { + sseEncodingCharset = Charset.forName("UTF-8"); + } catch (Exception e) { + logger.error("UTF-8 charset not available. Since SSE only contains UTF-8 data, we can not read SSE data."); + sseEncodingCharset = null; + } + } + + private final int maxFieldNameLength; + + private enum State { + SkipColonAndWhiteSpaces,// Skip colon and all whitespaces after reading field name. + SkipLineDelimitersAndSpaces,// Skip all line delimiters after field value end. + ReadFieldName, // Read till a colon to get the name of the field. + ReadFieldValue // Read value till the line delimiter. + } + + /** + * Release of these buffers happens in the following ways: + * + * 1) If this was a data buffer, it is released when ServerSentEvent is released. + * 2) If this was an eventId buffer, it is released when next Id arrives or when the connection + * is closed. + * 3) If this was an eventType buffer, it is released when next type arrives or when the connection + * is closed. + */ + private ByteBuf lastEventId; + private ByteBuf lastEventType; + private ByteBuf incompleteData; // Can be field value of name, according to the current state. + + private ServerSentEvent.Type currentFieldType; + + private State state = State.ReadFieldName; + + public ServerSentEventDecoder() { + this(DEFAULT_MAX_FIELD_LENGTH); + } + + public ServerSentEventDecoder(int maxFieldNameLength) { + this.maxFieldNameLength = maxFieldNameLength; + } + + @Override + protected void decode(ChannelHandlerContext ctx, ByteBuf in, List out) throws Exception { + + if (null == sseEncodingCharset) { + throw new IllegalArgumentException("Can not read SSE data as UTF-8 charset is not available."); + } + + while (in.isReadable()) { + + final int readerIndexAtStart = in.readerIndex(); + + switch (state) { + case SkipColonAndWhiteSpaces: + skipColonAndWhiteSapaces(in); + state = State.ReadFieldValue; + break; + case SkipLineDelimitersAndSpaces: + skipLineDelimiters(in); + state = State.ReadFieldName; + break; + case ReadFieldName: + final int indexOfColon = scanAndFindColon(in); + + if (-1 == indexOfColon) { // No colon found + int bytesReceivedTillNow = null != incompleteData ? incompleteData.readableBytes() : + in.readableBytes() - readerIndexAtStart; + if (bytesReceivedTillNow > maxFieldNameLength) { // Reject as max field name length reached. + if (null != incompleteData) { + incompleteData.release(); + incompleteData = null; + } + throw new TooLongFieldNameException( + "Too long field name for a server sent event. Field name length received till now: " + + bytesReceivedTillNow); + } else { // Accumulate data into the field name buffer. + if (null == incompleteData) { + incompleteData = ctx.alloc().buffer(maxFieldNameLength, maxFieldNameLength); + } + // accumulate into incomplete data buffer to be used when the full data arrives. + incompleteData.writeBytes(in); + } + } else { + int fieldNameLengthInTheCurrentBuffer = indexOfColon - readerIndexAtStart; + + ByteBuf fieldNameBuffer; + if (null != incompleteData) { + // Read the remaining data into the temporary buffer + in.readBytes(incompleteData, fieldNameLengthInTheCurrentBuffer); + fieldNameBuffer = incompleteData; + incompleteData = null; + } else { + // Consume the data from the input buffer. + fieldNameBuffer = ctx.alloc().buffer(fieldNameLengthInTheCurrentBuffer, + fieldNameLengthInTheCurrentBuffer); + in.readBytes(fieldNameBuffer, fieldNameLengthInTheCurrentBuffer); + } + + state = State.SkipColonAndWhiteSpaces; // We have read the field name, next we should skip colon & WS. + currentFieldType = readCurrentFieldTypeFromBuffer(fieldNameBuffer); + } + break; + case ReadFieldValue: + + final int endOfLineStartIndex = scanAndFindEndOfLine(in); + + + if (-1 == endOfLineStartIndex) { // End of line not found, accumulate data into a temporary buffer. + if (null == incompleteData) { + incompleteData = ctx.alloc().buffer(in.readableBytes()); + } + // accumulate into incomplete data buffer to be used when the full data arrives. + incompleteData.writeBytes(in); + } else { // Read the data till end of line into the value buffer. + final int bytesAvailableInThisIteration = endOfLineStartIndex - readerIndexAtStart; + if (null == incompleteData) { + incompleteData = ctx.alloc().buffer(bytesAvailableInThisIteration, + bytesAvailableInThisIteration); + } + incompleteData.writeBytes(in, bytesAvailableInThisIteration); + + switch (currentFieldType) { + case Data: + if (incompleteData.isReadable()) { + out.add(ServerSentEvent.withEventIdAndType(lastEventId, lastEventType, + incompleteData)); + } else { + incompleteData.release(); + } + break; + case Id: + if (incompleteData.isReadable()) { + lastEventId = incompleteData; + } else { + incompleteData.release(); + lastEventId = null; + } + break; + case EventType: + if (incompleteData.isReadable()) { + lastEventType = incompleteData; + } else { + incompleteData.release(); + lastEventType = null; + } + break; + } + /** + * Since all data is read, reset the incomplete data to null. Release of this buffer happens in + * the following ways + * 1) If this was a data buffer, it is released when ServerSentEvent is released. + * 2) If this was an eventId buffer, it is released when next Id arrives or when the connection + * is closed. + * 3) If this was an eventType buffer, it is released when next type arrives or when the connection + * is closed. + */ + incompleteData = null; + state = State.SkipLineDelimitersAndSpaces; // Skip line delimiters after reading a field value completely. + } + break; + } + } + + } + + private static ServerSentEvent.Type readCurrentFieldTypeFromBuffer(final ByteBuf fieldNameBuffer) { + /** + * This code tries to eliminate the need of creating a string from the ByteBuf as the field names are very + * constrained. The algorithm is as follows: + * + * -- Scan the bytes in the buffer. + * -- If the first byte matches the expected field names then use the matching field name char array to verify + * the rest of the field name. + * -- If the first byte does not match, reject the field name. + * -- After the first byte, exact match the rest of the field name with the expected field name, byte by byte. + * -- If the name does not exactly match the expected value, then reject the field name. + */ + ServerSentEvent.Type toReturn = ServerSentEvent.Type.Data; + int readableBytes = fieldNameBuffer.readableBytes(); + final int readerIndexAtStart = fieldNameBuffer.readerIndex(); + char[] fieldNameToVerify = DATA_FIELD_NAME; + boolean verified = false; + int actualFieldNameIndexToCheck = 0; // Starts with 1 as the first char is validated by equality. + for (int i = readerIndexAtStart; i < readableBytes; i++) { + final char charAtI = (char) fieldNameBuffer.getByte(i); + + if (i == readerIndexAtStart) { + switch (charAtI) { // See which among the known field names this buffer belongs. + case 'e': + fieldNameToVerify = EVENT_ID_FIELD_NAME; + toReturn = ServerSentEvent.Type.EventType; + break; + case 'd': + fieldNameToVerify = DATA_FIELD_NAME; + toReturn = ServerSentEvent.Type.Data; + break; + case 'i': + fieldNameToVerify = ID_FIELD_NAME; + toReturn = ServerSentEvent.Type.Id; + break; + default: + throw new IllegalArgumentException("Illegal Server Sent event field name: " + + fieldNameBuffer.toString(sseEncodingCharset)); + } + } else { + if (++actualFieldNameIndexToCheck >= fieldNameToVerify.length || charAtI != fieldNameToVerify[actualFieldNameIndexToCheck]) { + // If the character does not match or the buffer is bigger than the expected name, then discard. + verified = false; + break; + } else { + // Verified till now. If all characters are matching then this stays as verified, else changed to false. + verified = true; + } + } + } + + if (verified) { + return toReturn; + } else { + throw new IllegalArgumentException("Illegal Server Sent event field name: " + + fieldNameBuffer.toString(sseEncodingCharset)); + } + } + + @Override + public void channelInactive(ChannelHandlerContext ctx) throws Exception { + super.channelInactive(ctx); + if (null != lastEventId) { + lastEventId.release(); + } + if (null != lastEventType) { + lastEventType.release(); + } + if (null != incompleteData) { + incompleteData.release(); + } + } + + protected static int scanAndFindColon(ByteBuf byteBuf) { + return byteBuf.forEachByte(SCAN_COLON_PROCESSOR); + } + + protected static int scanAndFindEndOfLine(ByteBuf byteBuf) { + return byteBuf.forEachByte(SCAN_EOL_PROCESSOR); + } + + protected static void skipLineDelimiters(ByteBuf byteBuf) { + skipTillMatching(byteBuf, SKIP_LINE_DELIMITERS_AND_SPACES_PROCESSOR); + } + + protected static void skipColonAndWhiteSapaces(ByteBuf byteBuf) { + skipTillMatching(byteBuf, SKIP_COLON_AND_WHITE_SPACE_PROCESSOR); + } + + protected static void skipTillMatching(ByteBuf byteBuf, ByteBufProcessor processor) { + int lastIndexProcessed = byteBuf.forEachByte(processor); + if (-1 == lastIndexProcessed) { + byteBuf.readerIndex(byteBuf.readerIndex() + byteBuf.readableBytes()); // If all the remaining bytes are to be ignored, discard the buffer. + } else { + byteBuf.readerIndex(lastIndexProcessed); + } + } + + protected static boolean isLineDelimiter(char c) { + return c == '\r' || c == '\n'; + } +} \ No newline at end of file diff --git a/rx-netty/src/main/java/io/reactivex/netty/protocol/http/sse/ServerSentEventEncoder.java b/rx-netty/src/main/java/io/reactivex/netty/protocol/http/sse/ServerSentEventEncoder.java new file mode 100644 index 00000000..b8081010 --- /dev/null +++ b/rx-netty/src/main/java/io/reactivex/netty/protocol/http/sse/ServerSentEventEncoder.java @@ -0,0 +1,93 @@ +/* + * Copyright 2014 Netflix, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.reactivex.netty.protocol.http.sse; + +import io.netty.buffer.ByteBuf; +import io.netty.buffer.ByteBufProcessor; +import io.netty.channel.ChannelHandler; +import io.netty.channel.ChannelHandlerContext; +import io.netty.handler.codec.MessageToByteEncoder; + +/** + * An encoder to convert {@link ServerSentEvent} to a {@link io.netty.buffer.ByteBuf} + * + * @author Nitesh Kant + */ +@ChannelHandler.Sharable +public class ServerSentEventEncoder extends MessageToByteEncoder { + + private static final byte[] EVENT_PREFIX_BYTES = "event: ".getBytes(); + private static final byte[] NEW_LINE_AS_BYTES = "\n".getBytes(); + private static final byte[] ID_PREFIX_AS_BYTES = "id: ".getBytes(); + private static final byte[] DATA_PREFIX_AS_BYTES = "data: ".getBytes(); + private final boolean splitSseData; + + public ServerSentEventEncoder() { + this(false); + } + + /** + * Splits the SSE data on new line and create multiple "data" events if {@code splitSseData} is {@code true} + * + * @param splitSseData {@code true} if the SSE data is to be splitted on new line to create multiple "data" events. + */ + public ServerSentEventEncoder(boolean splitSseData) { + this.splitSseData = splitSseData; + } + + @Override + protected void encode(ChannelHandlerContext ctx, ServerSentEvent serverSentEvent, ByteBuf out) throws Exception { + if (serverSentEvent.hasEventType()) { // Write event type, if available + out.writeBytes(EVENT_PREFIX_BYTES); + out.writeBytes(serverSentEvent.getEventType()); + out.writeBytes(NEW_LINE_AS_BYTES); + } + + if (serverSentEvent.hasEventId()) { // Write event id, if available + out.writeBytes(ID_PREFIX_AS_BYTES); + out.writeBytes(serverSentEvent.getEventId()); + out.writeBytes(NEW_LINE_AS_BYTES); + } + + final ByteBuf content = serverSentEvent.content(); + + if (splitSseData) { + while (content.isReadable()) { // Scan the buffer and split on new line into multiple data lines. + final int readerIndexAtStart = content.readerIndex(); + int newLineIndex = content.forEachByte(new ByteBufProcessor() { + @Override + public boolean process(byte value) throws Exception { + return (char) value != '\n'; + } + }); + if (-1 == newLineIndex) { // No new line, write the buffer as is. + out.writeBytes(DATA_PREFIX_AS_BYTES); + out.writeBytes(content); + out.writeBytes(NEW_LINE_AS_BYTES); + } else { // Write the buffer till the new line and then iterate this loop + out.writeBytes(DATA_PREFIX_AS_BYTES); + out.writeBytes(content, newLineIndex - readerIndexAtStart); + content.readerIndex(content.readerIndex() + 1); + out.writeBytes(NEW_LINE_AS_BYTES); + } + } + } else { // write the buffer with data prefix and new line post fix. + out.writeBytes(DATA_PREFIX_AS_BYTES); + out.writeBytes(content); + out.writeBytes(NEW_LINE_AS_BYTES); + } + } +} diff --git a/rx-netty/src/main/java/io/reactivex/netty/protocol/http/sse/SseChannelHandler.java b/rx-netty/src/main/java/io/reactivex/netty/protocol/http/sse/SseChannelHandler.java new file mode 100644 index 00000000..30e8058f --- /dev/null +++ b/rx-netty/src/main/java/io/reactivex/netty/protocol/http/sse/SseChannelHandler.java @@ -0,0 +1,114 @@ +/* + * Copyright 2014 Netflix, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.reactivex.netty.protocol.http.sse; + +import io.netty.channel.ChannelHandler; +import io.netty.channel.ChannelHandlerContext; +import io.netty.channel.ChannelPipeline; +import io.netty.channel.SimpleChannelInboundHandler; +import io.netty.handler.codec.http.HttpContent; +import io.netty.handler.codec.http.HttpHeaders; +import io.netty.handler.codec.http.HttpResponse; +import io.netty.handler.codec.http.LastHttpContent; +import io.reactivex.netty.protocol.http.client.ClientRequestResponseConverter; + +/** + * A handler to insert {@link ServerSentEventDecoder} at a proper position in the pipeline according to the protocol.
+ * There are the following cases, this handles: + * + *

Http response with chunked encoding

+ * In this case, the {@link ServerSentEventDecoder} is inserted after this {@link SseChannelHandler} + * + *

Http response with no chunking

+ * In this case, the {@link ServerSentEventDecoder} is inserted as the first handler in the pipeline. This makes the + * {@link io.netty.buffer.ByteBuf} at the origin to be converted to {@link ServerSentEvent} and hence any other handler + * will not look at this message unless it is really interested. + * + * + *

Caveat

+ * In some cases where any message is buffered before this pipeline change is made by this handler (i.e. adding + * {@link ServerSentEventDecoder} as the first handler), the {@link ServerSentEventDecoder} will not be applied to those + * messages. For this reason we also add {@link ServerSentEventDecoder} after this handler. In cases, where the first + * {@link ServerSentEventDecoder} is applied on the incoming data, the next instance of {@link ServerSentEventDecoder} + * will be redundant. + * + */ +@ChannelHandler.Sharable +public class SseChannelHandler extends SimpleChannelInboundHandler { + + public static final String NAME = "sse-inbound-handler"; + public static final String SSE_DECODER_HANDLER_NAME = "rx-sse-decoder"; + public static final String SSE_DECODER_POST_INBOUND_HANDLER = "rx-sse-decoder-post-inbound"; + + public SseChannelHandler() { + super(false); // Never auto-release, management of buffer is done via {@link ObservableAdapter} + } + + @Override + protected void channelRead0(ChannelHandlerContext ctx, Object msg) + throws Exception { + if (msg instanceof HttpResponse) { + + /** + * Since SSE is an endless stream, we can never reuse a connection and hence as soon as SSE traffic is + * received, the connection is marked as discardable on close. + */ + ctx.channel().attr(ClientRequestResponseConverter.DISCARD_CONNECTION).set(true); // SSE traffic should always discard connection on close. + + ChannelPipeline pipeline = ctx.channel().pipeline(); + if (!HttpHeaders.isTransferEncodingChunked((HttpResponse) msg)) { + pipeline.addFirst(SSE_DECODER_HANDLER_NAME, new ServerSentEventDecoder()); + /* + * If there are buffered messages in the previous handler at the time this message is read, we would + * not be able to convert the content into an SseEvent. For this reason, we also add the decoder after + * this handler, so that we can handle the buffered messages. + * See the class level javadoc for more details. + */ + pipeline.addAfter(NAME, SSE_DECODER_POST_INBOUND_HANDLER, new ServerSentEventDecoder()); + } else { + pipeline.addAfter(NAME, SSE_DECODER_HANDLER_NAME, new ServerSentEventDecoder()); + } + ctx.fireChannelRead(msg); + } else if (msg instanceof LastHttpContent) { + LastHttpContent lastHttpContent = (LastHttpContent) msg; + + /** + * The entire pipeline is set based on the assumption that LastHttpContent signals the end of the stream. + * Since, here we are only passing the content to the rest of the pipeline, it becomes imperative to + * also pass LastHttpContent as such. + * For this reason, we send the LastHttpContent again in the pipeline. For this event sent, the content + * buffer will already be read and hence will not be read again. This message serves as only containing + * the trailing headers. + * However, we need to increment the ref count of the content so that the assumptions down the line of the + * ByteBuf always being released by the last pipeline handler will not break (as ServerSentEventDecoder releases + * the ByteBuf after read). + */ + lastHttpContent.content().retain(); // pseudo retain so that the last handler of the pipeline can release it. + + if (lastHttpContent.content().isReadable()) { + ctx.fireChannelRead(lastHttpContent.content()); + } + + ctx.fireChannelRead(msg); // Since the content is already consumed above (by the SSEDecoder), this is just + // as sending just trailing headers. This is critical to mark the end of stream. + + } else if (msg instanceof HttpContent) { + ctx.fireChannelRead(((HttpContent) msg).content()); + } else { + ctx.fireChannelRead(msg); + } + } +} diff --git a/rx-netty/src/main/java/io/reactivex/netty/protocol/http/sse/SseClientPipelineConfigurator.java b/rx-netty/src/main/java/io/reactivex/netty/protocol/http/sse/SseClientPipelineConfigurator.java new file mode 100644 index 00000000..ee13fa57 --- /dev/null +++ b/rx-netty/src/main/java/io/reactivex/netty/protocol/http/sse/SseClientPipelineConfigurator.java @@ -0,0 +1,56 @@ +/* + * Copyright 2014 Netflix, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.reactivex.netty.protocol.http.sse; + +import io.netty.channel.ChannelPipeline; +import io.reactivex.netty.pipeline.PipelineConfigurator; +import io.reactivex.netty.protocol.http.client.HttpClientPipelineConfigurator; +import io.reactivex.netty.protocol.http.client.HttpClientRequest; +import io.reactivex.netty.protocol.http.client.HttpClientResponse; + +/** + * {@link PipelineConfigurator} implementation for Server Sent Events to + * be used for SSE clients. + * + * @author Nitesh Kant + */ +public class SseClientPipelineConfigurator + implements PipelineConfigurator, HttpClientRequest> { + + public static final SseChannelHandler SSE_CHANNEL_HANDLER = new SseChannelHandler(); + + private final HttpClientPipelineConfigurator httpClientPipelineConfigurator; + + public SseClientPipelineConfigurator() { + this(new HttpClientPipelineConfigurator()); + } + + public SseClientPipelineConfigurator(HttpClientPipelineConfigurator httpClientPipelineConfigurator) { + this.httpClientPipelineConfigurator = httpClientPipelineConfigurator; + } + + @Override + public void configureNewPipeline(ChannelPipeline pipeline) { + httpClientPipelineConfigurator.configureNewPipeline(pipeline); + if (null != pipeline.get(HttpClientPipelineConfigurator.REQUEST_RESPONSE_CONVERTER_HANDLER_NAME)) { + pipeline.addBefore(HttpClientPipelineConfigurator.REQUEST_RESPONSE_CONVERTER_HANDLER_NAME, + SseChannelHandler.NAME, SSE_CHANNEL_HANDLER); + } else { + // Assuming that the underlying HTTP configurator knows what its doing. It will mostly fail though. + pipeline.addLast(SseChannelHandler.NAME, SSE_CHANNEL_HANDLER); + } + } +} diff --git a/rx-netty/src/main/java/io/reactivex/netty/protocol/http/sse/SseOverHttpClientPipelineConfigurator.java b/rx-netty/src/main/java/io/reactivex/netty/protocol/http/sse/SseOverHttpClientPipelineConfigurator.java index c095e12a..4c40f52a 100644 --- a/rx-netty/src/main/java/io/reactivex/netty/protocol/http/sse/SseOverHttpClientPipelineConfigurator.java +++ b/rx-netty/src/main/java/io/reactivex/netty/protocol/http/sse/SseOverHttpClientPipelineConfigurator.java @@ -31,7 +31,10 @@ * @see ServerSentEventDecoder * * @author Nitesh Kant + * + * @deprecated Use {@link io.reactivex.netty.protocol.http.sse.SseClientPipelineConfigurator} instead. */ +@Deprecated public class SseOverHttpClientPipelineConfigurator implements PipelineConfigurator, HttpClientRequest> { private final HttpClientPipelineConfigurator httpClientPipelineConfigurator; diff --git a/rx-netty/src/main/java/io/reactivex/netty/protocol/http/sse/SseOverHttpServerPipelineConfigurator.java b/rx-netty/src/main/java/io/reactivex/netty/protocol/http/sse/SseOverHttpServerPipelineConfigurator.java index eda4986e..a8454a83 100644 --- a/rx-netty/src/main/java/io/reactivex/netty/protocol/http/sse/SseOverHttpServerPipelineConfigurator.java +++ b/rx-netty/src/main/java/io/reactivex/netty/protocol/http/sse/SseOverHttpServerPipelineConfigurator.java @@ -37,7 +37,10 @@ * @see ServerSentEventEncoder * * @author Nitesh Kant + * + * @deprecated Use {@link io.reactivex.netty.protocol.http.sse.SseServerPipelineConfigurator} instead. */ +@Deprecated public class SseOverHttpServerPipelineConfigurator implements PipelineConfigurator, HttpServerResponse> { diff --git a/rx-netty/src/main/java/io/reactivex/netty/protocol/http/sse/SseServerPipelineConfigurator.java b/rx-netty/src/main/java/io/reactivex/netty/protocol/http/sse/SseServerPipelineConfigurator.java new file mode 100644 index 00000000..e85977d6 --- /dev/null +++ b/rx-netty/src/main/java/io/reactivex/netty/protocol/http/sse/SseServerPipelineConfigurator.java @@ -0,0 +1,71 @@ +/* + * Copyright 2014 Netflix, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.reactivex.netty.protocol.http.sse; + +import io.netty.channel.ChannelHandlerContext; +import io.netty.channel.ChannelOutboundHandlerAdapter; +import io.netty.channel.ChannelPipeline; +import io.netty.channel.ChannelPromise; +import io.reactivex.netty.pipeline.PipelineConfigurator; +import io.reactivex.netty.protocol.http.server.HttpServerPipelineConfigurator; +import io.reactivex.netty.protocol.http.server.HttpServerRequest; +import io.reactivex.netty.protocol.http.server.HttpServerResponse; + +import static io.netty.handler.codec.http.HttpHeaders.Names.CONTENT_TYPE; + +/** + * {@link PipelineConfigurator} implementation for Server Sent Events to + * be used for SSE servers. + * + * @author Nitesh Kant + */ +public class SseServerPipelineConfigurator + implements PipelineConfigurator, HttpServerResponse> { + + public static final String SSE_ENCODER_HANDLER_NAME = "sse-encoder"; + public static final ServerSentEventEncoder SERVER_SENT_EVENT_ENCODER = new ServerSentEventEncoder(); // contains no state. + public static final String SSE_RESPONSE_HEADERS_COMPLETER = "sse-response-headers-completer"; + + private final HttpServerPipelineConfigurator serverPipelineConfigurator; + + public SseServerPipelineConfigurator() { + this(new HttpServerPipelineConfigurator()); + } + + public SseServerPipelineConfigurator(HttpServerPipelineConfigurator serverPipelineConfigurator) { + this.serverPipelineConfigurator = serverPipelineConfigurator; + } + + @Override + public void configureNewPipeline(ChannelPipeline pipeline) { + serverPipelineConfigurator.configureNewPipeline(pipeline); + pipeline.addLast(SSE_ENCODER_HANDLER_NAME, SERVER_SENT_EVENT_ENCODER); + pipeline.addLast(SSE_RESPONSE_HEADERS_COMPLETER, new ChannelOutboundHandlerAdapter() { + @Override + public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception { + if (HttpServerResponse.class.isAssignableFrom(msg.getClass())) { + @SuppressWarnings("rawtypes") + HttpServerResponse rxResponse = (HttpServerResponse) msg; + String contentTypeHeader = rxResponse.getHeaders().get(CONTENT_TYPE); + if (null == contentTypeHeader) { + rxResponse.getHeaders().set(CONTENT_TYPE, "text/event-stream"); + } + } + super.write(ctx, msg, promise); + } + }); + } +} diff --git a/rx-netty/src/main/java/io/reactivex/netty/protocol/http/sse/TooLongFieldNameException.java b/rx-netty/src/main/java/io/reactivex/netty/protocol/http/sse/TooLongFieldNameException.java new file mode 100644 index 00000000..4e19a018 --- /dev/null +++ b/rx-netty/src/main/java/io/reactivex/netty/protocol/http/sse/TooLongFieldNameException.java @@ -0,0 +1,43 @@ +/* + * Copyright 2014 Netflix, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.reactivex.netty.protocol.http.sse; + +import io.netty.handler.codec.DecoderException; + +/** + * Exception when the field name of a Server Sent Event is more than + * + * @author Nitesh Kant + */ +public class TooLongFieldNameException extends DecoderException { + + private static final long serialVersionUID = 5592673637644375829L; + + public TooLongFieldNameException() { + } + + public TooLongFieldNameException(String message, Throwable cause) { + super(message, cause); + } + + public TooLongFieldNameException(String message) { + super(message); + } + + public TooLongFieldNameException(Throwable cause) { + super(cause); + } +} diff --git a/rx-netty/src/main/java/io/reactivex/netty/protocol/text/sse/SSEClientPipelineConfigurator.java b/rx-netty/src/main/java/io/reactivex/netty/protocol/text/sse/SSEClientPipelineConfigurator.java index 56853466..1444258a 100644 --- a/rx-netty/src/main/java/io/reactivex/netty/protocol/text/sse/SSEClientPipelineConfigurator.java +++ b/rx-netty/src/main/java/io/reactivex/netty/protocol/text/sse/SSEClientPipelineConfigurator.java @@ -32,7 +32,10 @@ * @see ServerSentEventDecoder * * @author Nitesh Kant + * + * @deprecated Since SSE is always over HTTP, using the same protocol name isn't correct. */ +@Deprecated public class SSEClientPipelineConfigurator implements PipelineConfigurator { public static final SSEInboundHandler SSE_INBOUND_HANDLER = new SSEInboundHandler(); diff --git a/rx-netty/src/main/java/io/reactivex/netty/protocol/text/sse/SSEServerPipelineConfigurator.java b/rx-netty/src/main/java/io/reactivex/netty/protocol/text/sse/SSEServerPipelineConfigurator.java index f2a55954..a888f1b8 100644 --- a/rx-netty/src/main/java/io/reactivex/netty/protocol/text/sse/SSEServerPipelineConfigurator.java +++ b/rx-netty/src/main/java/io/reactivex/netty/protocol/text/sse/SSEServerPipelineConfigurator.java @@ -29,7 +29,10 @@ * @see ServerSentEventEncoder * * @author Nitesh Kant + * + * @deprecated Since SSE is always over HTTP, using the same protocol name isn't correct. */ +@Deprecated public class SSEServerPipelineConfigurator implements PipelineConfigurator { public static final String SSE_ENCODER_HANDLER_NAME = "sse-encoder"; diff --git a/rx-netty/src/main/java/io/reactivex/netty/protocol/text/sse/ServerSentEvent.java b/rx-netty/src/main/java/io/reactivex/netty/protocol/text/sse/ServerSentEvent.java index 3632b161..7aef9749 100644 --- a/rx-netty/src/main/java/io/reactivex/netty/protocol/text/sse/ServerSentEvent.java +++ b/rx-netty/src/main/java/io/reactivex/netty/protocol/text/sse/ServerSentEvent.java @@ -17,7 +17,10 @@ /** * This class represents a single server-sent event. + * + * @deprecated Use {@link io.reactivex.netty.protocol.http.sse.ServerSentEvent} instead. */ +@Deprecated public class ServerSentEvent { private final String eventId; diff --git a/rx-netty/src/main/java/io/reactivex/netty/protocol/text/sse/ServerSentEventDecoder.java b/rx-netty/src/main/java/io/reactivex/netty/protocol/text/sse/ServerSentEventDecoder.java index 4702825b..cc216512 100644 --- a/rx-netty/src/main/java/io/reactivex/netty/protocol/text/sse/ServerSentEventDecoder.java +++ b/rx-netty/src/main/java/io/reactivex/netty/protocol/text/sse/ServerSentEventDecoder.java @@ -25,7 +25,10 @@ * An decoder for server-sent event. It does not record retry or last event ID. Otherwise, it * follows the same interpretation logic as documented here: Event Stream * Interpretation + * + * @deprecated Use {@link io.reactivex.netty.protocol.http.sse.ServerSentEventDecoder} instead. */ +@Deprecated public class ServerSentEventDecoder extends ReplayingDecoder { private final MessageBuffer eventBuffer; diff --git a/rx-netty/src/main/java/io/reactivex/netty/protocol/text/sse/ServerSentEventEncoder.java b/rx-netty/src/main/java/io/reactivex/netty/protocol/text/sse/ServerSentEventEncoder.java index 23edb114..b184d2df 100644 --- a/rx-netty/src/main/java/io/reactivex/netty/protocol/text/sse/ServerSentEventEncoder.java +++ b/rx-netty/src/main/java/io/reactivex/netty/protocol/text/sse/ServerSentEventEncoder.java @@ -27,7 +27,10 @@ * An encoder to convert {@link ServerSentEvent} to a {@link ByteBuf} * * @author Nitesh Kant + * + * @deprecated Use {@link io.reactivex.netty.protocol.http.sse.ServerSentEventEncoder} instead. */ +@Deprecated @ChannelHandler.Sharable public class ServerSentEventEncoder extends MessageToMessageEncoder { diff --git a/rx-netty/src/test/java/io/reactivex/netty/protocol/http/client/HttpClientTest.java b/rx-netty/src/test/java/io/reactivex/netty/protocol/http/client/HttpClientTest.java index 3ae8917c..9fc772c1 100644 --- a/rx-netty/src/test/java/io/reactivex/netty/protocol/http/client/HttpClientTest.java +++ b/rx-netty/src/test/java/io/reactivex/netty/protocol/http/client/HttpClientTest.java @@ -34,7 +34,7 @@ import io.reactivex.netty.pipeline.PipelineConfigurators; import io.reactivex.netty.protocol.http.server.HttpServer; import io.reactivex.netty.protocol.http.server.HttpServerBuilder; -import io.reactivex.netty.protocol.text.sse.ServerSentEvent; +import io.reactivex.netty.protocol.http.sse.ServerSentEvent; import io.reactivex.netty.server.RxServerThreadFactory; import org.junit.AfterClass; import org.junit.Assert; @@ -114,7 +114,7 @@ public void call() { @Test public void testChunkedStreaming() throws Exception { HttpClient client = RxNetty.createHttpClient("localhost", port, - PipelineConfigurators.sseClientConfigurator()); + PipelineConfigurators.clientSseConfigurator()); Observable> response = client.submit(HttpClientRequest.createGet("test/stream")); @@ -127,7 +127,7 @@ public void testChunkedStreaming() throws Exception { public void testMultipleChunks() throws Exception { HttpClient client = RxNetty.createHttpClient("localhost", port, PipelineConfigurators - .sseClientConfigurator()); + .clientSseConfigurator()); Observable> response = client.submit(HttpClientRequest.createDelete("test/largeStream")); @@ -140,7 +140,7 @@ public void testMultipleChunks() throws Exception { public void testMultipleChunksWithTransformation() throws Exception { HttpClient client = RxNetty.createHttpClient("localhost", port, PipelineConfigurators - .sseClientConfigurator()); + .clientSseConfigurator()); Observable> response = client.submit(HttpClientRequest.createGet("test/largeStream")); Observable transformed = response.flatMap(new Func1, Observable>() { @@ -150,7 +150,7 @@ public Observable call(HttpClientResponse httpResponse) return httpResponse.getContent().map(new Func1() { @Override public String call(ServerSentEvent sseEvent) { - return sseEvent.getEventData(); + return sseEvent.contentAsString(); } }); } @@ -274,7 +274,7 @@ public String call(ByteBuf byteBuf) { @Test public void testNonChunkingStream() throws Exception { HttpClient client = RxNetty.createHttpClient("localhost", port, - PipelineConfigurators.sseClientConfigurator()); + PipelineConfigurators.clientSseConfigurator()); Observable> response = client.submit(HttpClientRequest.createGet("test/nochunk_stream")); final List result = new ArrayList(); @@ -286,7 +286,7 @@ public Observable call(HttpClientResponse http }).toBlocking().forEach(new Action1() { @Override public void call(ServerSentEvent event) { - result.add(event.getEventData()); + result.add(event.contentAsString()); } }); assertEquals(RequestProcessor.smallStreamContent, result); @@ -429,7 +429,7 @@ public Observable call(HttpClientResponse sseE .toBlocking().forEach(new Action1() { @Override public void call(ServerSentEvent serverSentEvent) { - result.add(serverSentEvent.getEventData()); + result.add(serverSentEvent.contentAsString()); } }); } diff --git a/rx-netty/src/test/java/io/reactivex/netty/protocol/http/sse/ServerSentEventDecoderTest.java b/rx-netty/src/test/java/io/reactivex/netty/protocol/http/sse/ServerSentEventDecoderTest.java new file mode 100644 index 00000000..b21e63ed --- /dev/null +++ b/rx-netty/src/test/java/io/reactivex/netty/protocol/http/sse/ServerSentEventDecoderTest.java @@ -0,0 +1,258 @@ +/* + * Copyright 2014 Netflix, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.reactivex.netty.protocol.http.sse; + +import io.netty.buffer.ByteBuf; +import io.netty.channel.ChannelHandlerContext; +import io.netty.handler.codec.DecoderException; +import io.reactivex.netty.NoOpChannelHandlerContext; +import org.junit.Test; + +import java.util.ArrayList; +import java.util.List; + +import static io.reactivex.netty.protocol.http.sse.SseTestUtil.assertContentEquals; +import static io.reactivex.netty.protocol.http.sse.SseTestUtil.newServerSentEvent; +import static io.reactivex.netty.protocol.http.sse.SseTestUtil.newSseProtocolString; +import static io.reactivex.netty.protocol.http.sse.SseTestUtil.toByteBuf; +import static org.junit.Assert.assertEquals; + +/** + * @author Tomasz Bak + */ +public class ServerSentEventDecoderTest { + + private final TestableServerSentEventDecoder decoder = new TestableServerSentEventDecoder(); + + private final ChannelHandlerContext ch = new NoOpChannelHandlerContext(); + + static class TestableServerSentEventDecoder extends ServerSentEventDecoder { + + TestableServerSentEventDecoder() { + } + + TestableServerSentEventDecoder(int maxFieldNameLength) { + super(maxFieldNameLength); + } + + @Override + public void callDecode(ChannelHandlerContext ctx, ByteBuf in, List out) { + super.callDecode(ctx, in, out); + } + } + + @Test + public void testOneDataLineDecode() throws Exception { + String eventType = "add"; + String eventId = "1"; + String data = "data line"; + + ServerSentEvent expected = newServerSentEvent(eventType, eventId, data); + + doTest(newSseProtocolString(eventType, eventId, data), expected); + } + + @Test + public void testMultipleDataLineDecode() throws Exception { + String eventType = "add"; + String eventId = "1"; + String data1 = "data line"; + String data2 = "data line"; + + ServerSentEvent expected1 = newServerSentEvent(eventType, eventId, data1); + ServerSentEvent expected2 = newServerSentEvent(eventType, eventId, data2); + + doTest(newSseProtocolString(eventType, eventId, data1, data2), expected1, expected2); + } + + @Test + public void testEventWithNoIdDecode() throws Exception { + String eventType = "add"; + String data = "data line"; + + ServerSentEvent expected = newServerSentEvent(eventType, null, data); + + doTest(newSseProtocolString(eventType, null, data), expected); + } + + @Test + public void testEventWithNoEventTypeDecode() throws Exception { + String eventId = "1"; + String data = "data line"; + + ServerSentEvent expected = newServerSentEvent(null, eventId, data); + + doTest(newSseProtocolString(null, eventId, data), expected); + } + + @Test + public void testEventWithDataOnlyDecode() throws Exception { + String data = "data line"; + + ServerSentEvent expected = newServerSentEvent(null, null, data); + + doTest(newSseProtocolString(null, null, data), expected); + } + + @Test + public void testResetEventType() throws Exception { + String eventType = "add"; + String eventId = "1"; + String data1 = "data line"; + String data2 = "data line"; + + ServerSentEvent expected1 = newServerSentEvent(eventType, eventId, data1); + ServerSentEvent expected2 = newServerSentEvent(null, eventId, data2); + + doTest(newSseProtocolString(eventType, eventId, data1) + newSseProtocolString("", null, data2), + expected1, expected2); + } + + @Test + public void testResetEventId() throws Exception { + String eventType = "add"; + String eventId = "1"; + String data1 = "data line"; + String data2 = "data line"; + + ServerSentEvent expected1 = newServerSentEvent(eventType, eventId, data1); + ServerSentEvent expected2 = newServerSentEvent(eventType, null, data2); + + doTest(newSseProtocolString(eventType, eventId, data1) + newSseProtocolString(null, "", data2), + expected1, expected2); + } + + @Test + public void testIncompleteEventId() throws Exception { + List out = new ArrayList(); + decoder.callDecode(ch, toByteBuf("id: 111"), out); + assertEquals("Unexpected number of decoded messages.", 0, out.size()); + + ServerSentEvent expected = newServerSentEvent(null, "1111", "data line"); + + doTest("1\ndata: data line\n", expected); + + } + + @Test + public void testIncompleteEventType() throws Exception { + List out = new ArrayList(); + decoder.callDecode(ch, toByteBuf("event: ad"), out); + assertEquals("Unexpected number of decoded messages.", 0, out.size()); + + ServerSentEvent expected = newServerSentEvent("add", null, "data line"); + + doTest("d\ndata: data line\n", expected); + + } + + @Test + public void testIncompleteEventData() throws Exception { + ServerSentEvent expected = newServerSentEvent("add", null, "data line"); + + List out = new ArrayList(); + + decoder.callDecode(ch, toByteBuf("event: add\n"), out); + assertEquals("Unexpected number of decoded messages.", 0, out.size()); + + decoder.callDecode(ch, toByteBuf("data: d"), out); + assertEquals("Unexpected number of decoded messages.", 0, out.size()); + + doTest("ata line\n", expected); + } + + @Test + public void testIncompleteFieldName() throws Exception { + ServerSentEvent expected = newServerSentEvent("add", null, "data line"); + + List out = new ArrayList(); + + decoder.callDecode(ch, toByteBuf("ev"), out); + assertEquals("Unexpected number of decoded messages.", 0, out.size()); + + decoder.callDecode(ch, toByteBuf("ent: add\n d"), out); + assertEquals("Unexpected number of decoded messages.", 0, out.size()); + + doTest("ata: data line\n", expected); + } + + @Test(expected = TooLongFieldNameException.class) + public void testMaxFieldLength() throws Exception { + TestableServerSentEventDecoder decoder = new TestableServerSentEventDecoder(2); + decoder.callDecode(ch, toByteBuf("eventt"), new ArrayList()); + } + + @Test(expected = IllegalArgumentException.class) + public void testInvalidFieldName() throws Throwable { + try { + decoder.callDecode(ch, toByteBuf("eventt: dumb \n"), new ArrayList()); + } catch (DecoderException e) { + throw e.getCause(); + } + } + + @Test(expected = IllegalArgumentException.class) + public void testFieldNameWithSpace() throws Throwable { + try { + decoder.callDecode(ch, toByteBuf("eve nt: dumb \n"), new ArrayList()); + } catch (DecoderException e) { + throw e.getCause(); + } + } + + @Test + public void testDataInMultipleChunks() throws Exception { + ServerSentEvent expected = newServerSentEvent(null, null, "data line"); + + List out = new ArrayList(); + + decoder.callDecode(ch, toByteBuf("da"), out); + assertEquals("Unexpected number of decoded messages.", 0, out.size()); + + decoder.callDecode(ch, toByteBuf("ta: d"), out); + assertEquals("Unexpected number of decoded messages.", 0, out.size()); + + decoder.callDecode(ch, toByteBuf("ata"), out); + assertEquals("Unexpected number of decoded messages.", 0, out.size()); + + decoder.callDecode(ch, toByteBuf(" "), out); + assertEquals("Unexpected number of decoded messages.", 0, out.size()); + + decoder.callDecode(ch, toByteBuf("li"), out); + assertEquals("Unexpected number of decoded messages.", 0, out.size()); + + decoder.callDecode(ch, toByteBuf("ne"), out); + assertEquals("Unexpected number of decoded messages.", 0, out.size()); + + doTest("\n", expected); + } + + private void doTest(String eventText, ServerSentEvent... expected) { + List out = new ArrayList(); + decoder.callDecode(ch, toByteBuf(eventText), out); + + assertEquals(expected.length, out.size()); + + for (int i = 0; i < out.size(); i++) { + ServerSentEvent event = (ServerSentEvent) out.get(i); + assertContentEquals("Unexpected SSE data", expected[i].content(), event.content()); + assertContentEquals("Unexpected SSE event type", expected[i].getEventType(), + event.getEventType()); + assertContentEquals("Unexpected SSE event id", expected[i].getEventId(), event.getEventId()); + } + } +} \ No newline at end of file diff --git a/rx-netty/src/test/java/io/reactivex/netty/protocol/http/sse/ServerSentEventEncoderTest.java b/rx-netty/src/test/java/io/reactivex/netty/protocol/http/sse/ServerSentEventEncoderTest.java new file mode 100644 index 00000000..0381b900 --- /dev/null +++ b/rx-netty/src/test/java/io/reactivex/netty/protocol/http/sse/ServerSentEventEncoderTest.java @@ -0,0 +1,114 @@ +/* + * Copyright 2014 Netflix, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.reactivex.netty.protocol.http.sse; + +import io.netty.buffer.ByteBuf; +import io.netty.buffer.Unpooled; +import io.netty.channel.ChannelHandlerContext; +import io.reactivex.netty.NoOpChannelHandlerContext; +import org.junit.Test; + +import java.nio.charset.Charset; + +import static io.reactivex.netty.protocol.http.sse.SseTestUtil.newServerSentEvent; +import static io.reactivex.netty.protocol.http.sse.SseTestUtil.newSseProtocolString; +import static org.junit.Assert.assertEquals; + +/** + * @author Tomasz Bak + */ +public class ServerSentEventEncoderTest { + + private final ServerSentEventEncoder encoder = new ServerSentEventEncoder(); + + private final ChannelHandlerContext ch = new NoOpChannelHandlerContext(); + + @Test + public void testOneDataLineEncode() throws Exception { + String eventType = "add"; + String eventId = "1"; + String data = "data line"; + ServerSentEvent event = newServerSentEvent(eventType, eventId, data); + String expectedOutput = newSseProtocolString(eventType, eventId, data); + doTest(expectedOutput, event); + } + + @Test + public void testMultipleDataLineEncode() throws Exception { + ServerSentEventEncoder splitEncoder = new ServerSentEventEncoder(true); + String eventType = "add"; + String eventId = "1"; + String data1 = "first line"; + String data2 = "second line"; + String data3 = "third line"; + String data = data1 + '\n' + data2 + '\n' + data3; + ServerSentEvent event = newServerSentEvent(eventType, eventId, data); + String expectedOutput = newSseProtocolString(eventType, eventId, data1, data2, data3); + doTest(splitEncoder, expectedOutput, event); + } + + @Test + public void testNoSplitMode() throws Exception { + String eventType = "add"; + String eventId = "1"; + String data = "first line\nsecond line\nthird line"; + ServerSentEvent event = newServerSentEvent(eventType, eventId, data); + String expectedOutput = newSseProtocolString(eventType, eventId, data); + doTest(expectedOutput, event); + } + + @Test + public void testEventWithNoIdEncode() throws Exception { + String eventType = "add"; + String data = "data line"; + ServerSentEvent event = newServerSentEvent(eventType, null, data); + String expectedOutput = newSseProtocolString(eventType, null, data); + doTest(expectedOutput, event); + } + + @Test + public void testEventWithNoEventTypeEncode() throws Exception { + String eventId = "1"; + String data = "data line"; + ServerSentEvent event = newServerSentEvent(null, eventId, data); + String expectedOutput = newSseProtocolString(null, eventId, data); + doTest(expectedOutput, event); + } + + @Test + public void testEventWithDataOnlyEncode() throws Exception { + String data = "data line"; + ServerSentEvent event = newServerSentEvent(null, null, data); + String expectedOutput = newSseProtocolString(null, null, data); + doTest(expectedOutput, event); + } + + private void doTest(String expectedOutput, ServerSentEvent... toEncode) throws Exception { + doTest(encoder, expectedOutput, toEncode); + } + + private void doTest(ServerSentEventEncoder encoder, String expectedOutput, + ServerSentEvent... toEncode) throws Exception { + ByteBuf out = Unpooled.buffer(); + + for (ServerSentEvent event: toEncode) { + encoder.encode(ch, event, out); + } + + assertEquals("Unexpected encoder output", expectedOutput, out.toString(Charset.defaultCharset())); + } +} \ No newline at end of file diff --git a/rx-netty/src/test/java/io/reactivex/netty/protocol/http/sse/ServerSentEventEndToEndTest.java b/rx-netty/src/test/java/io/reactivex/netty/protocol/http/sse/ServerSentEventEndToEndTest.java new file mode 100644 index 00000000..9e5739ae --- /dev/null +++ b/rx-netty/src/test/java/io/reactivex/netty/protocol/http/sse/ServerSentEventEndToEndTest.java @@ -0,0 +1,150 @@ +/* + * Copyright 2014 Netflix, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.reactivex.netty.protocol.http.sse; + +import io.netty.buffer.ByteBuf; +import io.netty.handler.codec.http.HttpResponseStatus; +import io.netty.handler.logging.LogLevel; +import io.reactivex.netty.RxNetty; +import io.reactivex.netty.pipeline.PipelineConfigurators; +import io.reactivex.netty.protocol.http.client.HttpClientRequest; +import io.reactivex.netty.protocol.http.client.HttpClientResponse; +import io.reactivex.netty.protocol.http.server.HttpServer; +import io.reactivex.netty.protocol.http.server.HttpServerRequest; +import io.reactivex.netty.protocol.http.server.HttpServerResponse; +import io.reactivex.netty.protocol.http.server.RequestHandler; +import org.junit.After; +import org.junit.Assert; +import org.junit.Test; +import rx.Observable; +import rx.functions.Func1; +import rx.functions.Func2; + +import java.util.concurrent.TimeUnit; + +/** + * @author Nitesh Kant + */ +public class ServerSentEventEndToEndTest { + + private HttpServer sseServer; + + @After + public void tearDown() throws Exception { + if (null != sseServer) { + sseServer.shutdown(); + sseServer.waitTillShutdown(); + } + } + + @Test + public void testWriteRawString() throws Exception { + startServer(new Func2, Long, Observable>() { + + @Override + public Observable call(HttpServerResponse response, + Long interval) { + return response.writeStringAndFlush("data: " + interval + '\n'); + } + }); + + receiveAndAssertSingleEvent(); + } + + @Test + public void testWriteRawBytes() throws Exception { + startServer(new Func2, Long, Observable>() { + + @Override + public Observable call(HttpServerResponse response, + Long interval) { + return response.writeBytesAndFlush(("data: " + interval + '\n').getBytes()); + } + }); + + receiveAndAssertSingleEvent(); + } + + @Test + public void testWriteRawByteBuf() throws Exception { + startServer(new Func2, Long, Observable>() { + + @Override + public Observable call(HttpServerResponse response, + Long interval) { + return response.writeBytesAndFlush(response.getAllocator().buffer().writeBytes( + ("data: " + interval + '\n').getBytes())); + } + }); + + receiveAndAssertSingleEvent(); + } + + protected void receiveAndAssertSingleEvent() { + ServerSentEvent result = receivesSingleEvent(); + Assert.assertNotNull("Unexpected server sent event received.", result); + Assert.assertEquals("Unexpected event data.", "0", result.contentAsString()); + Assert.assertNull("Unexpected event type.", result.getEventType()); + Assert.assertNull("Unexpected event id.", result.getEventId()); + } + + protected ServerSentEvent receivesSingleEvent() { + return receiveSse().take(1).toBlocking().singleOrDefault(null); + } + + private Observable receiveSse() { + return RxNetty.newHttpClientBuilder("localhost", + sseServer.getServerPort()) + .pipelineConfigurator(PipelineConfigurators.clientSseConfigurator()) + .enableWireLogging(LogLevel.ERROR) + .build() + .submit(HttpClientRequest.createGet("/")) + .flatMap( + new Func1, Observable>() { + @Override + public Observable call( + HttpClientResponse response) { + if (response.getStatus().equals(HttpResponseStatus.OK)) { + return response.getContent(); + } else { + return Observable.error(new IllegalStateException( + "Server response status: " + response.getStatus() + .code())); + } + } + }); + } + + private void startServer(final Func2, Long, Observable> writeEventForInterval) { + sseServer = RxNetty.newHttpServerBuilder(0, new RequestHandler() { + + @Override + public Observable handle(HttpServerRequest request, + final HttpServerResponse response) { + return Observable.interval(1, TimeUnit.SECONDS) + .flatMap(new Func1>() { + @Override + public Observable call(Long interval) { + return writeEventForInterval.call(response, interval); + } + }); + } + }).enableWireLogging(LogLevel.ERROR) + .pipelineConfigurator(PipelineConfigurators.serveSseConfigurator()) + .build(); + sseServer.start(); + } +} diff --git a/rx-netty/src/test/java/io/reactivex/netty/protocol/http/sse/SseTestUtil.java b/rx-netty/src/test/java/io/reactivex/netty/protocol/http/sse/SseTestUtil.java new file mode 100644 index 00000000..a2205aaa --- /dev/null +++ b/rx-netty/src/test/java/io/reactivex/netty/protocol/http/sse/SseTestUtil.java @@ -0,0 +1,74 @@ +/* + * Copyright 2014 Netflix, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.reactivex.netty.protocol.http.sse; + +import io.netty.buffer.ByteBuf; +import io.netty.buffer.Unpooled; + +import java.nio.charset.Charset; + +import static org.junit.Assert.assertEquals; + +/** + * @author Nitesh Kant + */ +public final class SseTestUtil { + + private SseTestUtil() { + } + + public static ServerSentEvent newServerSentEvent(String eventType, String eventId, String data) { + ByteBuf eventTypeBuffer = null != eventType + ? Unpooled.buffer().writeBytes(eventType.getBytes(Charset.forName("UTF-8"))) + : null; + ByteBuf eventIdBuffer = null != eventId + ? Unpooled.buffer().writeBytes(eventId.getBytes(Charset.forName("UTF-8"))) + : null; + + ByteBuf dataBuffer = Unpooled.buffer().writeBytes(data.getBytes(Charset.forName("UTF-8"))); + + return ServerSentEvent.withEventIdAndType(eventIdBuffer, eventTypeBuffer, dataBuffer); + } + + public static String newSseProtocolString(String eventType, String eventId, String... dataElements) { + StringBuilder eventStream = new StringBuilder(); + + if (null != eventType) { + eventStream.append("event: ").append(eventType).append('\n'); + } + + if (null != eventId) { + eventStream.append("id: ").append(eventId).append('\n'); + } + + for (String aData : dataElements) { + eventStream.append("data: ").append(aData).append('\n'); + } + return eventStream.toString(); + } + + public static void assertContentEquals(String message, ByteBuf expected, ByteBuf actual) { + assertEquals(message, + null == expected ? null : expected.toString(Charset.defaultCharset()), + null == actual ? null : actual.toString(Charset.defaultCharset())); + } + + public static ByteBuf toByteBuf(String event) { + ByteBuf in = Unpooled.buffer(1024); + in.writeBytes(event.getBytes(Charset.defaultCharset())); + return in; + } +} diff --git a/rx-netty/src/test/java/io/reactivex/netty/protocol/text/sse/ServerSentEventDecoderTest.java b/rx-netty/src/test/java/io/reactivex/netty/protocol/text/sse/ServerSentEventDecoderTest.java deleted file mode 100644 index 7341f38b..00000000 --- a/rx-netty/src/test/java/io/reactivex/netty/protocol/text/sse/ServerSentEventDecoderTest.java +++ /dev/null @@ -1,107 +0,0 @@ -/* - * Copyright 2014 Netflix, Inc. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package io.reactivex.netty.protocol.text.sse; - -import io.netty.buffer.ByteBuf; -import io.netty.buffer.ByteBufAllocator; -import io.netty.buffer.UnpooledByteBufAllocator; -import io.netty.channel.ChannelHandlerContext; -import io.reactivex.netty.NoOpChannelHandlerContext; -import org.junit.Test; - -import java.nio.charset.Charset; -import java.util.ArrayList; -import java.util.List; - -import static org.junit.Assert.*; - -/** - * @author Tomasz Bak - */ -public class ServerSentEventDecoderTest { - - private final TestableServerSentEventDecoder decoder = new TestableServerSentEventDecoder(); - - private final ChannelHandlerContext ch = new NoOpChannelHandlerContext(); - - private final ByteBufAllocator alloc = new UnpooledByteBufAllocator(false); - - static class TestableServerSentEventDecoder extends ServerSentEventDecoder { - @Override - public void callDecode(ChannelHandlerContext ctx, ByteBuf in, List out) { - super.callDecode(ctx, in, out); - } - } - - @Test - public void testOneDataLineDecode() throws Exception { - doTest( - "event: add\ndata: data line\nid: 1\n\n", - new ServerSentEvent("1", "add", "data line") - ); - } - - @Test - public void testMultipleDataLineDecode() throws Exception { - doTest( - "event: add\ndata: data line 1\ndata: data line 2\nid: 1\n\n", - new ServerSentEvent("1", "add", "data line 1\ndata line 2") - ); - } - - @Test - public void testEventWithNoIdDecode() throws Exception { - doTest( - "event: add\ndata: test data\n\n", - new ServerSentEvent(null, "add", "test data") - ); - } - - @Test - public void testEventWithNoEventTypeDencode() throws Exception { - doTest( - "data: test data\nid: 1\n\n", - new ServerSentEvent("1", null, "test data") - ); - } - - @Test - public void testEventWithDataOnlyDecode() throws Exception { - doTest( - "data: test data\n\n", - new ServerSentEvent(null, null, "test data") - ); - } - - - private void doTest(String eventText, ServerSentEvent expected) { - List out = new ArrayList(); - decoder.callDecode(ch, toByteBuf(eventText), out); - - assertEquals(1, out.size()); - ServerSentEvent event = (ServerSentEvent) out.get(0); - assertEquals(expected.getEventId(), event.getEventId()); - assertEquals(expected.getEventType(), event.getEventType()); - assertEquals(expected.getEventData(), event.getEventData()); - } - - private ByteBuf toByteBuf(String event) { - ByteBuf in = alloc.buffer(1024, 1024); - in.writeBytes(event.getBytes(Charset.defaultCharset())); - return in; - } -} \ No newline at end of file diff --git a/rx-netty/src/test/java/io/reactivex/netty/protocol/text/sse/ServerSentEventEncoderTest.java b/rx-netty/src/test/java/io/reactivex/netty/protocol/text/sse/ServerSentEventEncoderTest.java deleted file mode 100644 index 11806420..00000000 --- a/rx-netty/src/test/java/io/reactivex/netty/protocol/text/sse/ServerSentEventEncoderTest.java +++ /dev/null @@ -1,95 +0,0 @@ -/* - * Copyright 2014 Netflix, Inc. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package io.reactivex.netty.protocol.text.sse; - -import io.netty.buffer.ByteBuf; -import io.reactivex.netty.NoOpChannelHandlerContext; -import org.junit.Test; - -import java.nio.charset.Charset; -import java.util.ArrayList; -import java.util.List; - -import static org.junit.Assert.*; - -/** - * @author Tomasz Bak - */ -public class ServerSentEventEncoderTest { - - private final ServerSentEventEncoder encoder = new ServerSentEventEncoder(); - - private final NoOpChannelHandlerContext ctx = new NoOpChannelHandlerContext(); - - @Test - public void testOneDataLineEncode() throws Exception { - doTest( - new ServerSentEvent("1", "add", "test data"), - "event: add\ndata: test data\nid: 1\n\n" - ); - } - - @Test - public void testMultipleDataLineEncode() throws Exception { - doTest( - new ServerSentEvent("1", "add", "first line\nsecond line\nthird line"), - "event: add\ndata: first line\ndata: second line\ndata: third line\nid: 1\n\n" - ); - } - - @Test - public void testNoSplitMode() throws Exception { - doTest( - new ServerSentEvent("1", "add", "first line\nsecond line\nthird line", false), - "event: add\ndata: first line\nsecond line\nthird line\nid: 1\n\n" - ); - } - - @Test - public void testEventWithNoIdEncode() throws Exception { - doTest( - new ServerSentEvent(null, "add", "test data"), - "event: add\ndata: test data\n\n" - ); - } - - @Test - public void testEventWithNoEventTypeEncode() throws Exception { - doTest( - new ServerSentEvent("1", null, "test data"), - "data: test data\nid: 1\n\n" - ); - } - - @Test - public void testEventWithDataOnlyEncode() throws Exception { - doTest( - new ServerSentEvent(null, null, "test data"), - "data: test data\n\n" - ); - } - - private void doTest(ServerSentEvent event, String expectedText) throws Exception { - List byteBufOut = new ArrayList(); - encoder.encode(ctx, event, byteBufOut); - - assertEquals(1, byteBufOut.size()); - - String eventText = ((ByteBuf) byteBufOut.get(0)).toString(Charset.defaultCharset()); - assertEquals(expectedText, eventText); - } -} \ No newline at end of file