Skip to content

Commit

Permalink
Fixes #252 (Upgrade to RxJava 1.0.0-RC7)
Browse files Browse the repository at this point in the history
  • Loading branch information
Nitesh Kant committed Oct 23, 2014
1 parent a0dea00 commit c8f26d3
Show file tree
Hide file tree
Showing 6 changed files with 22 additions and 32 deletions.
2 changes: 1 addition & 1 deletion gradle.properties
Original file line number Diff line number Diff line change
Expand Up @@ -17,4 +17,4 @@
version=0.3.15
netty_version=4.0.21.Final
slf4j_version=1.7.6
rxjava_version=0.19.1
rxjava_version=1.0.0-rc.7
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,6 @@
import io.reactivex.netty.protocol.http.server.RequestHandler;
import io.reactivex.netty.protocol.text.sse.ServerSentEvent;
import rx.Observable;
import rx.exceptions.OnErrorThrowable;
import rx.functions.Func1;

import java.util.ArrayList;
Expand Down Expand Up @@ -66,9 +65,9 @@ public Observable<Void> call(ServerSentEvent sse) {
ServerSentEvent data = new ServerSentEvent(sse.getEventId(), "data", sse.getEventData());
return response.writeAndFlush(data);
}
}).onErrorFlatMap(new Func1<OnErrorThrowable, Observable<Void>>() {
}).onErrorResumeNext(new Func1<Throwable, Observable<Void>>() {
@Override
public Observable<Void> call(OnErrorThrowable onErrorThrowable) {
public Observable<Void> call(Throwable throwable) {
response.setStatus(HttpResponseStatus.INTERNAL_SERVER_ERROR);
return response.close();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,17 +19,15 @@
import io.netty.handler.codec.http.HttpResponseStatus;
import io.reactivex.netty.RxNetty;
import io.reactivex.netty.pipeline.ssl.DefaultFactories;
import io.reactivex.netty.protocol.http.client.FlatResponseOperator;
import io.reactivex.netty.protocol.http.client.HttpClient;
import io.reactivex.netty.protocol.http.client.HttpClientRequest;
import io.reactivex.netty.protocol.http.client.HttpClientResponse;
import rx.Observable;
import rx.functions.Action0;
import io.reactivex.netty.protocol.http.client.ResponseHolder;
import rx.functions.Func1;
import rx.functions.Func2;

import java.nio.charset.Charset;

import static io.reactivex.netty.examples.http.ssl.SslHelloWorldServer.*;
import static io.reactivex.netty.examples.http.ssl.SslHelloWorldServer.DEFAULT_PORT;

/**
* @author Tomasz Bak
Expand All @@ -48,25 +46,17 @@ public HttpResponseStatus sendHelloRequest() throws Exception {
.build();

HttpResponseStatus statusCode = rxClient.submit(HttpClientRequest.createGet("/hello"))
.mergeMap(new Func1<HttpClientResponse<ByteBuf>, Observable<ByteBuf>>() {
@Override
public Observable<ByteBuf> call(HttpClientResponse<ByteBuf> response) {
return response.getContent();
}
}, new Func2<HttpClientResponse<ByteBuf>, ByteBuf, HttpResponseStatus>() {
@Override
public HttpResponseStatus call(HttpClientResponse<ByteBuf> response, ByteBuf content) {
System.out.println(content.toString(Charset.defaultCharset()));
return response.getStatus();
}
})
.doOnTerminate(new Action0() {
@Override
public void call() {
System.out.println("=======================");
}
}).toBlocking().last();

.lift(FlatResponseOperator.<ByteBuf>flatResponse())
.map(new Func1<ResponseHolder<ByteBuf>, ResponseHolder<ByteBuf>>() {
@Override
public ResponseHolder<ByteBuf> call(ResponseHolder<ByteBuf> holder) {
System.out.println(holder.getContent().toString(
Charset.defaultCharset()));
System.out.println("=======================");
return holder;
}
})
.toBlocking().single().getResponse().getStatus();
return statusCode;
}

Expand Down
2 changes: 1 addition & 1 deletion rx-netty-remote-observable/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ targetCompatibility = JavaVersion.VERSION_1_6
dependencies {
compile project(':rx-netty')
compile 'com.netflix.numerus:numerus:1.1'
compile "com.netflix.rxjava:rxjava-math:${rxjava_version}"
compile "io.reactivex:rxjava-math:0.21.0"
compile "org.slf4j:slf4j-log4j12:${slf4j_version}"
testCompile 'junit:junit:4.10'
}
Expand Down
2 changes: 1 addition & 1 deletion rx-netty/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ tasks.withType(Javadoc).each {

dependencies {
compile "io.netty:netty-codec-http:${netty_version}"
compile "com.netflix.rxjava:rxjava-core:${rxjava_version}"
compile "io.reactivex:rxjava:${rxjava_version}"

testCompile 'junit:junit:4.10'
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -86,12 +86,13 @@ public void testProcessingInADifferentThread() throws Exception {
HttpServer<ByteBuf, ByteBuf> server = RxNetty.newHttpServerBuilder(0, new RequestHandler<ByteBuf, ByteBuf>() {
@Override
public Observable<Void> handle(HttpServerRequest<ByteBuf> request, final HttpServerResponse<ByteBuf> serverResponse) {
return Observable.just(1L, Schedulers.computation())
return Observable.just(1L).subscribeOn(Schedulers.computation())
.flatMap(new Func1<Long, Observable<Void>>() {
@Override
public Observable<Void> call(Long aLong) {
serverResponse.setStatus(HttpResponseStatus.NOT_FOUND);
return serverResponse.close(true); // Processing in a separate thread needs a flush.
return serverResponse.close(
true); // Processing in a separate thread needs a flush.
}
});
}
Expand Down

0 comments on commit c8f26d3

Please sign in to comment.