Skip to content

Commit

Permalink
Merge pull request #136 from NiteshKant/master
Browse files Browse the repository at this point in the history
Fixes issue #134
  • Loading branch information
NiteshKant committed Jun 3, 2014
2 parents 2e097df + 74e8f0d commit 56588db
Show file tree
Hide file tree
Showing 2 changed files with 62 additions and 2 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -43,16 +43,30 @@ public final class RxContexts {

public static final ThreadLocalRequestCorrelator DEFAULT_CORRELATOR = new ThreadLocalRequestCorrelator();

private static String defaultRequestIdContextKeyName = "X-RXNETTY-REQUEST-ID";

private RxContexts() {
}

public static <I, O> HttpServerBuilder<I, O> newHttpServerBuilder(int port, RequestHandler<I, O> requestHandler,
RequestCorrelator correlator) {
HttpRequestIdProvider provider = new HttpRequestIdProvider(defaultRequestIdContextKeyName, correlator);
return newHttpServerBuilder(port, requestHandler, provider, correlator);
}

public static <I, O> HttpServerBuilder<I, O> newHttpServerBuilder(int port, RequestHandler<I, O> requestHandler,
String requestIdHeaderName,
RequestCorrelator correlator) {
HttpRequestIdProvider provider = new HttpRequestIdProvider(requestIdHeaderName, correlator);
return newHttpServerBuilder(port, requestHandler, provider, correlator);
}

public static <I, O> HttpClientBuilder<I, O> newHttpClientBuilder(String host, int port,
RequestCorrelator correlator) {
HttpRequestIdProvider provider = new HttpRequestIdProvider(defaultRequestIdContextKeyName, correlator);
return newHttpClientBuilder(host, port, provider, correlator);
}

public static <I, O> HttpClientBuilder<I, O> newHttpClientBuilder(String host, int port, String requestIdHeaderName,
RequestCorrelator correlator) {
HttpRequestIdProvider provider = new HttpRequestIdProvider(requestIdHeaderName, correlator);
Expand All @@ -71,21 +85,40 @@ public static <I, O> HttpClientBuilder<I, O> newHttpClientBuilder(String host, i
RequestIdProvider provider,
RequestCorrelator correlator) {
HttpClientBuilder<I, O> builder = RxNetty.newHttpClientBuilder(host, port);
return builder.pipelineConfigurator(ContextPipelineConfigurators.<I, O>httpClientConfigurator(provider, correlator))
return builder.pipelineConfigurator(ContextPipelineConfigurators.<I, O>httpClientConfigurator(provider,
correlator))
.withChannelFactory(new HttpContextClientChannelFactory<I, O>(builder.getBootstrap(),
correlator));
}

public static HttpServer<ByteBuf, ByteBuf> createHttpServer(int port, RequestHandler<ByteBuf, ByteBuf> requestHandler) {
return newHttpServerBuilder(port, requestHandler, defaultRequestIdContextKeyName,
DEFAULT_CORRELATOR).build();
}

public static HttpServer<ByteBuf, ByteBuf> createHttpServer(int port, RequestHandler<ByteBuf, ByteBuf> requestHandler,
String requestIdHeaderName) {
return newHttpServerBuilder(port, requestHandler, requestIdHeaderName, DEFAULT_CORRELATOR).build();
}

public static HttpClient<ByteBuf, ByteBuf> createHttpClient(String host, int port) {
return RxContexts.<ByteBuf, ByteBuf>newHttpClientBuilder(host, port, defaultRequestIdContextKeyName,
DEFAULT_CORRELATOR).build();
}

public static HttpClient<ByteBuf, ByteBuf> createHttpClient(String host, int port, String requestIdHeaderName) {
return RxContexts.<ByteBuf, ByteBuf>newHttpClientBuilder(host, port, requestIdHeaderName,
DEFAULT_CORRELATOR).build();
}

public static <I, O> HttpServer<I, O> createHttpServer(int port, RequestHandler<I, O> requestHandler,
PipelineConfigurator<HttpServerRequest<I>, HttpServerResponse<O>> configurator) {
HttpRequestIdProvider provider = new HttpRequestIdProvider(defaultRequestIdContextKeyName, DEFAULT_CORRELATOR);
return newHttpServerBuilder(port, requestHandler, defaultRequestIdContextKeyName, DEFAULT_CORRELATOR)
.pipelineConfigurator(ContextPipelineConfigurators
.httpServerConfigurator(provider, DEFAULT_CORRELATOR, configurator)).build();
}

public static <I, O> HttpServer<I, O> createHttpServer(int port, RequestHandler<I, O> requestHandler,
String requestIdHeaderName,
PipelineConfigurator<HttpServerRequest<I>, HttpServerResponse<O>> configurator) {
Expand All @@ -95,6 +128,17 @@ public static <I, O> HttpServer<I, O> createHttpServer(int port, RequestHandler<
.httpServerConfigurator(provider, DEFAULT_CORRELATOR, configurator)).build();
}

public static <I, O> HttpClient<I, O> createHttpClient(String host, int port,
PipelineConfigurator<HttpClientResponse<O>, HttpClientRequest<I>> configurator) {
HttpRequestIdProvider provider = new HttpRequestIdProvider(defaultRequestIdContextKeyName, DEFAULT_CORRELATOR);
return RxContexts.<I, O>newHttpClientBuilder(host, port, defaultRequestIdContextKeyName,
DEFAULT_CORRELATOR)
.pipelineConfigurator(ContextPipelineConfigurators.httpClientConfigurator(provider,
DEFAULT_CORRELATOR,
configurator))
.build();
}

public static <I, O> HttpClient<I, O> createHttpClient(String host, int port, String requestIdHeaderName,
PipelineConfigurator<HttpClientResponse<O>, HttpClientRequest<I>> configurator) {
HttpRequestIdProvider provider = new HttpRequestIdProvider(requestIdHeaderName, DEFAULT_CORRELATOR);
Expand All @@ -104,4 +148,15 @@ public static <I, O> HttpClient<I, O> createHttpClient(String host, int port, St
configurator))
.build();
}

/**
* Default Context key name used for extracting the request Id. This is the default and will be useful to set a
* system wide requestId key name so that it is consistent between all clients and the server created through
* this factory class.
*
* @param name The name of the context key to be used as default.
*/
public static void useRequestIdContextKey(String name) {
defaultRequestIdContextKeyName = name;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -120,6 +120,7 @@ public void tearDown() throws Exception {
clientBootstrap.group().shutdownGracefully();
}
if (null != server) {
serverConnHandler.closeNewConnectionsOnReceive(false); // reset state after test. Close New should be explicit.
try {
serverConnHandler.closeAllClientConnections();
} catch (IllegalStateException e) {
Expand Down Expand Up @@ -342,15 +343,19 @@ public void testPoolExhaustion() throws Exception {

@Test
public void testIdleCleanupThread() throws Exception {
serverConnHandler.closeNewConnectionsOnReceive(false);
pool.shutdown();
pool = new ConnectionPoolImpl<String, String>(serverInfo, PoolConfig.DEFAULT_CONFIG, strategy, Executors.newScheduledThreadPool(1),
pool = new ConnectionPoolImpl<String, String>(serverInfo, PoolConfig.DEFAULT_CONFIG, strategy,
Executors.newScheduledThreadPool(1),
new PoolStatsImpl(), factory);

stats = pool.getStats();

ObservableConnection<String, String> connection = acquireAndTestStats();
connection.close();

serverConnHandler.closeAllClientConnections();

waitForClose();

assertAllConnectionsReturned();
Expand Down

0 comments on commit 56588db

Please sign in to comment.