diff --git a/common/reactive/src/main/java/io/helidon/common/reactive/BiConsumerChain.java b/common/reactive/src/main/java/io/helidon/common/reactive/BiConsumerChain.java index 90140e3f5f2..d045df336b4 100644 --- a/common/reactive/src/main/java/io/helidon/common/reactive/BiConsumerChain.java +++ b/common/reactive/src/main/java/io/helidon/common/reactive/BiConsumerChain.java @@ -50,8 +50,17 @@ static BiConsumer combine( return ((BiConsumerChain) current).combineWith(another); } BiConsumerChain newChain = new BiConsumerChain<>(); - newChain.add(current); - newChain.add(another); + if (current instanceof BiConsumerChain) { + newChain.addAll((BiConsumerChain) current); + } else { + newChain.add(current); + } + + if (another instanceof BiConsumerChain) { + newChain.addAll((BiConsumerChain) another); + } else { + newChain.add(another); + } return newChain; } } diff --git a/common/reactive/src/main/java/io/helidon/common/reactive/BufferedEmittingPublisher.java b/common/reactive/src/main/java/io/helidon/common/reactive/BufferedEmittingPublisher.java index 1f215a7e7a1..540859fe570 100644 --- a/common/reactive/src/main/java/io/helidon/common/reactive/BufferedEmittingPublisher.java +++ b/common/reactive/src/main/java/io/helidon/common/reactive/BufferedEmittingPublisher.java @@ -16,12 +16,10 @@ package io.helidon.common.reactive; -import java.util.Objects; import java.util.concurrent.ConcurrentLinkedQueue; import java.util.concurrent.Flow; -import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicLong; -import java.util.concurrent.atomic.AtomicReference; import java.util.function.BiConsumer; import java.util.function.Consumer; @@ -36,16 +34,31 @@ */ public class BufferedEmittingPublisher implements Flow.Publisher { - private final AtomicReference state = new AtomicReference<>(State.READY_TO_EMIT); private final ConcurrentLinkedQueue buffer = new ConcurrentLinkedQueue<>(); - private final EmittingPublisher emitter = new EmittingPublisher<>(); - private final AtomicLong deferredDrains = new AtomicLong(0); - private final AtomicBoolean draining = new AtomicBoolean(false); - private final AtomicBoolean subscribed = new AtomicBoolean(false); - private final AtomicReference error = new AtomicReference<>(); + private volatile Throwable error; private BiConsumer requestCallback = null; private Consumer onEmitCallback = null; - private boolean safeToSkipBuffer = false; + private Consumer onCleanup = null; + private Consumer onAbort = null; + private volatile Flow.Subscriber subscriber; + // state: two bits, b1 b0, tell: + // b0: 0/1 is not started/started (a subscriber arrived) + // b1: 0/1 is not stopped/stopped (a publisher completed) + // You can start and stop asynchronously and in any order + private final AtomicInteger state = new AtomicInteger(); + // assert: contenders is initially non-zero, so nothing can be done until onSubscribe has + // been signalled; observe drain() after onSubscribe + private final AtomicInteger contenders = new AtomicInteger(1); + private final AtomicLong requested = new AtomicLong(); + // assert: ignorePending is set to enter terminal state as soon as possible: behave like + // the buffer is empty + private volatile boolean ignorePending; + + // assert: emitted is accessed single-threadedly + private long emitted; + // assert: observing cancelled, but not ignorePending, is possible only if cancel() races + // against a completion (isCancelled() and isComplete() are both true) + private boolean cancelled; protected BufferedEmittingPublisher() { } @@ -61,26 +74,39 @@ public static BufferedEmittingPublisher create() { } @Override - public void subscribe(final Flow.Subscriber subscriber) { - Objects.requireNonNull(subscriber, "subscriber is null"); - - if (!subscribed.compareAndSet(false, true)) { - subscriber.onSubscribe(SubscriptionHelper.CANCELED); - subscriber.onError(new IllegalStateException("Only single subscriber is allowed!")); + public void subscribe(final Flow.Subscriber sub) { + if (stateChange(1)) { + MultiError.create(new IllegalStateException("Only single subscriber is allowed!")) + .subscribe(sub); return; } - emitter.onSubscribe(() -> state.get().drain(this)); - emitter.onRequest((n, cnt) -> { - if (requestCallback != null) { - requestCallback.accept(n, cnt); + sub.onSubscribe(new Flow.Subscription() { + public void request(long n) { + if (n < 1) { + abort(new IllegalArgumentException("Expected request() with a positive increment")); + return; + } + long curr; + do { + curr = requested.get(); + } while (curr != Long.MAX_VALUE + && !requested.compareAndSet(curr, Long.MAX_VALUE - curr > n ? curr + n : Long.MAX_VALUE)); + if (requestCallback != null) { + requestCallback.accept(n, curr); + } + maybeDrain(); } - state.get().drain(this); - }); - emitter.onCancel(() -> state.compareAndSet(State.READY_TO_EMIT, State.CANCELLED)); - // subscriber is already validated - emitter.unsafeSubscribe(subscriber); + public void cancel() { + cancelled = true; + ignorePending = true; + maybeDrain(); + abort(null); + } + }); + subscriber = sub; + drain(); // assert: contenders lock is already acquired } /** @@ -94,11 +120,7 @@ public void subscribe(final Flow.Subscriber subscriber) { * @param requestCallback to be executed */ public void onRequest(BiConsumer requestCallback) { - if (this.requestCallback == null) { - this.requestCallback = requestCallback; - } else { - this.requestCallback = BiConsumerChain.combine(this.requestCallback, requestCallback); - } + this.requestCallback = BiConsumerChain.combine(this.requestCallback, requestCallback); } /** @@ -110,71 +132,188 @@ public void onRequest(BiConsumer requestCallback) { * @param onEmitCallback to be executed */ public void onEmit(Consumer onEmitCallback) { - if (this.onEmitCallback == null) { - this.onEmitCallback = onEmitCallback; - } else { - this.onEmitCallback = ConsumerChain.combine(this.onEmitCallback, onEmitCallback); + this.onEmitCallback = ConsumerChain.combine(this.onEmitCallback, onEmitCallback); + } + + /** + * Callback executed to clean up the buffer, when the Publisher terminates without passing + * ownership of buffered items to anyone (fail, completeNow, or the Subscription is cancelled). + *

+ * Use case: items buffered require handling their lifecycle, like releasing resources, or + * returning to a pool. + *

+ * Calling onCleanup multiple times will ensure that each of the provided Consumers gets a + * chance to look at the items in the buffer. Usually you do not want to release the same + * resource to a pool more than once, so you should usually want to ensure you pass one and + * only one callback to onCleanup. For this reason, do not use together with clearBuffer, + * unless you know how to have idempotent resource lifecycle management. + * + * @param onCleanup callback executed to clean up the buffer + */ + public void onCleanup(Consumer onCleanup) { + this.onCleanup = ConsumerChain.combine(this.onCleanup, onCleanup); + } + + /** + * Callback executed when this Publisher fails or is cancelled in a way that the entity performing + * emit() may be unaware of. + *

+ * Use case: emit() is issued only if onRequest is received; these will cease upon a failed request + * or when downstream requests cancellation. onAbort is going to let the entity issuing emit() + * know that no more onRequest are forthcoming (albeit they may still happen, the items emitted + * after onAbort will likely be discarded, and not emitted items will not be missed). + *

+ * In essence the pair of onRequest and onAbort make up the interface like that of a Processor's + * Subscription's request and cancel. The difference is only the API and the promise: we allow + * emit() to not heed backpressure (for example, when upstream is really unable to heed + * backpressure without introducing a buffer of its own, like is the case with many transformations + * of the form Publisher<T>->Publisher<Publisher<T>>). + *

+ * In the same vein there really is no restriction as to when onAbort callback can be called - there + * is no requirement for this Publisher to establish exactly whether the entity performing emit() + * is aware of the abort (say, a fail), or not. It is only required to ensure that the failures it + * generates (and not merely forwards to downstream) and cancellations it received, get propagated + * to the callback. + * + * @param onAbort callback executed when this Publisher fails or is cancelled + */ + public void onAbort(Consumer onAbort) { + this.onAbort = ConsumerChain.combine(this.onAbort, onAbort); + } + + private void abort(Throwable th) { + if (th != null) { + fail(th); + } + if (onAbort != null) { + onAbort.accept(th); } } /** * Emit item to the stream, if there is no immediate demand from downstream, * buffer item for sending when demand is signaled. + * No-op after downstream enters terminal state. (Cancelled subscription or received onError/onComplete) * * @param item to be emitted - * @return actual size of the buffer, value should be used as informative and can change asynchronously - * @throws IllegalStateException if cancelled, completed of failed */ - public int emit(final T item) { - return state.get().emit(this, item); + public void emit(final T item) { + boolean locked = false; + int s = state.get(); + if (s == 1) { + // assert: attempt fast path only if started, and not stopped + locked = contenders.get() == 0 && contenders.compareAndSet(0, 1); + } + + // assert: this condition is the same as the loop on slow path in drain(), except the buffer + // isEmpty - the condition when we can skip adding, and immediately removing the item + // from the buffer, without loss of FIFO order. + if (locked && !ignorePending && requested.get() > emitted && buffer.isEmpty()) { + try { + subscriber.onNext(item); + if (onEmitCallback != null) { + onEmitCallback.accept(item); + } + emitted++; + } catch (RuntimeException re) { + // assert: fail is re-entrant (will succeed even while the contenders lock has been acquired) + abort(re); + } finally { + drain(); + } + return; + } + + // assert: if ignorePending, buffer cleanup will happen in the future + buffer.add(item); + if (locked) { + drain(); + } else { + maybeDrain(); + } } /** * Send {@code onError} signal downstream, regardless of the buffer content. * Nothing else can be sent downstream after calling fail. - * {@link BufferedEmittingPublisher#emit(Object)} throws {@link IllegalStateException} after calling fail. + * No-op after downstream enters terminal state. (Cancelled subscription or received onError/onComplete) + *

+ * If several fail are invoked in quick succession or concurrently, no guarantee + * which of them ends up sent to downstream. * * @param throwable Throwable to be sent downstream as onError signal. */ public void fail(Throwable throwable) { - error.set(throwable); - if (state.compareAndSet(State.READY_TO_EMIT, State.FAILED)) { - emitter.fail(throwable); - } + // assert: delivering a completion signal discarding the whole buffer takes precedence over normal + // completion - that is, if complete() has been called, but onComplete has not been delivered + // yet, onError will be signalled instead, discarding the entire buffer. + // Otherwise the downstream may not be able to establish orderly processing: fail() can be + // forced as part of a borken request(), failed onNext, onRequest or onEmit callbacks. These + // indicate the conditions where downstream may not reach a successful request() or cancel, + // thus blocking the progress of the Publisher. + error = throwable; + completeNow(); } /** - * Drain the buffer, in case of not sufficient demands wait for more requests, - * then send {@code onComplete} signal to downstream. - * {@link BufferedEmittingPublisher#emit(Object)} throws {@link IllegalStateException} after calling complete. + * Send onComplete to downstream after it consumes the entire buffer. Intervening fail invocations + * can end up sending onError instead of onComplete. + * No-op after downstream enters terminal state. (Cancelled subscription or received onError/onComplete) */ public void complete() { - if (state.compareAndSet(State.READY_TO_EMIT, State.COMPLETING)) { - //drain buffer then complete - State.READY_TO_EMIT.drain(this); + // assert: transition the state to stopped, and see if it is started; if not started, maybeDrain is futile + // assert: if cancelled can be observed, let's not race against it to change the state - let the state + // remain cancelled; this does not preclude the possibility of isCancelled switching to false, just makes + // it a little more predictable in single-threaded cases + // assert: even if cancelled, enter maybeDrain to ensure the cleanup occurs (complete is entrant from + // completeNow and fail) + if (cancelled || stateChange(2)) { + maybeDrain(); } } + private boolean stateChange(int s) { + int curr; + do { + curr = state.get(); + } while ((curr & s) != s && !state.compareAndSet(curr, curr + s)); + return (curr & 1) > 0; + } + /** * Send {@code onComplete} signal downstream immediately, regardless of the buffer content. * Nothing else can be sent downstream after calling {@link BufferedEmittingPublisher#completeNow()}. - * {@link BufferedEmittingPublisher#emit(Object)} throws {@link IllegalStateException} after calling completeNow. + * No-op after downstream enters terminal state. (Cancelled subscription or received onError/onComplete) */ public void completeNow() { - if (state.compareAndSet(State.READY_TO_EMIT, State.COMPLETED)) { - emitter.complete(); - } + ignorePending = true; + complete(); } /** * Clear whole buffer, invoke consumer for each item before discarding it. + * Use case: items in the buffer require discarding properly, freeing up some resources, or returning them + * to a pool. + *

+ * It is the caller's responsibility to ensure there are no concurrent invocations of clearBuffer, and + * that there will be no emit calls in the future, as the items processed by those invocations may not be + * consumed properly. + *

+ * It is recommended that onCleanup is set up instead of using clearBuffer. Do not use together with onCleanup. * * @param consumer to be invoked for each item */ public void clearBuffer(Consumer consumer) { - while (!buffer.isEmpty()) { - consumer.accept(buffer.poll()); - } + // I recommend deprecating this method altogether + + // Accessing buffer concurrently with drain() is inherently broken: everyone assumes that if buffer + // is not empty, then buffer.poll() returns non-null value (this promise is broken), and everyone + // assumes that polling buffer returns items in FIFO order (this promise is broken). + //while (!buffer.isEmpty()) { + // consumer.accept(buffer.poll()); + //} + onCleanup(consumer); + completeNow(); // this is the current behaviour } /** @@ -183,7 +322,7 @@ public void clearBuffer(Consumer consumer) { * @return true if so */ public boolean isUnbounded() { - return this.emitter.isUnbounded(); + return requested.get() == Long.MAX_VALUE; } /** @@ -193,28 +332,54 @@ public boolean isUnbounded() { * @return true if demand is higher than 0 */ public boolean hasRequests() { - return this.emitter.hasRequests(); + return requested.get() > emitted; } /** * Check if publisher sent {@code onComplete} signal downstream. * Returns {@code true} right after calling {@link BufferedEmittingPublisher#completeNow()} + * (with a caveat) * but after calling {@link BufferedEmittingPublisher#complete()} returns * {@code false} until whole buffer has been drained. + *

+ * The caveat is that completeNow() does not guarantee that the onComplete signal is sent + * before returning from completeNow() - it is only guaranteed to be sent as soon as it can be done. * * @return true if so */ public boolean isCompleted() { - return this.state.get() == State.COMPLETED; + // The caveat above means only that the current implementation guarantees onComplete is sent + // before completeNow returns in single-threaded cases. When concurrent emit() or request() + // race against completeNow, completeNow may return without entering drain() - but the concurrent + // calls guarantee onComplete will be called as soon as they observe the buffer is empty. + // + // We don't want to say this in the public documentation as this is implementation detail. + // + // A subtle logical statement: if onError or onComplete has been signalled to downstream, + // isCompleted is true. But it is also true if cancel() precluded sending the signal + // to downstream. + // + // The current implementation is: isCompleted is true if and only if no more onNext signals + // will be sent to downstream and no cancellation was requested. + // + // assert: once isCompleted becomes true, it stays true + // question: what should it be, if complete() was called, but not onSubscribe()? + return state.get() > 1 && buffer.isEmpty(); } /** * Check if publisher is in terminal state CANCELLED. + *

+ * It is for information only. It is not guaranteed to tell what happened to downstream, if there + * were a concurrent cancellation and a completion. * * @return true if so */ public boolean isCancelled() { - return this.state.get() == State.CANCELLED; + // a stricter logic can be implemented, but is the complication warranted? + + // assert: once isCancelled becomes true, isCancelled || isCompleted stays true + return ignorePending && cancelled && !isCompleted(); } /** @@ -227,154 +392,96 @@ public int bufferSize() { return buffer.size(); } - private void drainBuffer() { - deferredDrains.incrementAndGet(); - - long drains; - do { - if (draining.getAndSet(true)) { - //other thread already draining - return; - } - drains = deferredDrains.getAndUpdate(d -> d == 0 ? 0 : d - 1); - if (drains > 0) { - // in case of parallel drains invoked by request - // increasing demand during draining - actualDrain(); - drains--; + /** + * Override, if you prefer to do cleanup in a uniform way, instead of requiring everyone + * to register a onCleanup. + *

+ * Use case: a subclass that offers an implementation of BufferedEmittingPublisher<T> for + * a certain type of resource T. + */ + protected void cleanup() { + if (onCleanup == null) { + buffer.clear(); + } else { + while (!buffer.isEmpty()) { + onCleanup.accept(buffer.poll()); } - draining.set(false); - // changed while draining, try again - } while (drains < deferredDrains.get()); + } } - private void actualDrain() { - while (!buffer.isEmpty()) { - if (emitter.emit(buffer.peek())) { - if (onEmitCallback != null) { - onEmitCallback.accept(buffer.poll()); - } else { - buffer.poll(); - } - } else { - break; - } - } - if (buffer.isEmpty() - && state.compareAndSet(State.COMPLETING, State.COMPLETED)) { - // Buffer drained, time for complete - emitter.complete(); + private void maybeDrain() { + // assert: if not started, will not post too many emit() and complete() to overflow the + // counter + if (contenders.getAndIncrement() == 0) { + drain(); } } - private int emitOrBuffer(T item) { - synchronized (this) { + // Key design principles: + // - all operations on downstream are executed whilst "holding the lock". + // The lock acquisition is the ability to transition the value of contenders from zero to 1. + // - any changes to state are followed by maybeDrain, so the thread inside drain() can notice + // that some state change has occurred: + // - ignorePending + // - error + // - cancelled + // - requested + // - buffer contents + private void drain() { + IllegalStateException ise = null; + for (int cont = 1; cont > 0; cont = contenders.addAndGet(-cont)) { + boolean terminateNow = ignorePending; try { - if (buffer.isEmpty() && emitter.emit(item)) { - // Buffer drained, emit successful - // saved time by skipping buffer + while (!terminateNow && requested.get() > emitted && !buffer.isEmpty()) { + T item = buffer.poll(); + subscriber.onNext(item); if (onEmitCallback != null) { onEmitCallback.accept(item); } - return 0; - } else { - // safe slower path thru buffer - buffer.add(item); - state.get().drain(this); - return buffer.size(); - } - } finally { - // If unbounded, check only once if buffer is empty - if (!safeToSkipBuffer && isUnbounded() && buffer.isEmpty()) { - safeToSkipBuffer = true; + emitted++; + terminateNow = ignorePending; } + } catch (RuntimeException re) { + abort(re); } - } - } - private int unboundedEmitOrBuffer(T item) { - // Not reachable unless unbounded req was made - // and buffer is empty - if (emitter.emit(item)) { - // Emit successful - if (onEmitCallback != null) { - onEmitCallback.accept(item); + if (terminateNow) { + cleanup(); } - return 0; - } else { - // Emitter can be only in terminal state - // buffer for later retrieval by clearBuffer() - buffer.add(item); - return buffer.size(); - } - } - - private enum State { - READY_TO_EMIT { - @Override - int emit(BufferedEmittingPublisher publisher, T item) { - if (publisher.safeToSkipBuffer) { - return publisher.unboundedEmitOrBuffer(item); + if (terminateNow || isCompleted()) { + try { + // assert: cleanup in finally + if (!cancelled) { + cancelled = true; + ignorePending = true; + if (error != null) { + subscriber.onError(error); + } else { + subscriber.onComplete(); + } + } + } catch (Throwable th) { + // assert: catch all throwables, to ensure the lock is released properly + // and buffer cleanup remains reachable + // assert: this line is reachable only once: all subsequent iterations + // will observe cancelled == true + ise = new IllegalStateException(th); + } finally { + error = null; + subscriber = null; + requestCallback = null; + onEmitCallback = null; } - return publisher.emitOrBuffer(item); - } - - @Override - void drain(final BufferedEmittingPublisher publisher) { - publisher.drainBuffer(); - } - }, - CANCELLED { - @Override - int emit(BufferedEmittingPublisher publisher, T item) { - throw new IllegalStateException("Emitter is cancelled!"); - } - - @Override - void drain(final BufferedEmittingPublisher publisher) { - //noop - } - }, - FAILED { - @Override - int emit(BufferedEmittingPublisher publisher, T item) { - throw new IllegalStateException("Emitter is in failed state!"); - } - - @Override - void drain(final BufferedEmittingPublisher publisher) { - //Can't happen twice, internal emitter keeps the state too - publisher.emitter.fail(publisher.error.get()); - } - }, - COMPLETING { - @Override - int emit(BufferedEmittingPublisher publisher, T item) { - throw new IllegalStateException("Emitter is completing!"); } + } - @Override - void drain(final BufferedEmittingPublisher publisher) { - State.READY_TO_EMIT.drain(publisher); - } - }, - COMPLETED { - @Override - int emit(BufferedEmittingPublisher publisher, T item) { - throw new IllegalStateException("Emitter is completed!"); - } - - @Override - void drain(final BufferedEmittingPublisher publisher) { - //Can't happen twice, internal emitter keeps the state too - publisher.emitter.complete(); - } - }; - - abstract int emit(BufferedEmittingPublisher publisher, T item); - - abstract void drain(BufferedEmittingPublisher publisher); - + if (ise != null) { + // assert: this violates the reactive spec, but this is what the tests expect. + // Observe that there is no guarantee where the exception will be thrown - + // it may happen during request(), which is expected to finish without + // throwing + throw ise; + } } } diff --git a/common/reactive/src/test/java/io/helidon/common/reactive/BufferedEmittingPublisherTest.java b/common/reactive/src/test/java/io/helidon/common/reactive/BufferedEmittingPublisherTest.java index 70913e8ccf8..33c15610605 100644 --- a/common/reactive/src/test/java/io/helidon/common/reactive/BufferedEmittingPublisherTest.java +++ b/common/reactive/src/test/java/io/helidon/common/reactive/BufferedEmittingPublisherTest.java @@ -27,17 +27,16 @@ import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicLong; +import org.junit.jupiter.api.Test; + import static org.hamcrest.CoreMatchers.equalTo; import static org.hamcrest.CoreMatchers.instanceOf; import static org.hamcrest.CoreMatchers.is; import static org.hamcrest.CoreMatchers.not; import static org.hamcrest.CoreMatchers.nullValue; import static org.hamcrest.MatcherAssert.assertThat; -import static org.junit.jupiter.api.Assertions.assertThrows; import static org.junit.jupiter.api.Assertions.fail; -import org.junit.jupiter.api.Test; - /** * The BufferedEmittingPublisherTest. */ @@ -168,7 +167,7 @@ public void onError(Throwable throwable) { try { publisher.fail(new IllegalStateException("foo!")); fail("an exception should have been thrown"); - } catch(IllegalStateException ex) { + } catch (IllegalStateException ex) { assertThat(ex.getCause(), is(not(nullValue()))); assertThat(ex.getCause(), is(instanceOf(UnsupportedOperationException.class))); } @@ -199,8 +198,7 @@ public void onNext(Long item) { publisher.emit(15L); assertThat(subscriber.isComplete(), is(equalTo(false))); assertThat(subscriber.getLastError(), is(not(nullValue()))); - assertThat(subscriber.getLastError(), is(instanceOf(IllegalStateException.class))); - assertThat(subscriber.getLastError().getCause(), is(instanceOf(UnsupportedOperationException.class))); + assertThat(subscriber.getLastError(), is(instanceOf(UnsupportedOperationException.class))); } @Test @@ -238,7 +236,31 @@ public void onSubscribe(Subscription subscription) { } }; publisher.subscribe(subscriber); - assertThrows(IllegalStateException.class, () -> publisher.emit(0L)); + publisher.emit(0L); + assertThat(publisher.bufferSize(), is(equalTo(0))); + assertThat(publisher.isCancelled(), is(equalTo(true))); + } + + @Test + void concurrentSubscribe() { + AtomicInteger cnt = new AtomicInteger(); + ExecutorService exec = Executors.newFixedThreadPool(5); + try { + for (int i = 0; i < 5_000_000; i++) { + cnt.set(0); + BufferedEmittingPublisher bep = new BufferedEmittingPublisher<>(); + exec.submit(() -> { + bep.emit(1); + bep.complete(); + }); + Multi.create(bep) + .forEach(integer -> cnt.incrementAndGet()) + .await(); + assertThat(cnt.get(), is(equalTo(1))); + } + } finally { + exec.shutdown(); + } } @Test @@ -248,12 +270,12 @@ void flatMapping() { ExecutorService exec = Executors.newFixedThreadPool(32); Single promise = Multi.range(0, STREAM_SIZE) .flatMap(it -> { - BufferedEmittingPublisher flatMapped = new BufferedEmittingPublisher<>(); + BufferedEmittingPublisher bep = new BufferedEmittingPublisher<>(); exec.submit(() -> { - flatMapped.emit(it); - flatMapped.complete(); + bep.emit(it); + bep.complete(); }); - return flatMapped; + return bep; }) .forEach(unused -> cnt.incrementAndGet()); diff --git a/common/reactive/src/test/java/io/helidon/common/reactive/EmitterTest.java b/common/reactive/src/test/java/io/helidon/common/reactive/EmitterTest.java index bbe75035479..2b29a53f642 100644 --- a/common/reactive/src/test/java/io/helidon/common/reactive/EmitterTest.java +++ b/common/reactive/src/test/java/io/helidon/common/reactive/EmitterTest.java @@ -39,10 +39,14 @@ void testBackPressureWithCompleteNow() { TestSubscriber subscriber = new TestSubscriber<>(); emitter.subscribe(subscriber); - assertBufferSize(emitter.emit(0), 1); - assertBufferSize(emitter.emit(1), 2); - assertBufferSize(emitter.emit(2), 3); - assertBufferSize(emitter.emit(3), 4); + emitter.emit(0); + assertBufferSize(emitter.bufferSize(), 1); + emitter.emit(1); + assertBufferSize(emitter.bufferSize(), 2); + emitter.emit(2); + assertBufferSize(emitter.bufferSize(), 3); + emitter.emit(3); + assertBufferSize(emitter.bufferSize(), 4); subscriber .assertEmpty() @@ -52,10 +56,12 @@ void testBackPressureWithCompleteNow() { .assertItemCount(3) .assertNotTerminated(); - assertBufferSize(emitter.emit(4), 2); + emitter.emit(4); + assertBufferSize(emitter.bufferSize(), 2); emitter.completeNow(); + assertBufferSize(emitter.bufferSize(), 0); subscriber.requestMax() .assertValues(0, 1, 2) .assertComplete(); @@ -166,13 +172,15 @@ void testBackPressureWithLazyComplete() { .assertItemCount(3) .assertNotTerminated(); - assertThat(emitter.emit(10), is(equalTo(8))); + emitter.emit(10); + assertThat(emitter.bufferSize(), is(equalTo(8))); subscriber .request(3) .assertItemCount(6); - assertThat(emitter.emit(11), is(equalTo(6))); + emitter.emit(11); + assertThat(emitter.bufferSize(), is(equalTo(6))); subscriber.requestMax() .assertNotTerminated(); diff --git a/webclient/webclient/src/main/java/io/helidon/webclient/NettyClientHandler.java b/webclient/webclient/src/main/java/io/helidon/webclient/NettyClientHandler.java index dd3eb477d0b..d5325842a0a 100644 --- a/webclient/webclient/src/main/java/io/helidon/webclient/NettyClientHandler.java +++ b/webclient/webclient/src/main/java/io/helidon/webclient/NettyClientHandler.java @@ -286,10 +286,12 @@ private static final class HttpResponsePublisher extends BufferedEmittingPublish }); } - public int emit(final ByteBuf buf) { + + + public void emit(final ByteBuf buf) { buf.retain(); - return super.emit(DataChunk.create(false, true, buf::release, - buf.nioBuffer().asReadOnlyBuffer())); + super.emit(DataChunk.create(false, true, buf::release, + buf.nioBuffer().asReadOnlyBuffer())); } } diff --git a/webserver/webserver/src/main/java/io/helidon/webserver/HttpRequestScopedPublisher.java b/webserver/webserver/src/main/java/io/helidon/webserver/HttpRequestScopedPublisher.java index b100db92d4c..8383145e5ce 100644 --- a/webserver/webserver/src/main/java/io/helidon/webserver/HttpRequestScopedPublisher.java +++ b/webserver/webserver/src/main/java/io/helidon/webserver/HttpRequestScopedPublisher.java @@ -36,9 +36,9 @@ class HttpRequestScopedPublisher extends BufferedEmittingPublisher { this.holdingQueue = holdingQueue; } - public int emit(ByteBuf data) { + public void emit(ByteBuf data) { try { - return super.emit(new ByteBufRequestChunk(data, holdingQueue)); + super.emit(new ByteBufRequestChunk(data, holdingQueue)); } finally { holdingQueue.release(); }