Skip to content

Commit

Permalink
Merge pull request #188 from NiteshKant/master
Browse files Browse the repository at this point in the history
Adding an option to use a threadpool for Request/Connection processing.
  • Loading branch information
NiteshKant committed Jul 22, 2014
2 parents e63ec91 + 9047b36 commit 1f012bc
Show file tree
Hide file tree
Showing 22 changed files with 569 additions and 44 deletions.
2 changes: 2 additions & 0 deletions rx-netty-examples/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ Protocol | Example / Test | Description
HTTP | [Plain text](src/main/java/io/reactivex/netty/examples/http/plaintext) | A performance optimized helloworld. Use this as a template for any simple perf tests.
HTTP | [Hello World](src/main/java/io/reactivex/netty/examples/http/helloworld) | Simple HTTP GET client/server implementation.
HTTP | [SSL Hello World](src/main/java/io/reactivex/netty/examples/http/ssl) | Hello World version with SSL connection.
HTTP | [CPU intensive work](src/main/java/io/reactivex/netty/examples/http/cpuintensive) | Hello World for CPU intensive request processing.
HTTP | [Simple POST](src/main/java/io/reactivex/netty/examples/http/post) | Simple HTTP POST client/server implementation.
HTTP | [Chunked GET](src/main/java/io/reactivex/netty/examples/http/chunk) | An example of how to handle large, chunked reply that is not pre-aggregated by the default pipline configurator.
HTTP | [Server Side Events](src/main/java/io/reactivex/netty/examples/http/sse) | This examples demonstrates how to implement server side event stream, and how to handle it on the client side.
Expand All @@ -21,6 +22,7 @@ TCP | [Echo Server](src/main/java/io/reactivex/netty/examples/tcp/echo)
TCP | [SSL Echo Server](src/main/java/io/reactivex/netty/examples/tcp/ssl) | A simple echo client with SSL connection.
TCP | [TCP Server Side Event Stream](src/main/java/io/reactivex/netty/examples/tcp/event) | TCP server side event stream example, with configurable client side processing delay.
TCP | [Interval](src/main/java/io/reactivex/netty/examples/tcp/interval) | A bit more sophisticated event stream example, with explicit subscribe/unsubscribe control mechanism.
TCP | [CPU intensive work](src/main/java/io/reactivex/netty/examples/tcp/cpuintensive) | A simple example for cpu intensive connection handling.
UDP | [Hello World](src/main/java/io/reactivex/netty/examples/udp) | UDP version of a simple request - reply client/server implementation.

Build
Expand Down
36 changes: 36 additions & 0 deletions rx-netty-examples/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@





sourceCompatibility = JavaVersion.VERSION_1_6
targetCompatibility = JavaVersion.VERSION_1_6

Expand Down Expand Up @@ -64,6 +66,40 @@ task runPlainTextClient (dependsOn: [classes], type: JavaExec) {
classpath = sourceSets.main.runtimeClasspath
}

task runCpuIntensiveHttpServer (dependsOn: [classes], type: JavaExec) {
group = "Examples"
description = "Run an HTTP server which does CPU intensive work."

main = "io.reactivex.netty.examples.http.plaintext.PlainTextServer"
classpath = sourceSets.main.runtimeClasspath
}

task runCpuIntensiveHttpClient (dependsOn: [classes], type: JavaExec) {
group = "Examples"
description = "Run an HTTP client for the CPU intensive server."

args "8790"
main = "io.reactivex.netty.examples.http.helloworld.HelloWorldClient"
classpath = sourceSets.main.runtimeClasspath
}

task runCpuIntensiveTcpServer (dependsOn: [classes], type: JavaExec) {
group = "Examples"
description = "Run a TCP server which does CPU intensive work."

main = "io.reactivex.netty.examples.http.plaintext.PlainTextServer"
classpath = sourceSets.main.runtimeClasspath
}

task runCpuIntensiveTcpClient (dependsOn: [classes], type: JavaExec) {
group = "Examples"
description = "Run a TCP client for the CPU intensive server."

args "8790"
main = "io.reactivex.netty.examples.http.helloworld.HelloWorldClient"
classpath = sourceSets.main.runtimeClasspath
}

task runSslHelloWorldClient (dependsOn: [classes], type: JavaExec) {
group = "Examples"
description = "Run SSL Hello World client"
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,74 @@
/*
* Copyright 2014 Netflix, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package io.reactivex.netty.examples.http.cpuintensive;

import io.netty.buffer.ByteBuf;
import io.reactivex.netty.RxNetty;
import io.reactivex.netty.pipeline.PipelineConfigurators;
import io.reactivex.netty.protocol.http.server.HttpServer;
import io.reactivex.netty.protocol.http.server.HttpServerRequest;
import io.reactivex.netty.protocol.http.server.HttpServerResponse;
import io.reactivex.netty.protocol.http.server.RequestHandler;
import rx.Observable;

import java.util.Map;

/**
* @author Nitesh Kant
*/
public final class CPUIntensiveServer {

static final int DEFAULT_PORT = 8790;
public static final String IN_EVENT_LOOP_HEADER_NAME = "in_event_loop";

private final int port;

public CPUIntensiveServer(int port) {
this.port = port;
}

public HttpServer<ByteBuf, ByteBuf> createServer() {
HttpServer<ByteBuf, ByteBuf> server =
RxNetty.newHttpServerBuilder(port, new RequestHandler<ByteBuf, ByteBuf>() {
@Override
public Observable<Void> handle(HttpServerRequest<ByteBuf> request,
final HttpServerResponse<ByteBuf> response) {
printRequestHeader(request);
response.getHeaders().set(IN_EVENT_LOOP_HEADER_NAME,
response.getChannelHandlerContext().channel().eventLoop()
.inEventLoop());
response.writeString("Welcome!!");
return response.close(false);
}
}).pipelineConfigurator(PipelineConfigurators.<ByteBuf, ByteBuf>httpServerConfigurator())
.withRequestProcessingThreads(50) /*Uses a thread pool of 50 threads to process the requests.*/
.build();
return server;
}

public void printRequestHeader(HttpServerRequest<ByteBuf> request) {
System.out.println("New request received");
System.out.println(request.getHttpMethod() + " " + request.getUri() + ' ' + request.getHttpVersion());
for (Map.Entry<String, String> header : request.getHeaders().entries()) {
System.out.println(header.getKey() + ": " + header.getValue());
}
}

public static void main(final String[] args) {
new CPUIntensiveServer(DEFAULT_PORT).createServer().startAndWait();
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
Overview
========

An example of how to write a server which does some CPU intensive or Blocking work and hence is not suitable for running
the request processing in the channel's event loop.
This is achieved by using netty's [`EventExecutorGroup`](https://github.com/netty/netty/blob/master/common/src/main/java/io/netty/util/concurrent/EventExecutorGroup.java)
as a threadpool.
`RxNetty` makes sure that the [`RequestHandler`](https://github.com/Netflix/RxNetty/blob/master/rx-netty/src/main/java/io/reactivex/netty/protocol/http/server/RequestHandler.java)
as well as the subscribers of `HttpServerRequest`'s content happens on this executor.

Running
=======

To run the example execute:

```
$ cd RxNetty/rx-netty-examples
$ ../gradlew runCpuIntensiveHttpServer
```

and in another console:

```
$ cd RxNetty/rx-netty-examples
$ ../gradlew runCpuIntensiveHttpClient
```
Original file line number Diff line number Diff line change
@@ -0,0 +1,76 @@
/*
* Copyright 2014 Netflix, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package io.reactivex.netty.examples.tcp.cpuintensive;

import io.netty.util.concurrent.DefaultEventExecutorGroup;
import io.reactivex.netty.RxNetty;
import io.reactivex.netty.channel.ConnectionHandler;
import io.reactivex.netty.channel.ObservableConnection;
import io.reactivex.netty.channel.RxDefaultThreadFactory;
import io.reactivex.netty.pipeline.PipelineConfigurators;
import io.reactivex.netty.server.RxServer;
import rx.Observable;
import rx.functions.Func1;

/**
* @author Nitesh Kant
*/
public final class CPUIntensiveServer {

static final int DEFAULT_PORT = 8791;

private final int port;

public CPUIntensiveServer(int port) {
this.port = port;
}

public RxServer<String, String> createServer() {
RxServer<String, String> server =
RxNetty.newTcpServerBuilder(port, new ConnectionHandler<String, String>() {
@Override
public Observable<Void> handle(
final ObservableConnection<String, String> connection) {
System.out.println("New client connection established.");
connection.writeAndFlush("Welcome! \n\n");
return connection.getInput()
.flatMap(new Func1<String, Observable<Void>>() {
@Override
public Observable<Void> call(String msg) {
System.out.println("onNext: " + msg);
msg = msg.trim();
if (!msg.isEmpty()) {
return connection.writeAndFlush(
"echo => " + msg + '\n');
} else {
return Observable.empty();
}
}
});
}
})
.pipelineConfigurator(PipelineConfigurators.textOnlyConfigurator())
.withEventExecutorGroup(new DefaultEventExecutorGroup(50, new RxDefaultThreadFactory(
"rx-connection-processor"))) /*Uses 50 threads to process connections.*/
.build();
return server;
}

public static void main(final String[] args) {
new CPUIntensiveServer(DEFAULT_PORT).createServer().startAndWait();
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
Overview
========

An example of how to write a server which does some CPU intensive or Blocking work and hence is not suitable for running
the connection processing in the channel's event loop.
This is achieved by using netty's [`EventExecutorGroup`](https://github.com/netty/netty/blob/master/common/src/main/java/io/netty/util/concurrent/EventExecutorGroup.java)
as a threadpool.
`RxNetty` makes sure that the [`ConnectionHandler`](https://github.com/Netflix/RxNetty/blob/master/rx-netty/src/main/java/io/reactivex/netty/channel/ConnectionHandler.java)
as well as the subscribers of `ObservableConnection`'s content happens on this executor.

Running
=======

To run the example execute:

```
$ cd RxNetty/rx-netty-examples
$ ../gradlew runCpuIntensiveTcpServer
```

and in another console:

```
$ cd RxNetty/rx-netty-examples
$ ../gradlew runCpuIntensiveTcpClient
```
Original file line number Diff line number Diff line change
Expand Up @@ -102,6 +102,10 @@ public void call() {
}

public static void main(String[] args) {
new TcpEchoClient(DEFAULT_PORT).sendEchos();
int port = DEFAULT_PORT;
if (args.length > 0) {
port = Integer.parseInt(args[0]);
}
new TcpEchoClient(port).sendEchos();
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,55 @@
/*
* Copyright 2014 Netflix, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package io.reactivex.netty.examples.http.cpuintensive;

import io.netty.buffer.ByteBuf;
import io.netty.handler.codec.http.HttpResponseStatus;
import io.reactivex.netty.examples.ExamplesEnvironment;
import io.reactivex.netty.examples.http.helloworld.HelloWorldClient;
import io.reactivex.netty.protocol.http.server.HttpServer;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;

import static io.reactivex.netty.examples.http.cpuintensive.CPUIntensiveServer.DEFAULT_PORT;

/**
* @author Tomasz Bak
*/
public class CpuIntensiveServerTest extends ExamplesEnvironment {

private HttpServer<ByteBuf, ByteBuf> server;

@Before
public void setupHttpHelloServer() {
server = new CPUIntensiveServer(DEFAULT_PORT).createServer();
server.start();
}

@After
public void stopServer() throws InterruptedException {
server.shutdown();
}

@Test
public void testRequestReplySequence() {
HelloWorldClient client = new HelloWorldClient(DEFAULT_PORT); // The client is no different than hello world.
HttpResponseStatus statusCode = client.sendHelloRequest();
Assert.assertEquals(HttpResponseStatus.OK, statusCode);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,55 @@
/*
* Copyright 2014 Netflix, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package io.reactivex.netty.examples.tcp.cpuintensive;

import io.reactivex.netty.examples.ExamplesEnvironment;
import io.reactivex.netty.examples.tcp.echo.TcpEchoClient;
import io.reactivex.netty.server.RxServer;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;

import java.util.List;

import static io.reactivex.netty.examples.tcp.cpuintensive.CPUIntensiveServer.DEFAULT_PORT;

/**
* @author Tomasz Bak
*/
public class CPUIntensiveServerTest extends ExamplesEnvironment {

private RxServer<String, String> server;

@Before
public void setupServer() {
server = new CPUIntensiveServer(DEFAULT_PORT).createServer();
server.start();
}

@After
public void stopServer() throws Exception {
server.shutdown();
}

@Test
public void testRequestReplySequence() {
TcpEchoClient client = new TcpEchoClient(DEFAULT_PORT);
List<String> reply = client.sendEchos();
Assert.assertEquals(10, reply.size());
}
}
Loading

0 comments on commit 1f012bc

Please sign in to comment.