Skip to content

Commit

Permalink
Fixes issue ReactiveX#164
Browse files Browse the repository at this point in the history
Removed the use of flatMap() with a custom operator.
  • Loading branch information
Nitesh Kant committed Jul 2, 2014
1 parent 4011533 commit 7400593
Showing 1 changed file with 60 additions and 47 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package io.reactivex.netty.protocol.http.server;

import io.netty.handler.codec.http.HttpHeaders;
Expand All @@ -23,8 +24,7 @@
import io.reactivex.netty.metrics.MetricEventsSubject;
import rx.Observable;
import rx.Observer;
import rx.functions.Action0;
import rx.functions.Func1;
import rx.Subscriber;

/**
* @author Nitesh Kant
Expand Down Expand Up @@ -57,84 +57,97 @@ void useMetricEventsSubject(MetricEventsSubject<?> eventsSubject) {
@Override
public Observable<Void> handle(final ObservableConnection<HttpServerRequest<I>, HttpServerResponse<O>> newConnection) {

return newConnection.getInput().flatMap(new Func1<HttpServerRequest<I>, Observable<Void>>() {
return newConnection.getInput().lift(new Observable.Operator<Void, HttpServerRequest<I>>() {
@Override
@SuppressWarnings("unchecked")
public Observable<Void> call(HttpServerRequest<I> newRequest) {
final long startTimeMillis = Clock.newStartTimeMillis();
eventsSubject.onEvent(HttpServerMetricsEvent.NEW_REQUEST_RECEIVED);
newRequest.getContent().subscribe(new Observer<I>() {
// There is no guarantee that the RequestHandler will subscribe to the content, but we want this
// metric anyways, so we subscribe to the content here.
public Subscriber<? super HttpServerRequest<I>> call(final Subscriber<? super Void> child) {
return new Subscriber<HttpServerRequest<I>>() {
@Override
public void onCompleted() {
eventsSubject.onEvent(HttpServerMetricsEvent.REQUEST_RECEIVE_COMPLETE,
Clock.onEndMillis(startTimeMillis));
child.onCompleted();
}

@Override
public void onError(Throwable e) {
child.onError(e);
}

@SuppressWarnings("unchecked")
@Override
public void onNext(I i) {
}
});
public void onNext(HttpServerRequest<I> newRequest) {
final long startTimeMillis = Clock.newStartTimeMillis();
eventsSubject.onEvent(HttpServerMetricsEvent.NEW_REQUEST_RECEIVED);
newRequest.getContent().subscribe(new Observer<I>() {
// There is no guarantee that the RequestHandler will subscribe to the content, but we want this
// metric anyways, so we subscribe to the content here.
@Override
public void onCompleted() {
eventsSubject.onEvent(HttpServerMetricsEvent.REQUEST_RECEIVE_COMPLETE,
Clock.onEndMillis(startTimeMillis));
}

final HttpServerResponse<O> response = new HttpServerResponse<O>(
newConnection.getChannelHandlerContext(),
@Override
public void onError(Throwable e) {
}

@Override
public void onNext(I i) {
}
});

final HttpServerResponse<O> response = new HttpServerResponse<O>(
newConnection.getChannelHandlerContext(),
/*
* Server should send the highest version it is compatible with.
* http://tools.ietf.org/html/rfc2145#section-2.3
*
* unless overriden explicitly.
*/
send10ResponseFor10Request ? newRequest.getHttpVersion() : HttpVersion.HTTP_1_1, eventsSubject);
if (newRequest.getHeaders().isKeepAlive()) {
// Add keep alive header as per:
// - http://www.w3.org/Protocols/HTTP/1.1/draft-ietf-http-v11-spec-01.html#Connection
response.getHeaders().set(HttpHeaders.Names.CONNECTION, HttpHeaders.Values.KEEP_ALIVE);
} else {
response.getHeaders().set(HttpHeaders.Names.CONNECTION, HttpHeaders.Values.CLOSE);
}
Observable<Void> toReturn;

try {
eventsSubject.onEvent(HttpServerMetricsEvent.REQUEST_HANDLING_START,
Clock.onEndMillis(startTimeMillis));
toReturn = requestHandler.handle(newRequest, response);
if (null == toReturn) {
toReturn = Observable.empty();
}
} catch (Throwable throwable) {
toReturn = Observable.error(throwable);
}
if (newRequest.getHeaders().isKeepAlive()) {
// Add keep alive header as per:
// - http://www.w3.org/Protocols/HTTP/1.1/draft-ietf-http-v11-spec-01.html#Connection
response.getHeaders().set(HttpHeaders.Names.CONNECTION, HttpHeaders.Values.KEEP_ALIVE);
} else {
response.getHeaders().set(HttpHeaders.Names.CONNECTION, HttpHeaders.Values.CLOSE);
}
Observable<Void> requestHandlingResult;

return toReturn
.doOnCompleted(new Action0() {
try {
eventsSubject.onEvent(HttpServerMetricsEvent.REQUEST_HANDLING_START,
Clock.onEndMillis(startTimeMillis));
requestHandlingResult = requestHandler.handle(newRequest, response);
if (null == requestHandlingResult) {
requestHandlingResult = Observable.empty();
}
} catch (Throwable throwable) {
requestHandlingResult = Observable.error(throwable);
}

requestHandlingResult.subscribe(new Observer<Void>() {
@Override
public void call() {
public void onCompleted() {
eventsSubject.onEvent(HttpServerMetricsEvent.REQUEST_HANDLING_SUCCESS,
Clock.onEndMillis(startTimeMillis));
response.close();
}
})
.onErrorResumeNext(new Func1<Throwable, Observable<Void>>() {

@Override
public Observable<Void> call(Throwable throwable) {
public void onError(Throwable throwable) {
eventsSubject.onEvent(HttpServerMetricsEvent.REQUEST_HANDLING_FAILED,
Clock.onEndMillis(startTimeMillis), throwable);
if (!response.isHeaderWritten()) {
responseGenerator.updateResponse(response, throwable);
}
return Observable.empty();
response.close();
}
})
.finallyDo(new Action0() {

@Override
public void call() {
response.close();
public void onNext(Void aVoid) {
// Not significant.
}
});
}
};
}
});
}
Expand Down

0 comments on commit 7400593

Please sign in to comment.