Skip to content

Commit

Permalink
Add internal buffering to any unconsumed data by the decoder so that …
Browse files Browse the repository at this point in the history
…decoder does not need to repeat the boilerplate code to buffer unconsumed data. Changing the JUnit tests and examples correspondingly.

Add more locks to AsyncBackupRequestExecutor to make it run consistently across implementations.
  • Loading branch information
Allen Wang committed Oct 25, 2013
1 parent a061880 commit 9bab116
Show file tree
Hide file tree
Showing 12 changed files with 226 additions and 177 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -134,7 +134,7 @@ public static interface ExecutionResult<T extends IResponse> extends Future<T> {
final AtomicBoolean responseRecevied = new AtomicBoolean();
final AtomicBoolean completedCalled = new AtomicBoolean();
final AtomicBoolean failedCalled = new AtomicBoolean();
final AtomicBoolean cancelledCalled = new AtomicBoolean();
// final AtomicBoolean cancelledCalled = new AtomicBoolean();
final Lock lock = new ReentrantLock();
final Condition responseChosen = lock.newCondition();
final Multimap<URI, Future<S>> map = ArrayListMultimap.create();
Expand All @@ -143,43 +143,65 @@ public static interface ExecutionResult<T extends IResponse> extends Future<T> {
final int sequenceNumber = i;
Future<S> future = asyncClient.execute(requests.get(i), decoder, new ResponseCallback<S, E>() {
private volatile boolean chosen = false;
private AtomicBoolean cancelledInvokedOnSameRequest = new AtomicBoolean();
@Override
public void completed(S response) {
if (completedCalled.compareAndSet(false, true)
&& callback != null && chosen) {
// System.err.println("completed called");
// Thread.dumpStack();
lock.lock();
boolean shouldInvokeCallback = false;
try {
if (chosen) {
shouldInvokeCallback = true;
completedCalled.set(true);
}
} finally {
lock.unlock();
}
if (callback != null && shouldInvokeCallback) {
callback.completed(response);
}
}

@Override
public void failed(Throwable e) {
int count = failedCount.incrementAndGet();
if ((count == numServers || chosen) && failedCalled.compareAndSet(false, true)) {
lock.lock();
try {
lock.lock();
boolean shouldInvokeCallback = false;
try {
int count = failedCount.incrementAndGet();
if (count == numServers || chosen) {
finalSequenceNumber.set(sequenceNumber);
responseChosen.signalAll();
} finally {
lock.unlock();
}
if (callback != null) {
callback.failed(e);
shouldInvokeCallback = true;
failedCalled.set(true);
}
} finally {
lock.unlock();
}
if (shouldInvokeCallback && callback != null) {
callback.failed(e);
}
}

@Override
public void cancelled() {
int count = failedCount.incrementAndGet();
if ((count == numServers || chosen) && cancelledCalled.compareAndSet(false, true)) {
// avoid getting cancelled multiple times
if (cancelledInvokedOnSameRequest.compareAndSet(false, true)) {
lock.lock();
int count = failedCount.incrementAndGet();
boolean shouldInvokeCallback = false;
try {
finalSequenceNumber.set(sequenceNumber);
responseChosen.signalAll();
if (count == numServers || chosen) {
// System.err.println("chosen:" + chosen);
// System.err.println("failed count: " + failedCount.get());
finalSequenceNumber.set(sequenceNumber);
responseChosen.signalAll();
shouldInvokeCallback = true;
}
} finally {
lock.unlock();
}
if (callback != null) {
if (shouldInvokeCallback && callback != null) {
callback.cancelled();
}
}
Expand All @@ -188,9 +210,10 @@ public void cancelled() {
@Override
public void responseReceived(S response) {
if (responseRecevied.compareAndSet(false, true)) {
chosen = true;
// System.err.println("chosen=true");
lock.lock();
try {
chosen = true;
finalSequenceNumber.set(sequenceNumber);
responseChosen.signalAll();
} finally {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@
package com.netflix.client;

import java.io.IOException;
import java.io.InputStream;
import java.net.URI;
import java.nio.ByteBuffer;
import java.util.List;
Expand Down
11 changes: 11 additions & 0 deletions ribbon-core/src/main/java/com/netflix/client/StreamDecoder.java
Original file line number Diff line number Diff line change
Expand Up @@ -29,5 +29,16 @@
* @param <S> Type of storage used for partial content. For example, {@link ByteBuffer}.
*/
public interface StreamDecoder<T, S> {
/**
* Decode from the input and create an entity. The client implementation should call this method in
* a loop until it returns null, which means no more stream entity can be created from the unconsumed input.
* If there is any unconsumed input, client implementation should buffer and use it in conjunction with
* the next available input, for example, an HTTP chunk. In other words, the decoder should not
* have to buffer unconsumed input.
*
* @param input input to read and create entity from
* @return Entity created, or null if nothing can be created from the unconsumed input
* @throws IOException
*/
T decode(S input) throws IOException;
}

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@
package com.netflix.ribbon.examples;

import java.nio.ByteBuffer;
import java.util.List;
import java.util.concurrent.Future;

import com.netflix.client.AsyncClient;
Expand All @@ -42,7 +41,7 @@ public void run() throws Exception {
HttpRequest request = HttpRequest.newBuilder().uri(SERVICE_URI + "testAsync/stream").build();
AsyncHttpClient<ByteBuffer> client = AsyncHttpClientBuilder.withApacheAsyncClient().buildClient();
try {
Future<HttpResponse> response = client.execute(request, new SSEDecoder(), new ResponseCallback<HttpResponse, List<String>>() {
Future<HttpResponse> response = client.execute(request, new SSEDecoder(), new ResponseCallback<HttpResponse, String>() {
@Override
public void completed(HttpResponse response) {
}
Expand All @@ -53,7 +52,7 @@ public void failed(Throwable e) {
}

@Override
public void contentReceived(List<String> element) {
public void contentReceived(String element) {
System.out.println("Get content from server: " + element);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,14 +18,8 @@
package com.netflix.ribbon.examples;

import java.io.IOException;
import java.io.InputStream;
import java.nio.ByteBuffer;
import java.util.List;

import org.apache.http.nio.util.ExpandableBuffer;
import org.apache.http.nio.util.HeapByteBufferAllocator;

import com.google.common.collect.Lists;
import com.netflix.client.StreamDecoder;

/**
Expand All @@ -37,66 +31,34 @@
* @author awang
*
*/
public class SSEDecoder implements StreamDecoder<List<String>, ByteBuffer> {
final ExpandableByteBuffer dataBuffer = new ExpandableByteBuffer();
public class SSEDecoder implements StreamDecoder<String, ByteBuffer> {

@Override
public List<String> decode(ByteBuffer buf) throws IOException {
List<String> result = Lists.newArrayList();
while (buf.position() < buf.limit()) {
byte b = buf.get();
public String decode(ByteBuffer input) throws IOException {
if (input == null || !input.hasRemaining()) {
return null;
}
byte[] buffer = new byte[input.limit()];
boolean foundDelimiter = false;
int index = 0;
int start = input.position();
while (input.remaining() > 0) {
byte b = input.get();
if (b == 10 || b == 13) {
if (dataBuffer.hasContent()) {
result.add(new String(dataBuffer.getBytes(), "UTF-8"));
}
dataBuffer.reset();
foundDelimiter = true;
break;
} else {
dataBuffer.addByte(b);
buffer[index++] = b;
}
}
return result;
}
}

class ExpandableByteBuffer extends ExpandableBuffer {
public ExpandableByteBuffer(int size) {
super(size, HeapByteBufferAllocator.INSTANCE);
}

public ExpandableByteBuffer() {
super(4 * 1024, HeapByteBufferAllocator.INSTANCE);
}

public void addByte(byte b) {
if (this.buffer.remaining() == 0) {
expand();
if (!foundDelimiter) {
// reset the position so that bytes read so far
// will not be lost for next chunk
input.position(start);
return null;
}
this.buffer.put(b);
}

public boolean hasContent() {
return this.buffer.position() > 0;
}

public byte[] getBytes() {
byte[] data = new byte[this.buffer.position()];
this.buffer.position(0);
this.buffer.get(data);
return data;
}

public void reset() {
clear();
}

public void consumeInputStream(InputStream content) throws IOException {
try {
int b = -1;
while ((b = content.read()) != -1) {
addByte((byte) b);
}
} finally {
content.close();
if (index == 0) {
return null;
}
}
return new String(buffer, 0, index, "UTF-8");
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@
package com.netflix.ribbon.examples;

import java.nio.ByteBuffer;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;

import rx.util.functions.Action1;
Expand All @@ -42,20 +42,23 @@ public void run() throws Exception {
HttpRequest request = HttpRequest.newBuilder().uri(SERVICE_URI + "testAsync/stream").build();
ObservableAsyncClient<HttpRequest, HttpResponse, ByteBuffer> observableClient =
AsyncHttpClientBuilder.withApacheAsyncClient().observableClient();
final AtomicReference<HttpResponse> httpResponse = new AtomicReference<HttpResponse>();
final AtomicReference<HttpResponse> httpResponse = new AtomicReference<HttpResponse>();
final AtomicInteger counter = new AtomicInteger();
try {
observableClient.stream(request, new SSEDecoder())
.toBlockingObservable()
.forEach(new Action1<StreamEvent<HttpResponse, List<String>>>() {
.forEach(new Action1<StreamEvent<HttpResponse, String>>() {
@Override
public void call(final StreamEvent<HttpResponse, List<String>> t1) {
public void call(final StreamEvent<HttpResponse, String> t1) {
System.out.println("Content from server: " + t1.getEvent());
counter.incrementAndGet();
httpResponse.set(t1.getResponse());
}
});
} finally {
httpResponse.get().close();
observableClient.close();
System.out.println("\nTotal event received: " + counter.get());
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -158,7 +158,6 @@ public void write(OutputStream output) throws IOException,
for (String line: streamContent) {
String eventLine = line + "\n";
output.write(eventLine.getBytes("UTF-8"));
output.flush();
try {
Thread.sleep(5);
} catch (Exception e) { // NOPMD
Expand Down
Loading

0 comments on commit 9bab116

Please sign in to comment.