Skip to content

Commit

Permalink
2.x: fix flatMap emitting the terminal exception indicator on cancel (#…
Browse files Browse the repository at this point in the history
  • Loading branch information
akarnokd authored Mar 16, 2017
1 parent ce1f2d0 commit db75e89
Show file tree
Hide file tree
Showing 4 changed files with 166 additions and 11 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -421,10 +421,12 @@ void drainLoop() {

if (d && (svq == null || svq.isEmpty()) && n == 0) {
Throwable ex = errs.terminate();
if (ex == null) {
child.onComplete();
} else {
child.onError(ex);
if (ex != ExceptionHelper.TERMINATED) {
if (ex == null) {
child.onComplete();
} else {
child.onError(ex);
}
}
return;
}
Expand Down Expand Up @@ -556,7 +558,10 @@ boolean checkTerminate() {
}
if (!delayErrors && errs.get() != null) {
clearScalarQueue();
actual.onError(errs.terminate());
Throwable ex = errs.terminate();
if (ex != ExceptionHelper.TERMINATED) {
actual.onError(ex);
}
return true;
}
return false;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -361,11 +361,13 @@ void drainLoop() {
int n = inner.length;

if (d && (svq == null || svq.isEmpty()) && n == 0) {
Throwable ex = errors.get();
if (ex == null) {
child.onComplete();
} else {
child.onError(errors.terminate());
Throwable ex = errors.terminate();
if (ex != ExceptionHelper.TERMINATED) {
if (ex == null) {
child.onComplete();
} else {
child.onError(ex);
}
}
return;
}
Expand Down Expand Up @@ -488,7 +490,10 @@ boolean checkTerminate() {
Throwable e = errors.get();
if (!delayErrors && (e != null)) {
disposeAll();
actual.onError(errors.terminate());
e = errors.terminate();
if (e != ExceptionHelper.TERMINATED) {
actual.onError(e);
}
return true;
}
return false;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
import io.reactivex.exceptions.*;
import io.reactivex.functions.*;
import io.reactivex.internal.functions.Functions;
import io.reactivex.plugins.RxJavaPlugins;
import io.reactivex.processors.PublishProcessor;
import io.reactivex.schedulers.Schedulers;
import io.reactivex.subscribers.TestSubscriber;
Expand Down Expand Up @@ -926,4 +927,74 @@ public Object apply(Integer v) throws Exception {
assertTrue(list.toString(), list.contains("RxCo"));
}
}

@Test
public void cancelScalarDrainRace() {
for (int i = 0; i < 1000; i++) {
List<Throwable> errors = TestHelper.trackPluginErrors();
try {

final PublishProcessor<Flowable<Integer>> pp = PublishProcessor.create();

final TestSubscriber<Integer> ts = pp.flatMap(Functions.<Flowable<Integer>>identity()).test(0);

Runnable r1 = new Runnable() {
@Override
public void run() {
ts.cancel();
}
};
Runnable r2 = new Runnable() {
@Override
public void run() {
pp.onComplete();
}
};

TestHelper.race(r1, r2);

assertTrue(errors.toString(), errors.isEmpty());
} finally {
RxJavaPlugins.reset();
}
}
}

@Test
public void cancelDrainRace() {
for (int i = 0; i < 1000; i++) {
for (int j = 1; j < 50; j += 5) {
List<Throwable> errors = TestHelper.trackPluginErrors();
try {

final PublishProcessor<Flowable<Integer>> pp = PublishProcessor.create();

final TestSubscriber<Integer> ts = pp.flatMap(Functions.<Flowable<Integer>>identity()).test(0);

final PublishProcessor<Integer> just = PublishProcessor.create();
pp.onNext(just);

Runnable r1 = new Runnable() {
@Override
public void run() {
ts.request(1);
ts.cancel();
}
};
Runnable r2 = new Runnable() {
@Override
public void run() {
just.onNext(1);
}
};

TestHelper.race(r1, r2);

assertTrue(errors.toString(), errors.isEmpty());
} finally {
RxJavaPlugins.reset();
}
}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,9 @@
import io.reactivex.disposables.Disposable;
import io.reactivex.exceptions.*;
import io.reactivex.functions.*;
import io.reactivex.internal.functions.Functions;
import io.reactivex.observers.TestObserver;
import io.reactivex.plugins.RxJavaPlugins;
import io.reactivex.schedulers.Schedulers;
import io.reactivex.subjects.PublishSubject;

Expand Down Expand Up @@ -784,4 +786,76 @@ public Object apply(Integer v) throws Exception {
assertTrue(list.toString(), list.contains("RxCo"));
}
}

@Test
public void cancelScalarDrainRace() {
for (int i = 0; i < 1000; i++) {
List<Throwable> errors = TestHelper.trackPluginErrors();
try {

final PublishSubject<Observable<Integer>> pp = PublishSubject.create();

final TestObserver<Integer> ts = pp.flatMap(Functions.<Observable<Integer>>identity()).test();

Runnable r1 = new Runnable() {
@Override
public void run() {
ts.cancel();
}
};
Runnable r2 = new Runnable() {
@Override
public void run() {
pp.onComplete();
}
};

TestHelper.race(r1, r2);

assertTrue(errors.toString(), errors.isEmpty());
} finally {
RxJavaPlugins.reset();
}
}
}

@Test
public void cancelDrainRace() {
for (int i = 0; i < 1000; i++) {
for (int j = 1; j < 50; j += 5) {
List<Throwable> errors = TestHelper.trackPluginErrors();
try {

final PublishSubject<Observable<Integer>> pp = PublishSubject.create();

final TestObserver<Integer> ts = pp.flatMap(Functions.<Observable<Integer>>identity()).test();

final PublishSubject<Integer> just = PublishSubject.create();
final PublishSubject<Integer> just2 = PublishSubject.create();
pp.onNext(just);
pp.onNext(just2);

Runnable r1 = new Runnable() {
@Override
public void run() {
just2.onNext(1);
ts.cancel();
}
};
Runnable r2 = new Runnable() {
@Override
public void run() {
just.onNext(1);
}
};

TestHelper.race(r1, r2);

assertTrue(errors.toString(), errors.isEmpty());
} finally {
RxJavaPlugins.reset();
}
}
}
}
}

0 comments on commit db75e89

Please sign in to comment.