Skip to content

Commit

Permalink
improve memory footprint of the timer
Browse files Browse the repository at this point in the history
  • Loading branch information
Ladicek committed Nov 8, 2023
1 parent 2f2c560 commit b4e59f4
Show file tree
Hide file tree
Showing 10 changed files with 173 additions and 81 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ public V apply(InvocationContext<V> ctx) throws Exception {
}

private V doApply(InvocationContext<V> ctx) throws Exception {
// must extract `AsyncTimeoutNotification` synchronously, because if retries are present,
// must extract `AsyncTimeoutNotification` early, because if retries are present,
// a different `AsyncTimeoutNotification` may be present in the `InvocationContext`
// by the time the timeout callback is invoked
AsyncTimeoutNotification notification = ctx.get(AsyncTimeoutNotification.class);
Expand Down
Original file line number Diff line number Diff line change
@@ -1,13 +1,23 @@
package io.smallrye.faulttolerance.core.timeout;

import java.util.concurrent.atomic.AtomicInteger;
import java.lang.invoke.MethodHandles;
import java.lang.invoke.VarHandle;

final class TimeoutExecution {
private static final int STATE_RUNNING = 0;
private static final int STATE_FINISHED = 1;
private static final int STATE_TIMED_OUT = 2;

private final AtomicInteger state;
private static final VarHandle STATE;
static {
try {
STATE = MethodHandles.lookup().findVarHandle(TimeoutExecution.class, "state", int.class);
} catch (ReflectiveOperationException e) {
throw new ExceptionInInitializerError(e);
}
}

private volatile int state;

// can be null, if no thread shall be interrupted upon timeout
private final Thread executingThread;
Expand All @@ -21,7 +31,7 @@ final class TimeoutExecution {
}

TimeoutExecution(Thread executingThread, long timeoutInMillis, Runnable timeoutAction) {
this.state = new AtomicInteger(STATE_RUNNING);
this.state = STATE_RUNNING;
this.executingThread = executingThread;
this.timeoutInMillis = timeoutInMillis;
this.timeoutAction = timeoutAction;
Expand All @@ -32,25 +42,25 @@ long timeoutInMillis() {
}

boolean isRunning() {
return state.get() == STATE_RUNNING;
return state == STATE_RUNNING;
}

boolean hasFinished() {
return state.get() == STATE_FINISHED;
return state == STATE_FINISHED;
}

boolean hasTimedOut() {
return state.get() == STATE_TIMED_OUT;
return state == STATE_TIMED_OUT;
}

void finish(Runnable ifFinished) {
if (state.compareAndSet(STATE_RUNNING, STATE_FINISHED)) {
if (STATE.compareAndSet(this, STATE_RUNNING, STATE_FINISHED)) {
ifFinished.run();
}
}

void timeoutAndInterrupt() {
if (state.compareAndSet(STATE_RUNNING, STATE_TIMED_OUT)) {
if (STATE.compareAndSet(this, STATE_RUNNING, STATE_TIMED_OUT)) {
if (executingThread != null) {
executingThread.interrupt();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,51 +10,66 @@
import java.util.concurrent.Executor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.LockSupport;
import java.util.function.Consumer;

import io.smallrye.faulttolerance.core.util.RunnableWrapper;

/**
* Starts one thread that processes submitted tasks in a loop and when it's time for a task to run,
* it gets submitted to the executor. The default executor is provided by a caller, so the caller
* must shut down this timer <em>before</em> shutting down the executor.
* <p>
* At most one timer may exist.
*/
// TODO implement a hashed wheel?
public final class ThreadTimer implements Timer {
private static final AtomicInteger COUNTER = new AtomicInteger(0);

private static final Comparator<Task> TASK_COMPARATOR = (o1, o2) -> {
// two different instances are never equal
if (o1 == o2) {
// two different instances are never equal
return 0;
}

// must _not_ return 0 if start times are equal, because that isn't consistent
// with `equals` (see also above)
return o1.startTime <= o2.startTime ? -1 : 1;
// must _not_ return 0 if start times are equal, because that isn't consistent with `equals` (see also above)
// must _not_ compare `startTime` using `<` because of how `System.nanoTime()` works (see also below)
long delta = o1.startTime - o2.startTime;
if (delta < 0) {
return -1;
} else if (delta > 0) {
return 1;
}

return System.identityHashCode(o1) < System.identityHashCode(o2) ? -1 : 1;
};

private final String name;
private static volatile ThreadTimer INSTANCE;

private final SortedSet<Task> tasks = new ConcurrentSkipListSet<>(TASK_COMPARATOR);

private final SortedSet<Task> tasks;
private final Executor defaultExecutor;

private final Thread thread;

private final AtomicBoolean running = new AtomicBoolean(true);

/**
* Creates a timer with given {@code defaultExecutor}, unless a timer already exists,
* in which case an exception is thrown.
*
* @param defaultExecutor default {@link Executor} used for running scheduled tasks, unless an executor
* is provided when {@linkplain #schedule(long, Runnable, Executor) scheduling} a task
*/
public ThreadTimer(Executor defaultExecutor) {
checkNotNull(defaultExecutor, "Executor must be set");
public static synchronized ThreadTimer create(Executor defaultExecutor) {
ThreadTimer instance = INSTANCE;
if (instance == null) {
instance = new ThreadTimer(defaultExecutor);
INSTANCE = instance;
return instance;
}
throw new IllegalStateException("Timer already exists");
}

this.name = "SmallRye Fault Tolerance Timer " + COUNTER.incrementAndGet();
LOG.createdTimer(name);
private ThreadTimer(Executor defaultExecutor) {
this.defaultExecutor = checkNotNull(defaultExecutor, "Executor must be set");

this.tasks = new ConcurrentSkipListSet<>(TASK_COMPARATOR);
this.thread = new Thread(() -> {
while (running.get()) {
try {
Expand All @@ -77,35 +92,30 @@ public ThreadTimer(Executor defaultExecutor) {
// in such case, `taskStartTime` can be positive, `currentTime` can be negative,
// and yet `taskStartTime` is _before_ `currentTime`
if (taskStartTime - currentTime <= 0) {
tasks.remove(task);
if (task.state.compareAndSet(Task.STATE_NEW, Task.STATE_RUNNING)) {
Executor executorForTask = task.executorOverride;
boolean removed = tasks.remove(task);
if (removed) {
Executor executorForTask = task.executor();
if (executorForTask == null) {
executorForTask = defaultExecutor;
}

executorForTask.execute(() -> {
LOG.runningTimerTask(task);
try {
task.runnable.run();
} finally {
task.state.set(Task.STATE_FINISHED);
}
});
executorForTask.execute(task);
}
} else {
// this is OK even if another timer is scheduled during the sleep (even if that timer should
// fire sooner than `taskStartTime`), because `schedule` always calls` LockSupport.unpark`
// fire sooner than `taskStartTime`), because `schedule` always calls `LockSupport.unpark`
LockSupport.parkNanos(taskStartTime - currentTime);
}
}
} catch (Exception e) {
} catch (Throwable e) {
// can happen e.g. when the executor is shut down sooner than the timer
LOG.unexpectedExceptionInTimerLoop(e);
}
}
}, name);
}, "SmallRye Fault Tolerance Timer");
thread.start();

LOG.createdTimer();
}

@Override
Expand All @@ -116,7 +126,10 @@ public TimerTask schedule(long delayInMillis, Runnable task) {
@Override
public TimerTask schedule(long delayInMillis, Runnable task, Executor executor) {
long startTime = System.nanoTime() + TimeUnit.MILLISECONDS.toNanos(delayInMillis);
Task timerTask = new Task(startTime, RunnableWrapper.INSTANCE.wrap(task), tasks::remove, executor);
task = RunnableWrapper.INSTANCE.wrap(task);
Task timerTask = executor == null || executor == defaultExecutor
? new Task(startTime, task)
: new TaskWithExecutor(startTime, task, executor);
tasks.add(timerTask);
LockSupport.unpark(thread);
LOG.scheduledTimerTask(timerTask, delayInMillis);
Expand All @@ -131,47 +144,84 @@ public int countScheduledTasks() {
@Override
public void shutdown() throws InterruptedException {
if (running.compareAndSet(true, false)) {
LOG.shutdownTimer(name);
thread.interrupt();
thread.join();
try {
LOG.shutdownTimer();
thread.interrupt();
thread.join();
} finally {
INSTANCE = null;
}
}
}

private static final class Task implements TimerTask {
static final int STATE_NEW = 0; // was scheduled, but isn't running yet
static final int STATE_RUNNING = 1; // running on the executor
static final int STATE_FINISHED = 2; // finished running
static final int STATE_CANCELLED = 3; // cancelled before it could be executed
private static class Task implements TimerTask, Runnable {
// scheduled: present in the `tasks` queue
// running: not present in the `tasks` queue && `runnable != null`
// finished or cancelled: not present in the `tasks` queue && `runnable == null`

final long startTime; // in nanos, to be compared with System.nanoTime()
final Runnable runnable;
final Executor executorOverride; // may be null, which means that the timer's executor shall be used
final AtomicInteger state = new AtomicInteger(STATE_NEW);

private final Consumer<TimerTask> onCancel;
volatile Runnable runnable;

Task(long startTime, Runnable runnable, Consumer<TimerTask> onCancel, Executor executorOverride) {
Task(long startTime, Runnable runnable) {
this.startTime = startTime;
this.runnable = checkNotNull(runnable, "Runnable task must be set");
this.onCancel = checkNotNull(onCancel, "Cancellation callback must be set");
this.executorOverride = executorOverride;
}

@Override
public boolean isDone() {
int state = this.state.get();
return state == STATE_FINISHED || state == STATE_CANCELLED;
ThreadTimer timer = INSTANCE;
if (timer != null) {
boolean queued = timer.tasks.contains(this);
if (queued) {
return false;
} else {
return runnable == null;
}
}
return true; // ?
}

@Override
public boolean cancel() {
// can't cancel if it's already running
if (state.compareAndSet(STATE_NEW, STATE_CANCELLED)) {
LOG.cancelledTimerTask(this);
onCancel.accept(this);
return true;
ThreadTimer timer = INSTANCE;
if (timer != null) {
// can't cancel if it's already running
boolean removed = timer.tasks.remove(this);
if (removed) {
runnable = null;
LOG.cancelledTimerTask(this);
return true;
}
}
return false;
}

public Executor executor() {
return null; // default executor of the timer should be used
}

@Override
public void run() {
LOG.runningTimerTask(this);
try {
runnable.run();
} finally {
runnable = null;
}
}
}

private static final class TaskWithExecutor extends Task {
private final Executor executor;

TaskWithExecutor(long startTime, Runnable runnable, Executor executor) {
super(startTime, runnable);
this.executor = checkNotNull(executor, "Executor must be set");
}

@Override
public Executor executor() {
return executor;
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -14,13 +14,13 @@
interface TimerLogger extends BasicLogger {
TimerLogger LOG = Logger.getMessageLogger(TimerLogger.class, TimerLogger.class.getPackage().getName());

@Message(id = NONE, value = "Timer %s created")
@Message(id = NONE, value = "Timer created")
@LogMessage(level = Logger.Level.TRACE)
void createdTimer(String name);
void createdTimer();

@Message(id = NONE, value = "Timer %s shut down")
@Message(id = NONE, value = "Timer shut down")
@LogMessage(level = Logger.Level.TRACE)
void shutdownTimer(String name);
void shutdownTimer();

@Message(id = NONE, value = "Scheduled timer task %s to run in %s millis")
@LogMessage(level = Logger.Level.TRACE)
Expand All @@ -36,5 +36,5 @@ interface TimerLogger extends BasicLogger {

@Message(id = 11000, value = "Unexpected exception in timer loop, ignoring")
@LogMessage(level = Logger.Level.WARN)
void unexpectedExceptionInTimerLoop(@Cause Exception e);
void unexpectedExceptionInTimerLoop(@Cause Throwable e);
}
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ public void setUp() {
executor = Executors.newSingleThreadExecutor();

timerExecutor = Executors.newSingleThreadExecutor();
timer = new ThreadTimer(timerExecutor);
timer = ThreadTimer.create(timerExecutor);
timerWatcher = new TimerTimeoutWatcher(timer);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ public class TimerTimeoutWatcherTest {
@BeforeEach
public void setUp() {
executor = Executors.newSingleThreadExecutor();
timer = new ThreadTimer(executor);
timer = ThreadTimer.create(executor);
watcher = new TimerTimeoutWatcher(timer);
}

Expand Down
Loading

0 comments on commit b4e59f4

Please sign in to comment.