Skip to content

Commit

Permalink
2.x: Fix switchMap to indicate boundary fusion (#5991)
Browse files Browse the repository at this point in the history
  • Loading branch information
akarnokd authored May 4, 2018
1 parent 214181f commit 90203b6
Show file tree
Hide file tree
Showing 5 changed files with 281 additions and 13 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -359,7 +359,7 @@ public void onSubscribe(Subscription s) {
@SuppressWarnings("unchecked")
QueueSubscription<R> qs = (QueueSubscription<R>) s;

int m = qs.requestFusion(QueueSubscription.ANY);
int m = qs.requestFusion(QueueSubscription.ANY | QueueSubscription.BOUNDARY);
if (m == QueueSubscription.SYNC) {
fusionMode = m;
queue = qs;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -348,7 +348,7 @@ public void onSubscribe(Disposable s) {
@SuppressWarnings("unchecked")
QueueDisposable<R> qd = (QueueDisposable<R>) s;

int m = qd.requestFusion(QueueDisposable.ANY);
int m = qd.requestFusion(QueueDisposable.ANY | QueueDisposable.BOUNDARY);
if (m == QueueDisposable.SYNC) {
queue = qd;
done = true;
Expand Down
210 changes: 210 additions & 0 deletions src/test/java/io/reactivex/TestHelper.java
Original file line number Diff line number Diff line change
Expand Up @@ -2922,4 +2922,214 @@ public void request(long n) {
}
};
}

static final class FlowableStripBoundary<T> extends Flowable<T> implements FlowableTransformer<T, T> {

final Flowable<T> source;

FlowableStripBoundary(Flowable<T> source) {
this.source = source;
}

@Override
public Flowable<T> apply(Flowable<T> upstream) {
return new FlowableStripBoundary<T>(upstream);
}

@Override
protected void subscribeActual(Subscriber<? super T> s) {
source.subscribe(new StripBoundarySubscriber<T>(s));
}

static final class StripBoundarySubscriber<T> implements FlowableSubscriber<T>, QueueSubscription<T> {

final Subscriber<? super T> actual;

Subscription upstream;

QueueSubscription<T> qs;

StripBoundarySubscriber(Subscriber<? super T> actual) {
this.actual = actual;
}

@SuppressWarnings("unchecked")
@Override
public void onSubscribe(Subscription subscription) {
this.upstream = subscription;
if (subscription instanceof QueueSubscription) {
qs = (QueueSubscription<T>)subscription;
}
actual.onSubscribe(this);
}

@Override
public void onNext(T t) {
actual.onNext(t);
}

@Override
public void onError(Throwable throwable) {
actual.onError(throwable);
}

@Override
public void onComplete() {
actual.onComplete();
}

@Override
public int requestFusion(int mode) {
QueueSubscription<T> fs = qs;
if (fs != null) {
return fs.requestFusion(mode & ~BOUNDARY);
}
return NONE;
}

@Override
public boolean offer(T value) {
throw new UnsupportedOperationException("Should not be called");
}

@Override
public boolean offer(T v1, T v2) {
throw new UnsupportedOperationException("Should not be called");
}

@Override
public T poll() throws Exception {
return qs.poll();
}

@Override
public void clear() {
qs.clear();
}

@Override
public boolean isEmpty() {
return qs.isEmpty();
}

@Override
public void request(long n) {
upstream.request(n);
}

@Override
public void cancel() {
upstream.cancel();
}
}
}

public static <T> FlowableTransformer<T, T> flowableStripBoundary() {
return new FlowableStripBoundary<T>(null);
}

static final class ObservableStripBoundary<T> extends Observable<T> implements ObservableTransformer<T, T> {

final Observable<T> source;

ObservableStripBoundary(Observable<T> source) {
this.source = source;
}

@Override
public Observable<T> apply(Observable<T> upstream) {
return new ObservableStripBoundary<T>(upstream);
}

@Override
protected void subscribeActual(Observer<? super T> s) {
source.subscribe(new StripBoundaryObserver<T>(s));
}

static final class StripBoundaryObserver<T> implements Observer<T>, QueueDisposable<T> {

final Observer<? super T> actual;

Disposable upstream;

QueueDisposable<T> qd;

StripBoundaryObserver(Observer<? super T> actual) {
this.actual = actual;
}

@SuppressWarnings("unchecked")
@Override
public void onSubscribe(Disposable d) {
this.upstream = d;
if (d instanceof QueueDisposable) {
qd = (QueueDisposable<T>)d;
}
actual.onSubscribe(this);
}

@Override
public void onNext(T t) {
actual.onNext(t);
}

@Override
public void onError(Throwable throwable) {
actual.onError(throwable);
}

@Override
public void onComplete() {
actual.onComplete();
}

@Override
public int requestFusion(int mode) {
QueueDisposable<T> fs = qd;
if (fs != null) {
return fs.requestFusion(mode & ~BOUNDARY);
}
return NONE;
}

@Override
public boolean offer(T value) {
throw new UnsupportedOperationException("Should not be called");
}

@Override
public boolean offer(T v1, T v2) {
throw new UnsupportedOperationException("Should not be called");
}

@Override
public T poll() throws Exception {
return qd.poll();
}

@Override
public void clear() {
qd.clear();
}

@Override
public boolean isEmpty() {
return qd.isEmpty();
}

@Override
public void dispose() {
upstream.dispose();
}

@Override
public boolean isDisposed() {
return upstream.isDisposed();
}
}
}

public static <T> ObservableTransformer<T, T> observableStripBoundary() {
return new ObservableStripBoundary<T>(null);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@
import io.reactivex.internal.util.ExceptionHelper;
import io.reactivex.plugins.RxJavaPlugins;
import io.reactivex.processors.PublishProcessor;
import io.reactivex.schedulers.TestScheduler;
import io.reactivex.schedulers.*;
import io.reactivex.subscribers.*;

public class FlowableSwitchTest {
Expand Down Expand Up @@ -1144,12 +1144,16 @@ public void run() {
@Test
public void fusedInnerCrash() {
Flowable.just(1).hide()
.switchMap(Functions.justFunction(Flowable.just(1).map(new Function<Integer, Object>() {
@Override
public Object apply(Integer v) throws Exception {
throw new TestException();
}
})))
.switchMap(Functions.justFunction(Flowable.just(1)
.map(new Function<Integer, Object>() {
@Override
public Object apply(Integer v) throws Exception {
throw new TestException();
}
})
.compose(TestHelper.<Object>flowableStripBoundary())
)
)
.test()
.assertFailure(TestException.class);
}
Expand All @@ -1174,4 +1178,30 @@ public void innerCancelledOnMainError() {

ts.assertFailure(TestException.class);
}

@Test
public void fusedBoundary() {
String thread = Thread.currentThread().getName();

Flowable.range(1, 10000)
.switchMap(new Function<Integer, Flowable<? extends Object>>() {
@Override
public Flowable<? extends Object> apply(Integer v)
throws Exception {
return Flowable.just(2).hide()
.observeOn(Schedulers.single())
.map(new Function<Integer, Object>() {
@Override
public Object apply(Integer w) throws Exception {
return Thread.currentThread().getName();
}
});
}
})
.test()
.awaitDone(5, TimeUnit.SECONDS)
.assertNever(thread)
.assertNoErrors()
.assertComplete();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
package io.reactivex.internal.operators.observable;

import static org.junit.Assert.*;
import static org.mockito.ArgumentMatchers.*;
import static org.mockito.Mockito.*;

import java.util.List;
Expand All @@ -26,15 +27,14 @@
import io.reactivex.*;
import io.reactivex.disposables.*;
import io.reactivex.exceptions.*;
import io.reactivex.functions.Consumer;
import io.reactivex.functions.Function;
import io.reactivex.functions.*;
import io.reactivex.internal.functions.Functions;
import io.reactivex.internal.schedulers.ImmediateThinScheduler;
import io.reactivex.internal.util.ExceptionHelper;
import io.reactivex.observers.TestObserver;
import io.reactivex.plugins.RxJavaPlugins;
import io.reactivex.schedulers.TestScheduler;
import io.reactivex.subjects.PublishSubject;
import io.reactivex.schedulers.*;
import io.reactivex.subjects.*;

public class ObservableSwitchTest {

Expand Down Expand Up @@ -1121,6 +1121,7 @@ public Integer apply(Integer v) throws Exception {
throw new TestException();
}
})
.compose(TestHelper.<Integer>observableStripBoundary())
))
.test();

Expand Down Expand Up @@ -1148,6 +1149,7 @@ public Integer apply(Integer v) throws Exception {
throw new TestException();
}
})
.compose(TestHelper.<Integer>observableStripBoundary())
))
.test();

Expand All @@ -1166,4 +1168,30 @@ public Integer apply(Integer v) throws Exception {

assertFalse(ps.hasObservers());
}

@Test
public void fusedBoundary() {
String thread = Thread.currentThread().getName();

Observable.range(1, 10000)
.switchMap(new Function<Integer, ObservableSource<? extends Object>>() {
@Override
public ObservableSource<? extends Object> apply(Integer v)
throws Exception {
return Observable.just(2).hide()
.observeOn(Schedulers.single())
.map(new Function<Integer, Object>() {
@Override
public Object apply(Integer w) throws Exception {
return Thread.currentThread().getName();
}
});
}
})
.test()
.awaitDone(5, TimeUnit.SECONDS)
.assertNever(thread)
.assertNoErrors()
.assertComplete();
}
}

0 comments on commit 90203b6

Please sign in to comment.