Skip to content

Commit

Permalink
Merge pull request #1495 from smallrye/fix/1494
Browse files Browse the repository at this point in the history
fix: race condition on cancellation in UniCallbackSubscriber
  • Loading branch information
jponge authored Jan 23, 2024
2 parents e2bb9d9 + be54f15 commit d0e0f8d
Show file tree
Hide file tree
Showing 2 changed files with 33 additions and 1 deletion.
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,7 @@ public final void onItem(T x) {
@Override
public void cancel() {
UniSubscription sub = SUBSCRIPTION_UPDATER.getAndSet(this, CANCELLED);
if (sub != CANCELLED) {
if (sub != null && sub != CANCELLED) {
sub.cancel();
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,9 @@

import java.io.IOException;
import java.time.Duration;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
Expand All @@ -15,7 +18,10 @@
import org.junit.jupiter.api.parallel.ResourceLock;

import io.smallrye.mutiny.Uni;
import io.smallrye.mutiny.operators.AbstractUni;
import io.smallrye.mutiny.subscription.Cancellable;
import io.smallrye.mutiny.subscription.UniSubscriber;
import io.smallrye.mutiny.subscription.UniSubscription;
import junit5.support.InfrastructureResource;

@ResourceLock(value = InfrastructureResource.NAME, mode = ResourceAccessMode.READ)
Expand Down Expand Up @@ -105,4 +111,30 @@ public void testTwoCallbacksVariant() {
cancellable.cancel();
}

@Test
public void cancellationBeforeSubscriptionArrivesDoesntThrowNpe() {
ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();
try {
AbstractUni<String> uni = new AbstractUni<>() {
@Override
public void subscribe(UniSubscriber<? super String> subscriber) {
scheduler.schedule(() -> {
subscriber.onSubscribe(new UniSubscription() {
@Override
public void cancel() {

}
});
subscriber.onItem("Yolo");
}, 10, TimeUnit.SECONDS);
}
};

AtomicReference<String> result = new AtomicReference<>();
uni.subscribe().with(result::set).cancel();
assertThat(result).hasNullValue();
} finally {
scheduler.shutdown();
}
}
}

0 comments on commit d0e0f8d

Please sign in to comment.