Skip to content

Commit

Permalink
[Reactive] Fix Multi.flatMap losing items on the fastpath when condit…
Browse files Browse the repository at this point in the history
…ions are not met (#1669)

* [Reactive] Fix Multi.flatMap losing items on the fastpath when conditions are not met

* Remove unused import.

* Simplify InnerSubscriber.onSubscribe
  • Loading branch information
akarnokd authored Apr 22, 2020
1 parent 355f646 commit c369146
Show file tree
Hide file tree
Showing 2 changed files with 60 additions and 15 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -210,6 +210,8 @@ public void innerNext(R item, InnerSubscriber<R> sender) {
sender.produced(1L);
} else {
// yes, go on a full drain loop
sender.enqueue(item);
q.offer(sender);
drainLoop();
return;
}
Expand Down Expand Up @@ -435,21 +437,8 @@ static final class InnerSubscriber<R>

@Override
public void onSubscribe(Flow.Subscription subscription) {
Objects.requireNonNull(subscription, "subscription is null");
for (;;) {
Flow.Subscription current = get();
if (current == this) {
subscription.cancel();
return;
}
if (current != null) {
subscription.cancel();
throw new IllegalStateException("Subscription already set!");
}
if (compareAndSet(null, subscription)) {
subscription.request(prefetch);
return;
}
if (SubscriptionHelper.setOnce(this, subscription)) {
subscription.request(prefetch);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,10 +16,12 @@
*/
package io.helidon.common.reactive;

import org.testng.annotations.Ignore;
import org.testng.annotations.Test;

import java.util.*;
import java.util.concurrent.*;
import java.util.stream.Collectors;
import java.util.stream.IntStream;

import static org.hamcrest.MatcherAssert.assertThat;
Expand Down Expand Up @@ -282,4 +284,58 @@ public void flatMapCompletionStage() {
.request(1)
.assertResult(1, 2);
}

static final int UPSTREAM_ITEM_COUNT = 100;
static final int ASYNC_MULTIPLY = 10;
static final int ASYNC_DELAY_MILLIS = 20;
static final int EXPECTED_EMISSION_COUNT = 1000;
static final int MAX_CONCURRENCY = 128;
static final int PREFETCH = 128;
static final List<Integer> TEST_DATA = IntStream.rangeClosed(1, UPSTREAM_ITEM_COUNT)
.boxed()
.collect(Collectors.toList());

@Test
@Ignore // takes too long on its own, only for checking out possible bugs
public void multiLoop() throws Throwable {
for (int i = 0; i < 1000; i++) {
if (i % 10 == 0) {
System.out.println("multiLoop: " + i);
}
multi();
}
}

@Test
void multi() throws ExecutionException, InterruptedException {
assertEquals(EXPECTED_EMISSION_COUNT, Multi.from(TEST_DATA)
.flatMap(MultiFlatMapPublisherTest::asyncFlowPublisher, MAX_CONCURRENCY, false, PREFETCH)
.distinct()
.collectList()
.toStage()
.toCompletableFuture()
.get()
.size());
}

private static Flow.Publisher<? extends String> asyncFlowPublisher(Integer i) {
SubmissionPublisher<String> pub = new SubmissionPublisher<>();
new Thread(() -> {
for (int o = 0; o < ASYNC_MULTIPLY; o++) {
sleep(ASYNC_DELAY_MILLIS);
pub.submit(i + "#" + o);
}
pub.close();
}).start();
return pub;
}

private static void sleep(long millis) {
try {
Thread.sleep(millis);
} catch (InterruptedException e) {
e.printStackTrace();
throw new RuntimeException(e);
}
}
}

0 comments on commit c369146

Please sign in to comment.