Skip to content

Commit

Permalink
Fixes #230 (FlatResponseOperator does not emit any item if there is n…
Browse files Browse the repository at this point in the history
…o content.)

Also, fixed the test failure for previous PR
  • Loading branch information
Nitesh Kant committed Sep 7, 2014
1 parent 447f3f3 commit 24be8a7
Show file tree
Hide file tree
Showing 4 changed files with 133 additions and 43 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -28,9 +28,11 @@
import io.reactivex.netty.contexts.RxContexts;
import io.reactivex.netty.contexts.TestContext;
import io.reactivex.netty.contexts.TestContextSerializer;
import io.reactivex.netty.protocol.http.client.FlatResponseOperator;
import io.reactivex.netty.protocol.http.client.HttpClient;
import io.reactivex.netty.protocol.http.client.HttpClientRequest;
import io.reactivex.netty.protocol.http.client.HttpClientResponse;
import io.reactivex.netty.protocol.http.client.ResponseHolder;
import io.reactivex.netty.protocol.http.server.HttpServer;
import io.reactivex.netty.protocol.http.server.HttpServerBuilder;
import io.reactivex.netty.protocol.http.server.HttpServerRequest;
Expand All @@ -41,18 +43,15 @@
import org.junit.Before;
import org.junit.Test;
import rx.Observable;
import rx.functions.Action0;
import rx.functions.Action1;
import rx.functions.Func1;

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

import static io.reactivex.netty.contexts.ThreadLocalRequestCorrelator.getCurrentContextContainer;
import static io.reactivex.netty.contexts.ThreadLocalRequestCorrelator.getCurrentRequestId;
Expand Down Expand Up @@ -103,7 +102,7 @@ public String getContextValue(String key) {
return Observable.error(e);
}
}
}).enableWireLogging(LogLevel.DEBUG).build();
}).enableWireLogging(LogLevel.ERROR).build();
mockServer.start();
}

Expand All @@ -121,10 +120,10 @@ public void testEndToEnd() throws Exception {
public Observable<HttpClientResponse<ByteBuf>> call(HttpClient<ByteBuf, ByteBuf> client) {
return client.submit(HttpClientRequest.createGet("/"));
}
}).enableWireLogging(LogLevel.ERROR).build().start();
}).enableWireLogging(LogLevel.DEBUG).build().start();

HttpClient<ByteBuf, ByteBuf> testClient = RxNetty.<ByteBuf, ByteBuf>newHttpClientBuilder("localhost", server.getServerPort())
.enableWireLogging(LogLevel.DEBUG).build();
.enableWireLogging(LogLevel.ERROR).build();

String reqId = "testE2E";
sendTestRequest(testClient, reqId);
Expand Down Expand Up @@ -184,7 +183,8 @@ public void testWithPooledConnections() throws Exception {
RxContexts.<ByteBuf, ByteBuf>newHttpClientBuilder("localhost", mockServer.getServerPort(),
REQUEST_ID_HEADER_NAME,
RxContexts.DEFAULT_CORRELATOR)
.withMaxConnections(1).withIdleConnectionsTimeoutMillis(100000).build();
.withMaxConnections(1).enableWireLogging(LogLevel.ERROR)
.withIdleConnectionsTimeoutMillis(100000).build();
ContextsContainer container = new ContextsContainerImpl(new MapBackedKeySupplier());
container.addContext(CTX_1_NAME, CTX_1_VAL);
container.addContext(CTX_2_NAME, CTX_2_VAL, new TestContextSerializer());
Expand All @@ -203,7 +203,8 @@ public void testNoStateLeakOnThreadReuse() throws Exception {
RxContexts.<ByteBuf, ByteBuf>newHttpClientBuilder("localhost", mockServer.getServerPort(),
REQUEST_ID_HEADER_NAME,
RxContexts.DEFAULT_CORRELATOR)
.withMaxConnections(1).withIdleConnectionsTimeoutMillis(100000).build();
.withMaxConnections(1).enableWireLogging(LogLevel.ERROR)
.withIdleConnectionsTimeoutMillis(100000).build();

ContextsContainer container = new ContextsContainerImpl(new MapBackedKeySupplier());
container.addContext(CTX_1_NAME, CTX_1_VAL);
Expand Down Expand Up @@ -259,7 +260,7 @@ public Observable<Void> call(HttpClientResponse<ByteBuf> response) {

private static void invokeMockServer(HttpClient<ByteBuf, ByteBuf> testClient, final String requestId,
boolean finishServerProcessing)
throws MockBackendRequestFailedException, InterruptedException {
throws MockBackendRequestFailedException, InterruptedException, TimeoutException, ExecutionException {
try {
sendTestRequest(testClient, requestId);
} finally {
Expand All @@ -277,40 +278,24 @@ private static void invokeMockServer(HttpClient<ByteBuf, ByteBuf> testClient, fi
}

private static void sendTestRequest(HttpClient<ByteBuf, ByteBuf> testClient, final String requestId)
throws MockBackendRequestFailedException, InterruptedException {
throws MockBackendRequestFailedException, InterruptedException, TimeoutException, ExecutionException {
System.err.println("Sending test request to mock server, with request id: " + requestId);
RxContexts.DEFAULT_CORRELATOR.dumpThreadState(System.err);
final CountDownLatch finishLatch = new CountDownLatch(1);
final List<HttpClientResponse<ByteBuf>> responseHolder = new ArrayList<HttpClientResponse<ByteBuf>>();
testClient.submit(HttpClientRequest.createGet("").withHeader(REQUEST_ID_HEADER_NAME, requestId))
.finallyDo(new Action0() {
@Override
public void call() {
finishLatch.countDown();
}
})
.subscribe(new Action1<HttpClientResponse<ByteBuf>>() {
@Override
public void call(HttpClientResponse<ByteBuf> response) {
responseHolder.add(response);
}
});

finishLatch.await(1, TimeUnit.MINUTES);
if (responseHolder.isEmpty()) {
throw new AssertionError("Response not received.");
}

System.err.println("Received response from mock server, with request id: " + requestId
+ ", status: " + responseHolder.get(0).getStatus());
ResponseHolder<ByteBuf> responseHolder =
testClient.submit(HttpClientRequest.createGet("").withHeader(REQUEST_ID_HEADER_NAME, requestId))
.lift(FlatResponseOperator.<ByteBuf>flatResponse())
.toBlocking().toFuture().get(1, TimeUnit.MINUTES);

HttpClientResponse<ByteBuf> response = responseHolder.get(0);
System.err.println("Received response from mock server, with request id: " + requestId
+ ", status: " + responseHolder.getResponse().getStatus());

if (response.getStatus().code() != HttpResponseStatus.OK.code()) {
throw new MockBackendRequestFailedException("Test request failed. Status: " + response.getStatus().code());
if (responseHolder.getResponse().getStatus().code() != HttpResponseStatus.OK.code()) {
throw new MockBackendRequestFailedException("Test request failed. Status: "
+ responseHolder.getResponse().getStatus().code());
}

String requestIdGot = response.getHeaders().get(REQUEST_ID_HEADER_NAME);
String requestIdGot = responseHolder.getResponse().getHeaders().get(REQUEST_ID_HEADER_NAME);

if (!requestId.equals(requestId)) {
throw new MockBackendRequestFailedException("Request Id not sent from mock server. Expected: "
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@

import rx.Observable;
import rx.Subscriber;
import rx.functions.Func1;

/**
* An operator to be used for a source of {@link HttpClientResponse} containing aggregated responses i.e. which does not
Expand Down Expand Up @@ -52,12 +51,35 @@ public void onError(Throwable e) {
public void onNext(final HttpClientResponse<T> response) {
response.getContent()
.take(1)
.map(new Func1<T, ResponseHolder<T>>() {
.lift(new Observable.Operator<ResponseHolder<T>, T>() {
@Override
public ResponseHolder<T> call(T t) {
return new ResponseHolder<T>(response, t);
public Subscriber<? super T> call(final Subscriber<? super ResponseHolder<T>> child) {
return new Subscriber<T>() {

private boolean hasContent;

@Override
public void onCompleted() {
if (!hasContent) {
child.onNext(new ResponseHolder<T>(response));
}
child.onCompleted();
}

@Override
public void onError(Throwable e) {
child.onError(e);
}

@Override
public void onNext(T next) {
hasContent = true;
child.onNext(new ResponseHolder<T>(response, next));
}
};
}
}).subscribe(child);
})
.subscribe(child);
}
};
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,11 +29,26 @@ public ResponseHolder(HttpClientResponse<T> response, T content) {
this.content = content;
}

public ResponseHolder(HttpClientResponse<T> response) {
this.response = response;
content = null;
}

public HttpClientResponse<T> getResponse() {
return response;
}

/**
* Returns the content, if any. Use {@link #hasContent()} to check if there is a content in this holder.
*
* @return The content, if any, {@code null} otherwise.
* Use {@link #hasContent()} to check if there is a content in this holder.
*/
public T getContent() {
return content;
}

public boolean hasContent() {
return null != content;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,68 @@
/*
* Copyright 2014 Netflix, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.reactivex.netty.protocol.http.client;

import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.handler.codec.http.DefaultHttpResponse;
import io.netty.handler.codec.http.HttpResponseStatus;
import io.netty.handler.codec.http.HttpVersion;
import io.reactivex.netty.protocol.http.UnicastContentSubject;
import org.junit.Assert;
import org.junit.Test;
import rx.Observable;

import java.util.concurrent.TimeUnit;

/**
* @author Nitesh Kant
*/
public class FlatResponseOperatorTest {

@Test
public void testContent() throws Exception {
DefaultHttpResponse nettyResponse = new DefaultHttpResponse(HttpVersion.HTTP_1_1,
HttpResponseStatus.NO_CONTENT);
UnicastContentSubject<ByteBuf> contentSubject = UnicastContentSubject.createWithoutNoSubscriptionTimeout();
contentSubject.onNext(Unpooled.buffer());
contentSubject.onCompleted();

ResponseHolder<ByteBuf> holder = Observable.just(new HttpClientResponse<ByteBuf>(nettyResponse, contentSubject))
.lift(FlatResponseOperator.<ByteBuf>flatResponse())
.toBlocking().toFuture().get(1, TimeUnit.MINUTES);

Assert.assertEquals("Unexpected http response status", HttpResponseStatus.NO_CONTENT,
holder.getResponse().getStatus());
Assert.assertTrue("Response holder does not have content.", holder.hasContent());

}

@Test
public void testNoContent() throws Exception {
DefaultHttpResponse nettyResponse = new DefaultHttpResponse(HttpVersion.HTTP_1_1,
HttpResponseStatus.NO_CONTENT);
UnicastContentSubject<ByteBuf> contentSubject = UnicastContentSubject.createWithoutNoSubscriptionTimeout();
contentSubject.onCompleted();

ResponseHolder<ByteBuf> holder = Observable.just(new HttpClientResponse<ByteBuf>(nettyResponse, contentSubject))
.lift(FlatResponseOperator.<ByteBuf>flatResponse())
.toBlocking().toFuture().get(1, TimeUnit.MINUTES);
Assert.assertEquals("Unexpected http response status", HttpResponseStatus.NO_CONTENT,
holder.getResponse().getStatus());
Assert.assertFalse("Response holder has content.", holder.hasContent());

}
}

0 comments on commit 24be8a7

Please sign in to comment.