Skip to content

Commit

Permalink
Merge pull request #140 from NiteshKant/master
Browse files Browse the repository at this point in the history
Fixes issue #139
  • Loading branch information
NiteshKant committed Jun 13, 2014
2 parents 56588db + 10a2aec commit 52f2378
Show file tree
Hide file tree
Showing 4 changed files with 38 additions and 23 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -27,11 +27,16 @@
import rx.Observable;
import rx.functions.Func1;

import java.util.concurrent.atomic.AtomicBoolean;

/**
* @author Nitesh Kant
*/
public class DefaultChannelWriter<O> implements ChannelWriter<O> {

protected static final Observable<Void> CONNECTION_ALREADY_CLOSED =
Observable.error(new IllegalStateException("Connection is already closed."));
protected final AtomicBoolean closeIssued = new AtomicBoolean();
private final ChannelHandlerContext ctx;
private final MultipleFutureListener unflushedWritesListener;

Expand Down Expand Up @@ -123,4 +128,20 @@ protected ChannelFuture writeOnChannel(Object msg) {
protected Channel getChannel() {
return ctx.channel();
}

public boolean isCloseIssued() {
return closeIssued.get();
}

public Observable<Void> close() {
if (closeIssued.compareAndSet(false, true)) {
return _close();
} else {
return CONNECTION_ALREADY_CLOSED;
}
}

protected Observable<Void> _close() {
return Observable.empty();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -23,8 +23,6 @@
import rx.Subscriber;
import rx.subjects.PublishSubject;

import java.util.concurrent.atomic.AtomicBoolean;

/**
* An observable connection for connection oriented protocols.
*
Expand All @@ -33,10 +31,7 @@
*/
public class ObservableConnection<I, O> extends DefaultChannelWriter<O> {

protected static final Observable<Void> CONNECTION_ALREADY_CLOSED =
Observable.error(new IllegalStateException("Connection is already closed."));
private PublishSubject<I> inputSubject;
protected final AtomicBoolean closeIssued = new AtomicBoolean();

public ObservableConnection(final ChannelHandlerContext ctx) {
super(ctx);
Expand All @@ -48,29 +43,28 @@ public Observable<I> getInput() {
return inputSubject;
}

public boolean isCloseIssued() {
return closeIssued.get();
}

/**
* Closes this connection. This method is idempotent, so it can be called multiple times without any side-effect on
* the channel. <br/>
* This will also cancel any pending writes on the underlying channel. <br/>
*
* @return Observable signifying the close on the connection. Returns {@link Observable#error(Throwable)} if the
* @return Observable signifying the close on the connection. Returns {@link rx.Observable#error(Throwable)} if the
* close is already issued (may not be completed)
*/
@Override
public Observable<Void> close() {
if (closeIssued.compareAndSet(false, true)) {
PublishSubject<I> thisSubject = inputSubject;
cleanupConnection();
Observable<Void> toReturn = _closeChannel();
thisSubject.onCompleted(); // This is just to make sure we make the subject as completed after we finish
// closing the channel, results in more deterministic behavior for clients.
return toReturn;
} else {
return CONNECTION_ALREADY_CLOSED;
}
return super.close();
}

@Override
protected Observable<Void> _close() {
PublishSubject<I> thisSubject = inputSubject;
cleanupConnection();
Observable<Void> toReturn = _closeChannel();
thisSubject.onCompleted(); // This is just to make sure we make the subject as completed after we finish
// closing the channel, results in more deterministic behavior for clients.
return toReturn;
}

protected void cleanupConnection() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,8 @@ public HttpResponseStatus getStatus() {
return nettyResponse.getStatus();
}

public Observable<Void> close() {
@Override
public Observable<Void> _close() {

writeHeadersIfNotWritten();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,6 @@
import rx.Observable;
import rx.functions.Func1;

import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;

import static org.junit.Assert.assertTrue;
Expand Down Expand Up @@ -93,14 +92,14 @@ public void testConnectionHandlerReturnsError() throws Exception {
assertTrue("Error handler not invoked.", errorHandler.invoked);
}

private static void blockTillConnected(int serverPort) throws InterruptedException, ExecutionException {
private static void blockTillConnected(int serverPort) {
RxNetty.createTcpClient("localhost", serverPort).connect().flatMap(
new Func1<ObservableConnection<ByteBuf, ByteBuf>, Observable<?>>() {
@Override
public Observable<Void> call(ObservableConnection<ByteBuf, ByteBuf> connection) {
return connection.close();
}
}).toBlocking().toFuture().get();
}).toBlocking().singleOrDefault(null);
}


Expand Down

0 comments on commit 52f2378

Please sign in to comment.