Skip to content

Commit

Permalink
2.x: prevent tasks to self interrupt on the standard schedulers
Browse files Browse the repository at this point in the history
  • Loading branch information
akarnokd committed Mar 20, 2017
1 parent 21a7a05 commit 80dcb69
Show file tree
Hide file tree
Showing 19 changed files with 924 additions and 41 deletions.
31 changes: 25 additions & 6 deletions src/main/java/io/reactivex/Scheduler.java
Original file line number Diff line number Diff line change
Expand Up @@ -15,13 +15,12 @@

import java.util.concurrent.TimeUnit;

import io.reactivex.annotations.Experimental;
import io.reactivex.annotations.NonNull;
import io.reactivex.annotations.*;
import io.reactivex.disposables.Disposable;
import io.reactivex.exceptions.Exceptions;
import io.reactivex.functions.Function;
import io.reactivex.internal.disposables.*;
import io.reactivex.internal.schedulers.SchedulerWhen;
import io.reactivex.internal.schedulers.*;
import io.reactivex.internal.util.ExceptionHelper;
import io.reactivex.plugins.RxJavaPlugins;

Expand Down Expand Up @@ -131,9 +130,11 @@ public Disposable scheduleDirect(@NonNull Runnable run, long delay, @NonNull Tim

final Runnable decoratedRun = RxJavaPlugins.onSchedule(run);

w.schedule(new DisposeTask(decoratedRun, w), delay, unit);
DisposeTask task = new DisposeTask(decoratedRun, w);

return w;
w.schedule(task, delay, unit);

return task;
}

/**
Expand Down Expand Up @@ -432,22 +433,40 @@ public boolean isDisposed() {
}
}

static final class DisposeTask implements Runnable {
static final class DisposeTask implements Runnable, Disposable {
final Runnable decoratedRun;
final Worker w;

Thread runner;

DisposeTask(Runnable decoratedRun, Worker w) {
this.decoratedRun = decoratedRun;
this.w = w;
}

@Override
public void run() {
runner = Thread.currentThread();
try {
decoratedRun.run();
} finally {
dispose();
runner = null;
}
}

@Override
public void dispose() {
if (runner == Thread.currentThread() && w instanceof NewThreadWorker) {
((NewThreadWorker)w).shutdown();
} else {
w.dispose();
}
}

@Override
public boolean isDisposed() {
return w.isDisposed();
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,80 @@
/**
* Copyright (c) 2016-present, RxJava Contributors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package io.reactivex.internal.schedulers;

import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicReference;

import io.reactivex.disposables.Disposable;
import io.reactivex.internal.functions.Functions;

/**
* Base functionality for direct tasks that manage a runnable and cancellation/completion.
* @since 2.0.8
*/
abstract class AbstractDirectTask
extends AtomicReference<Future<?>>
implements Disposable {

private static final long serialVersionUID = 1811839108042568751L;

protected final Runnable runnable;

protected Thread runner;

protected static final FutureTask<Void> FINISHED = new FutureTask<Void>(Functions.EMPTY_RUNNABLE, null);

protected static final FutureTask<Void> DISPOSED = new FutureTask<Void>(Functions.EMPTY_RUNNABLE, null);

public AbstractDirectTask(Runnable runnable) {
this.runnable = runnable;
}

@Override
public final void dispose() {
Future<?> f = get();
if (f != FINISHED && f != DISPOSED) {
if (compareAndSet(f, DISPOSED)) {
if (f != null) {
f.cancel(runner != Thread.currentThread());
}
}
}
}

@Override
public final boolean isDisposed() {
Future<?> f = get();
return f == FINISHED || f == DISPOSED;
}

public final void setFuture(Future<?> future) {
for (;;) {
Future<?> f = get();
if (f == FINISHED) {
break;
}
if (f == DISPOSED) {
future.cancel(runner != Thread.currentThread());
break;
}
if (compareAndSet(f, future)) {
break;
}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -51,8 +51,10 @@ public Disposable scheduleDirect(@NonNull Runnable run) {
Runnable decoratedRun = RxJavaPlugins.onSchedule(run);
try {
if (executor instanceof ExecutorService) {
Future<?> f = ((ExecutorService)executor).submit(decoratedRun);
return Disposables.fromFuture(f);
ScheduledDirectTask task = new ScheduledDirectTask(decoratedRun);
Future<?> f = ((ExecutorService)executor).submit(task);
task.setFuture(f);
return task;
}

BooleanRunnable br = new BooleanRunnable(decoratedRun);
Expand All @@ -70,8 +72,10 @@ public Disposable scheduleDirect(@NonNull Runnable run, final long delay, final
final Runnable decoratedRun = RxJavaPlugins.onSchedule(run);
if (executor instanceof ScheduledExecutorService) {
try {
Future<?> f = ((ScheduledExecutorService)executor).schedule(decoratedRun, delay, unit);
return Disposables.fromFuture(f);
ScheduledDirectTask task = new ScheduledDirectTask(decoratedRun);
Future<?> f = ((ScheduledExecutorService)executor).schedule(task, delay, unit);
task.setFuture(f);
return task;
} catch (RejectedExecutionException ex) {
RxJavaPlugins.onError(ex);
return EmptyDisposable.INSTANCE;
Expand All @@ -93,8 +97,10 @@ public Disposable schedulePeriodicallyDirect(@NonNull Runnable run, long initial
if (executor instanceof ScheduledExecutorService) {
Runnable decoratedRun = RxJavaPlugins.onSchedule(run);
try {
Future<?> f = ((ScheduledExecutorService)executor).scheduleAtFixedRate(decoratedRun, initialDelay, period, unit);
return Disposables.fromFuture(f);
ScheduledDirectPeriodicTask task = new ScheduledDirectPeriodicTask(decoratedRun);
Future<?> f = ((ScheduledExecutorService)executor).scheduleAtFixedRate(task, initialDelay, period, unit);
task.setFuture(f);
return task;
} catch (RejectedExecutionException ex) {
RxJavaPlugins.onError(ex);
return EmptyDisposable.INSTANCE;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -60,15 +60,16 @@ public Disposable schedule(@NonNull final Runnable action, long delayTime, @NonN
* @return the ScheduledRunnable instance
*/
public Disposable scheduleDirect(final Runnable run, long delayTime, TimeUnit unit) {
Runnable decoratedRun = RxJavaPlugins.onSchedule(run);
ScheduledDirectTask task = new ScheduledDirectTask(RxJavaPlugins.onSchedule(run));
try {
Future<?> f;
if (delayTime <= 0) {
f = executor.submit(decoratedRun);
if (delayTime <= 0L) {
f = executor.submit(task);
} else {
f = executor.schedule(decoratedRun, delayTime, unit);
f = executor.schedule(task, delayTime, unit);
}
return Disposables.fromFuture(f);
task.setFuture(f);
return task;
} catch (RejectedExecutionException ex) {
RxJavaPlugins.onError(ex);
return EmptyDisposable.INSTANCE;
Expand All @@ -85,10 +86,11 @@ public Disposable scheduleDirect(final Runnable run, long delayTime, TimeUnit un
* @return the ScheduledRunnable instance
*/
public Disposable schedulePeriodicallyDirect(final Runnable run, long initialDelay, long period, TimeUnit unit) {
Runnable decoratedRun = RxJavaPlugins.onSchedule(run);
ScheduledDirectPeriodicTask task = new ScheduledDirectPeriodicTask(RxJavaPlugins.onSchedule(run));
try {
Future<?> f = executor.scheduleAtFixedRate(decoratedRun, initialDelay, period, unit);
return Disposables.fromFuture(f);
Future<?> f = executor.scheduleAtFixedRate(task, initialDelay, period, unit);
task.setFuture(f);
return task;
} catch (RejectedExecutionException ex) {
RxJavaPlugins.onError(ex);
return EmptyDisposable.INSTANCE;
Expand Down Expand Up @@ -145,6 +147,16 @@ public void dispose() {
}
}

/**
* Shuts down the underlying executor in a non-interrupting fashion.
*/
public void shutdown() {
if (!disposed) {
disposed = true;
executor.shutdown();
}
}

@Override
public boolean isDisposed() {
return disposed;
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
/**
* Copyright (c) 2016-present, RxJava Contributors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package io.reactivex.internal.schedulers;

import io.reactivex.plugins.RxJavaPlugins;

/**
* A Callable to be submitted to an ExecutorService that runs a Runnable
* action periodically and manages completion/cancellation.
* @since 2.0.8
*/
public final class ScheduledDirectPeriodicTask extends AbstractDirectTask implements Runnable {

private static final long serialVersionUID = 1811839108042568751L;

public ScheduledDirectPeriodicTask(Runnable runnable) {
super(runnable);
}

@Override
public void run() {
runner = Thread.currentThread();
try {
try {
runnable.run();
} catch (Throwable ex) {
lazySet(FINISHED);
RxJavaPlugins.onError(ex);
}
} finally {
runner = null;
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
/**
* Copyright (c) 2016-present, RxJava Contributors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package io.reactivex.internal.schedulers;

import java.util.concurrent.Callable;

/**
* A Callable to be submitted to an ExecutorService that runs a Runnable
* action and manages completion/cancellation.
* @since 2.0.8
*/
public final class ScheduledDirectTask extends AbstractDirectTask implements Callable<Void> {

private static final long serialVersionUID = 1811839108042568751L;

public ScheduledDirectTask(Runnable runnable) {
super(runnable);
}

@Override
public Void call() throws Exception {
runner = Thread.currentThread();
try {
runnable.run();
} finally {
lazySet(FINISHED);
runner = null;
}
return null;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ public final class ScheduledRunnable extends AtomicReferenceArray<Object>

static final int PARENT_INDEX = 0;
static final int FUTURE_INDEX = 1;
static final int THREAD_INDEX = 2;

/**
* Creates a ScheduledRunnable by wrapping the given action and setting
Expand All @@ -40,7 +41,7 @@ public final class ScheduledRunnable extends AtomicReferenceArray<Object>
* @param parent the parent tracking container or null if none
*/
public ScheduledRunnable(Runnable actual, DisposableContainer parent) {
super(2);
super(3);
this.actual = actual;
this.lazySet(0, parent);
}
Expand All @@ -54,6 +55,7 @@ public Object call() {

@Override
public void run() {
lazySet(THREAD_INDEX, Thread.currentThread());
try {
try {
actual.run();
Expand All @@ -62,6 +64,7 @@ public void run() {
RxJavaPlugins.onError(e);
}
} finally {
lazySet(THREAD_INDEX, null);
Object o = get(PARENT_INDEX);
if (o != DISPOSED && o != null && compareAndSet(PARENT_INDEX, o, DONE)) {
((DisposableContainer)o).delete(this);
Expand All @@ -83,7 +86,7 @@ public void setFuture(Future<?> f) {
return;
}
if (o == DISPOSED) {
f.cancel(true);
f.cancel(get(THREAD_INDEX) != Thread.currentThread());
return;
}
if (compareAndSet(FUTURE_INDEX, o, f)) {
Expand All @@ -101,7 +104,7 @@ public void dispose() {
}
if (compareAndSet(FUTURE_INDEX, o, DISPOSED)) {
if (o != null) {
((Future<?>)o).cancel(true);
((Future<?>)o).cancel(get(THREAD_INDEX) != Thread.currentThread());
}
break;
}
Expand Down
Loading

0 comments on commit 80dcb69

Please sign in to comment.