diff --git a/ribbon-core/src/main/java/com/netflix/client/AsyncBackupRequestsExecutor.java b/ribbon-core/src/main/java/com/netflix/client/AsyncBackupRequestsExecutor.java index 6de92be6..2fdd48e9 100644 --- a/ribbon-core/src/main/java/com/netflix/client/AsyncBackupRequestsExecutor.java +++ b/ribbon-core/src/main/java/com/netflix/client/AsyncBackupRequestsExecutor.java @@ -134,7 +134,7 @@ public static interface ExecutionResult extends Future { final AtomicBoolean responseRecevied = new AtomicBoolean(); final AtomicBoolean completedCalled = new AtomicBoolean(); final AtomicBoolean failedCalled = new AtomicBoolean(); - final AtomicBoolean cancelledCalled = new AtomicBoolean(); + // final AtomicBoolean cancelledCalled = new AtomicBoolean(); final Lock lock = new ReentrantLock(); final Condition responseChosen = lock.newCondition(); final Multimap> map = ArrayListMultimap.create(); @@ -143,43 +143,65 @@ public static interface ExecutionResult extends Future { final int sequenceNumber = i; Future future = asyncClient.execute(requests.get(i), decoder, new ResponseCallback() { private volatile boolean chosen = false; + private AtomicBoolean cancelledInvokedOnSameRequest = new AtomicBoolean(); @Override public void completed(S response) { - if (completedCalled.compareAndSet(false, true) - && callback != null && chosen) { + // System.err.println("completed called"); + // Thread.dumpStack(); + lock.lock(); + boolean shouldInvokeCallback = false; + try { + if (chosen) { + shouldInvokeCallback = true; + completedCalled.set(true); + } + } finally { + lock.unlock(); + } + if (callback != null && shouldInvokeCallback) { callback.completed(response); } } @Override public void failed(Throwable e) { - int count = failedCount.incrementAndGet(); - if ((count == numServers || chosen) && failedCalled.compareAndSet(false, true)) { - lock.lock(); - try { + lock.lock(); + boolean shouldInvokeCallback = false; + try { + int count = failedCount.incrementAndGet(); + if (count == numServers || chosen) { finalSequenceNumber.set(sequenceNumber); responseChosen.signalAll(); - } finally { - lock.unlock(); - } - if (callback != null) { - callback.failed(e); + shouldInvokeCallback = true; + failedCalled.set(true); } + } finally { + lock.unlock(); + } + if (shouldInvokeCallback && callback != null) { + callback.failed(e); } } @Override public void cancelled() { - int count = failedCount.incrementAndGet(); - if ((count == numServers || chosen) && cancelledCalled.compareAndSet(false, true)) { + // avoid getting cancelled multiple times + if (cancelledInvokedOnSameRequest.compareAndSet(false, true)) { lock.lock(); + int count = failedCount.incrementAndGet(); + boolean shouldInvokeCallback = false; try { - finalSequenceNumber.set(sequenceNumber); - responseChosen.signalAll(); + if (count == numServers || chosen) { + // System.err.println("chosen:" + chosen); + // System.err.println("failed count: " + failedCount.get()); + finalSequenceNumber.set(sequenceNumber); + responseChosen.signalAll(); + shouldInvokeCallback = true; + } } finally { lock.unlock(); } - if (callback != null) { + if (shouldInvokeCallback && callback != null) { callback.cancelled(); } } @@ -188,9 +210,10 @@ public void cancelled() { @Override public void responseReceived(S response) { if (responseRecevied.compareAndSet(false, true)) { - chosen = true; + // System.err.println("chosen=true"); lock.lock(); try { + chosen = true; finalSequenceNumber.set(sequenceNumber); responseChosen.signalAll(); } finally { diff --git a/ribbon-core/src/main/java/com/netflix/client/AsyncLoadBalancingClient.java b/ribbon-core/src/main/java/com/netflix/client/AsyncLoadBalancingClient.java index b50464de..7a55ca49 100644 --- a/ribbon-core/src/main/java/com/netflix/client/AsyncLoadBalancingClient.java +++ b/ribbon-core/src/main/java/com/netflix/client/AsyncLoadBalancingClient.java @@ -18,7 +18,6 @@ package com.netflix.client; import java.io.IOException; -import java.io.InputStream; import java.net.URI; import java.nio.ByteBuffer; import java.util.List; diff --git a/ribbon-core/src/main/java/com/netflix/client/StreamDecoder.java b/ribbon-core/src/main/java/com/netflix/client/StreamDecoder.java index bbfe5e03..7da9cd40 100644 --- a/ribbon-core/src/main/java/com/netflix/client/StreamDecoder.java +++ b/ribbon-core/src/main/java/com/netflix/client/StreamDecoder.java @@ -29,5 +29,16 @@ * @param Type of storage used for partial content. For example, {@link ByteBuffer}. */ public interface StreamDecoder { + /** + * Decode from the input and create an entity. The client implementation should call this method in + * a loop until it returns null, which means no more stream entity can be created from the unconsumed input. + * If there is any unconsumed input, client implementation should buffer and use it in conjunction with + * the next available input, for example, an HTTP chunk. In other words, the decoder should not + * have to buffer unconsumed input. + * + * @param input input to read and create entity from + * @return Entity created, or null if nothing can be created from the unconsumed input + * @throws IOException + */ T decode(S input) throws IOException; } diff --git a/ribbon-core/src/main/java/com/netflix/serialization/StreamDecoder.java b/ribbon-core/src/main/java/com/netflix/serialization/StreamDecoder.java deleted file mode 100644 index 0abc01ab..00000000 --- a/ribbon-core/src/main/java/com/netflix/serialization/StreamDecoder.java +++ /dev/null @@ -1,22 +0,0 @@ -/* - * - * Copyright 2013 Netflix, Inc. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * - */ -package com.netflix.serialization; - -public interface StreamDecoder { - S decode(T input); -} diff --git a/ribbon-examples/src/main/java/com/netflix/ribbon/examples/AsyncStreamingClientApp.java b/ribbon-examples/src/main/java/com/netflix/ribbon/examples/AsyncStreamingClientApp.java index fa68cf5d..edcc1818 100644 --- a/ribbon-examples/src/main/java/com/netflix/ribbon/examples/AsyncStreamingClientApp.java +++ b/ribbon-examples/src/main/java/com/netflix/ribbon/examples/AsyncStreamingClientApp.java @@ -18,7 +18,6 @@ package com.netflix.ribbon.examples; import java.nio.ByteBuffer; -import java.util.List; import java.util.concurrent.Future; import com.netflix.client.AsyncClient; @@ -42,7 +41,7 @@ public void run() throws Exception { HttpRequest request = HttpRequest.newBuilder().uri(SERVICE_URI + "testAsync/stream").build(); AsyncHttpClient client = AsyncHttpClientBuilder.withApacheAsyncClient().buildClient(); try { - Future response = client.execute(request, new SSEDecoder(), new ResponseCallback>() { + Future response = client.execute(request, new SSEDecoder(), new ResponseCallback() { @Override public void completed(HttpResponse response) { } @@ -53,7 +52,7 @@ public void failed(Throwable e) { } @Override - public void contentReceived(List element) { + public void contentReceived(String element) { System.out.println("Get content from server: " + element); } diff --git a/ribbon-examples/src/main/java/com/netflix/ribbon/examples/SSEDecoder.java b/ribbon-examples/src/main/java/com/netflix/ribbon/examples/SSEDecoder.java index 083995ae..d1056792 100644 --- a/ribbon-examples/src/main/java/com/netflix/ribbon/examples/SSEDecoder.java +++ b/ribbon-examples/src/main/java/com/netflix/ribbon/examples/SSEDecoder.java @@ -18,14 +18,8 @@ package com.netflix.ribbon.examples; import java.io.IOException; -import java.io.InputStream; import java.nio.ByteBuffer; -import java.util.List; -import org.apache.http.nio.util.ExpandableBuffer; -import org.apache.http.nio.util.HeapByteBufferAllocator; - -import com.google.common.collect.Lists; import com.netflix.client.StreamDecoder; /** @@ -37,66 +31,34 @@ * @author awang * */ -public class SSEDecoder implements StreamDecoder, ByteBuffer> { - final ExpandableByteBuffer dataBuffer = new ExpandableByteBuffer(); +public class SSEDecoder implements StreamDecoder { - @Override - public List decode(ByteBuffer buf) throws IOException { - List result = Lists.newArrayList(); - while (buf.position() < buf.limit()) { - byte b = buf.get(); + public String decode(ByteBuffer input) throws IOException { + if (input == null || !input.hasRemaining()) { + return null; + } + byte[] buffer = new byte[input.limit()]; + boolean foundDelimiter = false; + int index = 0; + int start = input.position(); + while (input.remaining() > 0) { + byte b = input.get(); if (b == 10 || b == 13) { - if (dataBuffer.hasContent()) { - result.add(new String(dataBuffer.getBytes(), "UTF-8")); - } - dataBuffer.reset(); + foundDelimiter = true; + break; } else { - dataBuffer.addByte(b); + buffer[index++] = b; } } - return result; - } -} - -class ExpandableByteBuffer extends ExpandableBuffer { - public ExpandableByteBuffer(int size) { - super(size, HeapByteBufferAllocator.INSTANCE); - } - - public ExpandableByteBuffer() { - super(4 * 1024, HeapByteBufferAllocator.INSTANCE); - } - - public void addByte(byte b) { - if (this.buffer.remaining() == 0) { - expand(); + if (!foundDelimiter) { + // reset the position so that bytes read so far + // will not be lost for next chunk + input.position(start); + return null; } - this.buffer.put(b); - } - - public boolean hasContent() { - return this.buffer.position() > 0; - } - - public byte[] getBytes() { - byte[] data = new byte[this.buffer.position()]; - this.buffer.position(0); - this.buffer.get(data); - return data; - } - - public void reset() { - clear(); - } - - public void consumeInputStream(InputStream content) throws IOException { - try { - int b = -1; - while ((b = content.read()) != -1) { - addByte((byte) b); - } - } finally { - content.close(); + if (index == 0) { + return null; } - } + return new String(buffer, 0, index, "UTF-8"); + } } diff --git a/ribbon-examples/src/main/java/com/netflix/ribbon/examples/StreamingObservableExample.java b/ribbon-examples/src/main/java/com/netflix/ribbon/examples/StreamingObservableExample.java index 563d92b8..3e563ed0 100644 --- a/ribbon-examples/src/main/java/com/netflix/ribbon/examples/StreamingObservableExample.java +++ b/ribbon-examples/src/main/java/com/netflix/ribbon/examples/StreamingObservableExample.java @@ -18,7 +18,7 @@ package com.netflix.ribbon.examples; import java.nio.ByteBuffer; -import java.util.List; +import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicReference; import rx.util.functions.Action1; @@ -42,20 +42,23 @@ public void run() throws Exception { HttpRequest request = HttpRequest.newBuilder().uri(SERVICE_URI + "testAsync/stream").build(); ObservableAsyncClient observableClient = AsyncHttpClientBuilder.withApacheAsyncClient().observableClient(); - final AtomicReference httpResponse = new AtomicReference(); + final AtomicReference httpResponse = new AtomicReference(); + final AtomicInteger counter = new AtomicInteger(); try { observableClient.stream(request, new SSEDecoder()) .toBlockingObservable() - .forEach(new Action1>>() { + .forEach(new Action1>() { @Override - public void call(final StreamEvent> t1) { + public void call(final StreamEvent t1) { System.out.println("Content from server: " + t1.getEvent()); + counter.incrementAndGet(); httpResponse.set(t1.getResponse()); } }); } finally { httpResponse.get().close(); observableClient.close(); + System.out.println("\nTotal event received: " + counter.get()); } } diff --git a/ribbon-examples/src/main/java/com/netflix/ribbon/examples/server/ServerResources.java b/ribbon-examples/src/main/java/com/netflix/ribbon/examples/server/ServerResources.java index ed8e5ccc..545ff9cb 100644 --- a/ribbon-examples/src/main/java/com/netflix/ribbon/examples/server/ServerResources.java +++ b/ribbon-examples/src/main/java/com/netflix/ribbon/examples/server/ServerResources.java @@ -158,7 +158,6 @@ public void write(OutputStream output) throws IOException, for (String line: streamContent) { String eventLine = line + "\n"; output.write(eventLine.getBytes("UTF-8")); - output.flush(); try { Thread.sleep(5); } catch (Exception e) { // NOPMD diff --git a/ribbon-httpasyncclient/src/main/java/com/netflix/httpasyncclient/ExpandableByteBuffer.java b/ribbon-httpasyncclient/src/main/java/com/netflix/httpasyncclient/ExpandableByteBuffer.java new file mode 100644 index 00000000..052f6270 --- /dev/null +++ b/ribbon-httpasyncclient/src/main/java/com/netflix/httpasyncclient/ExpandableByteBuffer.java @@ -0,0 +1,83 @@ +/* + * + * Copyright 2013 Netflix, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ +package com.netflix.httpasyncclient; + +import java.io.IOException; +import java.io.InputStream; +import java.nio.ByteBuffer; + +import org.apache.http.nio.util.ExpandableBuffer; +import org.apache.http.nio.util.HeapByteBufferAllocator; + +import com.netflix.client.StreamDecoder; + +/** + * A data buffer used internally to buffer data that is not consumed by {@link StreamDecoder} so + * that it can be used in conjunction with data from the next HTTP chunk. + *

+ * This code is copied from https://github.com/Netflix/RxJava/tree/master/rxjava-contrib/rxjava-apache-http + * + * @author awang + * + */ +public class ExpandableByteBuffer extends ExpandableBuffer { + public ExpandableByteBuffer(int size) { + super(size, HeapByteBufferAllocator.INSTANCE); + } + + public ExpandableByteBuffer() { + super(4 * 1024, HeapByteBufferAllocator.INSTANCE); + } + + public void addByte(byte b) { + if (this.buffer.remaining() == 0) { + expand(); + } + this.buffer.put(b); + } + + public boolean hasContent() { + return this.buffer.position() > 0; + } + + ByteBuffer getByteBuffer() { + return this.buffer; + } + + public byte[] getBytes() { + byte[] data = new byte[this.buffer.position()]; + this.buffer.position(0); + this.buffer.get(data); + return data; + } + + public void reset() { + clear(); + } + + public void consumeInputStream(InputStream content) throws IOException { + try { + int b = -1; + while ((b = content.read()) != -1) { + addByte((byte) b); + } + } finally { + content.close(); + } + } +} diff --git a/ribbon-httpasyncclient/src/main/java/com/netflix/httpasyncclient/RibbonHttpAsyncClient.java b/ribbon-httpasyncclient/src/main/java/com/netflix/httpasyncclient/RibbonHttpAsyncClient.java index 398d4ff5..1498bb28 100644 --- a/ribbon-httpasyncclient/src/main/java/com/netflix/httpasyncclient/RibbonHttpAsyncClient.java +++ b/ribbon-httpasyncclient/src/main/java/com/netflix/httpasyncclient/RibbonHttpAsyncClient.java @@ -192,16 +192,45 @@ public Future execute( AbstractAsyncResponseConsumer consumer = null; if (decoder != null) { consumer = new AsyncByteConsumer() { - private volatile HttpResponse response; + private volatile HttpResponse response; + ExpandableByteBuffer buffer = new ExpandableByteBuffer(); @Override protected void onByteReceived(ByteBuffer buf, IOControl ioctrl) throws IOException { - E obj = decoder.decode(buf); - if (obj != null && callback != null) { - callback.contentReceived(obj); + ByteBuffer bufferToConsume = buf; + if (buffer.hasContent()) { + logger.debug("internal buffer has unconsumed content"); + while (buf.position() < buf.limit()) { + byte b = buf.get(); + buffer.addByte(b); + } + bufferToConsume = buffer.getByteBuffer(); + bufferToConsume.flip(); + } + while (true) { + E obj = decoder.decode(bufferToConsume); + if (obj != null) { + if (callback != null) { + callback.contentReceived(obj); + } + } else { + break; + } } + if (bufferToConsume.position() < bufferToConsume.limit()) { + // there are leftovers + logger.debug("copying leftover bytes not consumed by decoder to internal buffer for future use"); + // discard bytes already consumed and copy over bytes not consumed to the new buffer + buffer = new ExpandableByteBuffer(); + while (bufferToConsume.position() < bufferToConsume.limit()) { + byte b = bufferToConsume.get(); + buffer.addByte(b); + } + } else if (bufferToConsume == buffer.getByteBuffer()) { + buffer.reset(); + } } - + @Override protected void onResponseReceived(HttpResponse response) throws HttpException, IOException { diff --git a/ribbon-httpasyncclient/src/test/java/com/netflix/httpasyncclient/EmbeddedResources.java b/ribbon-httpasyncclient/src/test/java/com/netflix/httpasyncclient/EmbeddedResources.java index fcc789f0..887a755f 100644 --- a/ribbon-httpasyncclient/src/test/java/com/netflix/httpasyncclient/EmbeddedResources.java +++ b/ribbon-httpasyncclient/src/test/java/com/netflix/httpasyncclient/EmbeddedResources.java @@ -49,7 +49,7 @@ public class EmbeddedResources { static List streamContent = Lists.newArrayList(); static { - for (int i = 0; i < 100; i++) { + for (int i = 0; i < 2000; i++) { streamContent.add("data: line " + i); } } @@ -102,10 +102,6 @@ public void write(OutputStream output) throws IOException, for (String line: streamContent) { String eventLine = line + "\n"; output.write(eventLine.getBytes("UTF-8")); - try { - Thread.sleep(10); - } catch (Exception e) { // NOPMD - } } } }; diff --git a/ribbon-httpasyncclient/src/test/java/com/netflix/httpasyncclient/HttpAsyncClientTest.java b/ribbon-httpasyncclient/src/test/java/com/netflix/httpasyncclient/HttpAsyncClientTest.java index de5339c3..60ef5ae0 100644 --- a/ribbon-httpasyncclient/src/test/java/com/netflix/httpasyncclient/HttpAsyncClientTest.java +++ b/ribbon-httpasyncclient/src/test/java/com/netflix/httpasyncclient/HttpAsyncClientTest.java @@ -25,7 +25,6 @@ import static org.junit.Assert.fail; import java.io.IOException; -import java.io.InputStream; import java.net.URI; import java.nio.ByteBuffer; import java.util.List; @@ -37,8 +36,6 @@ import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; -import org.apache.http.nio.util.ExpandableBuffer; -import org.apache.http.nio.util.HeapByteBufferAllocator; import org.junit.BeforeClass; import org.junit.Test; @@ -81,68 +78,38 @@ public class HttpAsyncClientTest { private RibbonHttpAsyncClient client = new RibbonHttpAsyncClient(); private static int port; - static class ExpandableByteBuffer extends ExpandableBuffer { - public ExpandableByteBuffer(int size) { - super(size, HeapByteBufferAllocator.INSTANCE); - } - - public ExpandableByteBuffer() { - super(4 * 1024, HeapByteBufferAllocator.INSTANCE); - } - - public void addByte(byte b) { - if (this.buffer.remaining() == 0) { - expand(); - } - this.buffer.put(b); - } - - public boolean hasContent() { - return this.buffer.position() > 0; - } - - public byte[] getBytes() { - byte[] data = new byte[this.buffer.position()]; - this.buffer.position(0); - this.buffer.get(data); - return data; - } - - public void reset() { - clear(); - } - - public void consumeInputStream(InputStream content) throws IOException { - try { - int b = -1; - while ((b = content.read()) != -1) { - addByte((byte) b); - } - } finally { - content.close(); - } - } - } - static class SSEDecoder implements StreamDecoder, ByteBuffer> { - final ExpandableByteBuffer dataBuffer = new ExpandableByteBuffer(); - @Override - public List decode(ByteBuffer buf) throws IOException { + public List decode(ByteBuffer input) throws IOException { List result = Lists.newArrayList(); - while (buf.position() < buf.limit()) { - byte b = buf.get(); + if (input == null || !input.hasRemaining()) { + return null; + } + byte[] buffer = new byte[input.limit()]; + boolean foundDelimiter = false; + int index = 0; + int start = input.position(); + while (input.remaining() > 0) { + byte b = input.get(); if (b == 10 || b == 13) { - if (dataBuffer.hasContent()) { - result.add(new String(dataBuffer.getBytes(), "UTF-8")); - } - dataBuffer.reset(); + foundDelimiter = true; + break; } else { - dataBuffer.addByte(b); + buffer[index++] = b; } } + if (!foundDelimiter) { + // reset the position so that bytes read so far + // will not be lost for next chunk + input.position(start); + return null; + } + if (index == 0) { + return null; + } + result.add(new String(buffer, 0, index, "UTF-8")); return result; - } + } }