From 2a3dc3098c16c09c86c773681aa9cf98839f3842 Mon Sep 17 00:00:00 2001 From: Ladislav Thon Date: Tue, 7 Nov 2023 09:37:47 +0100 Subject: [PATCH] improve memory footprint of the timer --- .../faulttolerance/core/timeout/Timeout.java | 2 +- .../core/timeout/TimeoutExecution.java | 26 ++-- .../core/timer/ThreadTimer.java | 131 ++++++++++++------ .../core/timer/TimerLogger.java | 10 +- .../RealWorldCompletionStageTimeoutTest.java | 2 +- .../core/timeout/TimerTimeoutWatcherTest.java | 2 +- .../core/timer/ThreadTimerStressTest.java | 2 +- .../core/timer/ThreadTimerTest.java | 2 +- .../faulttolerance/ExecutorHolder.java | 2 +- .../standalone/LazyDependencies.java | 2 +- 10 files changed, 116 insertions(+), 65 deletions(-) diff --git a/implementation/core/src/main/java/io/smallrye/faulttolerance/core/timeout/Timeout.java b/implementation/core/src/main/java/io/smallrye/faulttolerance/core/timeout/Timeout.java index 91afbf81..9c8f204a 100644 --- a/implementation/core/src/main/java/io/smallrye/faulttolerance/core/timeout/Timeout.java +++ b/implementation/core/src/main/java/io/smallrye/faulttolerance/core/timeout/Timeout.java @@ -34,7 +34,7 @@ public V apply(InvocationContext ctx) throws Exception { } private V doApply(InvocationContext ctx) throws Exception { - // must extract `AsyncTimeoutNotification` synchronously, because if retries are present, + // must extract `AsyncTimeoutNotification` early, because if retries are present, // a different `AsyncTimeoutNotification` may be present in the `InvocationContext` // by the time the timeout callback is invoked AsyncTimeoutNotification notification = ctx.get(AsyncTimeoutNotification.class); diff --git a/implementation/core/src/main/java/io/smallrye/faulttolerance/core/timeout/TimeoutExecution.java b/implementation/core/src/main/java/io/smallrye/faulttolerance/core/timeout/TimeoutExecution.java index ae0d4214..d01887dd 100644 --- a/implementation/core/src/main/java/io/smallrye/faulttolerance/core/timeout/TimeoutExecution.java +++ b/implementation/core/src/main/java/io/smallrye/faulttolerance/core/timeout/TimeoutExecution.java @@ -1,13 +1,23 @@ package io.smallrye.faulttolerance.core.timeout; -import java.util.concurrent.atomic.AtomicInteger; +import java.lang.invoke.MethodHandles; +import java.lang.invoke.VarHandle; final class TimeoutExecution { private static final int STATE_RUNNING = 0; private static final int STATE_FINISHED = 1; private static final int STATE_TIMED_OUT = 2; - private final AtomicInteger state; + private static final VarHandle STATE; + static { + try { + STATE = MethodHandles.lookup().findVarHandle(TimeoutExecution.class, "state", int.class); + } catch (ReflectiveOperationException e) { + throw new ExceptionInInitializerError(e); + } + } + + private volatile int state; // can be null, if no thread shall be interrupted upon timeout private final Thread executingThread; @@ -21,7 +31,7 @@ final class TimeoutExecution { } TimeoutExecution(Thread executingThread, long timeoutInMillis, Runnable timeoutAction) { - this.state = new AtomicInteger(STATE_RUNNING); + this.state = STATE_RUNNING; this.executingThread = executingThread; this.timeoutInMillis = timeoutInMillis; this.timeoutAction = timeoutAction; @@ -32,25 +42,25 @@ long timeoutInMillis() { } boolean isRunning() { - return state.get() == STATE_RUNNING; + return state == STATE_RUNNING; } boolean hasFinished() { - return state.get() == STATE_FINISHED; + return state == STATE_FINISHED; } boolean hasTimedOut() { - return state.get() == STATE_TIMED_OUT; + return state == STATE_TIMED_OUT; } void finish(Runnable ifFinished) { - if (state.compareAndSet(STATE_RUNNING, STATE_FINISHED)) { + if (STATE.compareAndSet(this, STATE_RUNNING, STATE_FINISHED)) { ifFinished.run(); } } void timeoutAndInterrupt() { - if (state.compareAndSet(STATE_RUNNING, STATE_TIMED_OUT)) { + if (STATE.compareAndSet(this, STATE_RUNNING, STATE_TIMED_OUT)) { if (executingThread != null) { executingThread.interrupt(); } diff --git a/implementation/core/src/main/java/io/smallrye/faulttolerance/core/timer/ThreadTimer.java b/implementation/core/src/main/java/io/smallrye/faulttolerance/core/timer/ThreadTimer.java index 06018bc9..2dc0aba4 100644 --- a/implementation/core/src/main/java/io/smallrye/faulttolerance/core/timer/ThreadTimer.java +++ b/implementation/core/src/main/java/io/smallrye/faulttolerance/core/timer/ThreadTimer.java @@ -10,9 +10,7 @@ import java.util.concurrent.Executor; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; -import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.locks.LockSupport; -import java.util.function.Consumer; import io.smallrye.faulttolerance.core.util.RunnableWrapper; @@ -20,41 +18,52 @@ * Starts one thread that processes submitted tasks in a loop and when it's time for a task to run, * it gets submitted to the executor. The default executor is provided by a caller, so the caller * must shut down this timer before shutting down the executor. + *

+ * At most one timer may exist. */ // TODO implement a hashed wheel? public final class ThreadTimer implements Timer { - private static final AtomicInteger COUNTER = new AtomicInteger(0); - private static final Comparator TASK_COMPARATOR = (o1, o2) -> { if (o1 == o2) { // two different instances are never equal return 0; } - // must _not_ return 0 if start times are equal, because that isn't consistent - // with `equals` (see also above) - return o1.startTime <= o2.startTime ? -1 : 1; + // must _not_ return 0 if start times are equal, because that isn't consistent with `equals` (see also above) + // must _not_ compare using `<` etc. because of how `System.nanoTime()` works (see also below) + return o1.startTime - o2.startTime < 0 ? -1 : 1; }; - private final String name; + private static volatile ThreadTimer INSTANCE; + + private final SortedSet tasks = new ConcurrentSkipListSet<>(TASK_COMPARATOR); - private final SortedSet tasks; + private final Executor defaultExecutor; private final Thread thread; private final AtomicBoolean running = new AtomicBoolean(true); /** + * Creates a timer with given {@code defaultExecutor}, unless a timer already exists, + * in which case an exception is thrown. + * * @param defaultExecutor default {@link Executor} used for running scheduled tasks, unless an executor * is provided when {@linkplain #schedule(long, Runnable, Executor) scheduling} a task */ - public ThreadTimer(Executor defaultExecutor) { - checkNotNull(defaultExecutor, "Executor must be set"); + public static synchronized ThreadTimer create(Executor defaultExecutor) { + ThreadTimer instance = INSTANCE; + if (instance == null) { + instance = new ThreadTimer(defaultExecutor); + INSTANCE = instance; + return instance; + } + throw new IllegalStateException("Timer already exists"); + } - this.name = "SmallRye Fault Tolerance Timer " + COUNTER.incrementAndGet(); - LOG.createdTimer(name); + private ThreadTimer(Executor defaultExecutor) { + this.defaultExecutor = checkNotNull(defaultExecutor, "Executor must be set"); - this.tasks = new ConcurrentSkipListSet<>(TASK_COMPARATOR); this.thread = new Thread(() -> { while (running.get()) { try { @@ -77,9 +86,9 @@ public ThreadTimer(Executor defaultExecutor) { // in such case, `taskStartTime` can be positive, `currentTime` can be negative, // and yet `taskStartTime` is _before_ `currentTime` if (taskStartTime - currentTime <= 0) { - tasks.remove(task); - if (task.state.compareAndSet(Task.STATE_NEW, Task.STATE_RUNNING)) { - Executor executorForTask = task.executorOverride; + boolean removed = tasks.remove(task); + if (removed) { + Executor executorForTask = task.executor(); if (executorForTask == null) { executorForTask = defaultExecutor; } @@ -89,23 +98,25 @@ public ThreadTimer(Executor defaultExecutor) { try { task.runnable.run(); } finally { - task.state.set(Task.STATE_FINISHED); + task.runnable = null; } }); } } else { // this is OK even if another timer is scheduled during the sleep (even if that timer should - // fire sooner than `taskStartTime`), because `schedule` always calls` LockSupport.unpark` + // fire sooner than `taskStartTime`), because `schedule` always calls `LockSupport.unpark` LockSupport.parkNanos(taskStartTime - currentTime); } } - } catch (Exception e) { + } catch (Throwable e) { // can happen e.g. when the executor is shut down sooner than the timer LOG.unexpectedExceptionInTimerLoop(e); } } - }, name); + }, "SmallRye Fault Tolerance Timer"); thread.start(); + + LOG.createdTimer(); } @Override @@ -116,7 +127,10 @@ public TimerTask schedule(long delayInMillis, Runnable task) { @Override public TimerTask schedule(long delayInMillis, Runnable task, Executor executor) { long startTime = System.nanoTime() + TimeUnit.MILLISECONDS.toNanos(delayInMillis); - Task timerTask = new Task(startTime, RunnableWrapper.INSTANCE.wrap(task), tasks::remove, executor); + task = RunnableWrapper.INSTANCE.wrap(task); + Task timerTask = executor == null || executor == defaultExecutor + ? new Task(startTime, task) + : new TaskWithExecutor(startTime, task, executor); tasks.add(timerTask); LockSupport.unpark(thread); LOG.scheduledTimerTask(timerTask, delayInMillis); @@ -131,47 +145,74 @@ public int countScheduledTasks() { @Override public void shutdown() throws InterruptedException { if (running.compareAndSet(true, false)) { - LOG.shutdownTimer(name); - thread.interrupt(); - thread.join(); + try { + LOG.shutdownTimer(); + thread.interrupt(); + thread.join(); + } finally { + INSTANCE = null; + } } } - private static final class Task implements TimerTask { - static final int STATE_NEW = 0; // was scheduled, but isn't running yet - static final int STATE_RUNNING = 1; // running on the executor - static final int STATE_FINISHED = 2; // finished running - static final int STATE_CANCELLED = 3; // cancelled before it could be executed + private static class Task implements TimerTask { + // scheduled: present in the `tasks` queue + // running: not present in the `tasks` queue && `runnable != null` + // finished or cancelled: not present in the `tasks` queue && `runnable == null` final long startTime; // in nanos, to be compared with System.nanoTime() - final Runnable runnable; - final Executor executorOverride; // may be null, which means that the timer's executor shall be used - final AtomicInteger state = new AtomicInteger(STATE_NEW); - - private final Consumer onCancel; + volatile Runnable runnable; - Task(long startTime, Runnable runnable, Consumer onCancel, Executor executorOverride) { + Task(long startTime, Runnable runnable) { this.startTime = startTime; this.runnable = checkNotNull(runnable, "Runnable task must be set"); - this.onCancel = checkNotNull(onCancel, "Cancellation callback must be set"); - this.executorOverride = executorOverride; } @Override public boolean isDone() { - int state = this.state.get(); - return state == STATE_FINISHED || state == STATE_CANCELLED; + ThreadTimer timer = INSTANCE; + if (timer != null) { + boolean queued = timer.tasks.contains(this); + if (queued) { + return false; + } else { + return runnable == null; + } + } + return true; // ? } @Override public boolean cancel() { - // can't cancel if it's already running - if (state.compareAndSet(STATE_NEW, STATE_CANCELLED)) { - LOG.cancelledTimerTask(this); - onCancel.accept(this); - return true; + ThreadTimer timer = INSTANCE; + if (timer != null) { + // can't cancel if it's already running + boolean removed = timer.tasks.remove(this); + if (removed) { + runnable = null; + LOG.cancelledTimerTask(this); + return true; + } } return false; } + + public Executor executor() { + return null; // default executor of the timer should be used + } + } + + private static final class TaskWithExecutor extends Task { + private final Executor executor; + + TaskWithExecutor(long startTime, Runnable runnable, Executor executor) { + super(startTime, runnable); + this.executor = checkNotNull(executor, "Executor must be set"); + } + + @Override + public Executor executor() { + return executor; + } } } diff --git a/implementation/core/src/main/java/io/smallrye/faulttolerance/core/timer/TimerLogger.java b/implementation/core/src/main/java/io/smallrye/faulttolerance/core/timer/TimerLogger.java index 235b03fd..b75d756e 100644 --- a/implementation/core/src/main/java/io/smallrye/faulttolerance/core/timer/TimerLogger.java +++ b/implementation/core/src/main/java/io/smallrye/faulttolerance/core/timer/TimerLogger.java @@ -14,13 +14,13 @@ interface TimerLogger extends BasicLogger { TimerLogger LOG = Logger.getMessageLogger(TimerLogger.class, TimerLogger.class.getPackage().getName()); - @Message(id = NONE, value = "Timer %s created") + @Message(id = NONE, value = "Timer created") @LogMessage(level = Logger.Level.TRACE) - void createdTimer(String name); + void createdTimer(); - @Message(id = NONE, value = "Timer %s shut down") + @Message(id = NONE, value = "Timer shut down") @LogMessage(level = Logger.Level.TRACE) - void shutdownTimer(String name); + void shutdownTimer(); @Message(id = NONE, value = "Scheduled timer task %s to run in %s millis") @LogMessage(level = Logger.Level.TRACE) @@ -36,5 +36,5 @@ interface TimerLogger extends BasicLogger { @Message(id = 11000, value = "Unexpected exception in timer loop, ignoring") @LogMessage(level = Logger.Level.WARN) - void unexpectedExceptionInTimerLoop(@Cause Exception e); + void unexpectedExceptionInTimerLoop(@Cause Throwable e); } diff --git a/implementation/core/src/test/java/io/smallrye/faulttolerance/core/timeout/RealWorldCompletionStageTimeoutTest.java b/implementation/core/src/test/java/io/smallrye/faulttolerance/core/timeout/RealWorldCompletionStageTimeoutTest.java index ad3845ac..a82ebfce 100644 --- a/implementation/core/src/test/java/io/smallrye/faulttolerance/core/timeout/RealWorldCompletionStageTimeoutTest.java +++ b/implementation/core/src/test/java/io/smallrye/faulttolerance/core/timeout/RealWorldCompletionStageTimeoutTest.java @@ -55,7 +55,7 @@ public void setUp() { executor = Executors.newSingleThreadExecutor(); timerExecutor = Executors.newSingleThreadExecutor(); - timer = new ThreadTimer(timerExecutor); + timer = ThreadTimer.create(timerExecutor); timerWatcher = new TimerTimeoutWatcher(timer); } diff --git a/implementation/core/src/test/java/io/smallrye/faulttolerance/core/timeout/TimerTimeoutWatcherTest.java b/implementation/core/src/test/java/io/smallrye/faulttolerance/core/timeout/TimerTimeoutWatcherTest.java index 6305253a..e3ce8118 100644 --- a/implementation/core/src/test/java/io/smallrye/faulttolerance/core/timeout/TimerTimeoutWatcherTest.java +++ b/implementation/core/src/test/java/io/smallrye/faulttolerance/core/timeout/TimerTimeoutWatcherTest.java @@ -27,7 +27,7 @@ public class TimerTimeoutWatcherTest { @BeforeEach public void setUp() { executor = Executors.newSingleThreadExecutor(); - timer = new ThreadTimer(executor); + timer = ThreadTimer.create(executor); watcher = new TimerTimeoutWatcher(timer); } diff --git a/implementation/core/src/test/java/io/smallrye/faulttolerance/core/timer/ThreadTimerStressTest.java b/implementation/core/src/test/java/io/smallrye/faulttolerance/core/timer/ThreadTimerStressTest.java index 9ac185bc..8e362c90 100644 --- a/implementation/core/src/test/java/io/smallrye/faulttolerance/core/timer/ThreadTimerStressTest.java +++ b/implementation/core/src/test/java/io/smallrye/faulttolerance/core/timer/ThreadTimerStressTest.java @@ -33,7 +33,7 @@ public class ThreadTimerStressTest { @BeforeEach public void setUp() throws InterruptedException { executor = Executors.newFixedThreadPool(POOL_SIZE); - timer = new ThreadTimer(executor); + timer = ThreadTimer.create(executor); // precreate all threads in the pool // if we didn't do this, the first few iterations would be dominated diff --git a/implementation/core/src/test/java/io/smallrye/faulttolerance/core/timer/ThreadTimerTest.java b/implementation/core/src/test/java/io/smallrye/faulttolerance/core/timer/ThreadTimerTest.java index a58bfb54..7b9c1fd3 100644 --- a/implementation/core/src/test/java/io/smallrye/faulttolerance/core/timer/ThreadTimerTest.java +++ b/implementation/core/src/test/java/io/smallrye/faulttolerance/core/timer/ThreadTimerTest.java @@ -22,7 +22,7 @@ public class ThreadTimerTest { @BeforeEach public void setUp() { executor = Executors.newSingleThreadExecutor(); - timer = new ThreadTimer(executor); + timer = ThreadTimer.create(executor); } @AfterEach diff --git a/implementation/fault-tolerance/src/main/java/io/smallrye/faulttolerance/ExecutorHolder.java b/implementation/fault-tolerance/src/main/java/io/smallrye/faulttolerance/ExecutorHolder.java index 97df5784..c9ce6b0c 100644 --- a/implementation/fault-tolerance/src/main/java/io/smallrye/faulttolerance/ExecutorHolder.java +++ b/implementation/fault-tolerance/src/main/java/io/smallrye/faulttolerance/ExecutorHolder.java @@ -25,7 +25,7 @@ public class ExecutorHolder { public ExecutorHolder(AsyncExecutorProvider asyncExecutorProvider) { this.asyncExecutor = asyncExecutorProvider.get(); this.eventLoop = EventLoop.get(); - this.timer = new ThreadTimer(asyncExecutor); + this.timer = ThreadTimer.create(asyncExecutor); this.shouldShutdownAsyncExecutor = asyncExecutorProvider instanceof DefaultAsyncExecutorProvider; } diff --git a/implementation/standalone/src/main/java/io/smallrye/faulttolerance/standalone/LazyDependencies.java b/implementation/standalone/src/main/java/io/smallrye/faulttolerance/standalone/LazyDependencies.java index f6a74d93..0b1f6a09 100644 --- a/implementation/standalone/src/main/java/io/smallrye/faulttolerance/standalone/LazyDependencies.java +++ b/implementation/standalone/src/main/java/io/smallrye/faulttolerance/standalone/LazyDependencies.java @@ -17,7 +17,7 @@ final class LazyDependencies implements BuilderLazyDependencies { this.enabled = config.enabled(); this.executor = config.executor(); this.eventLoop = EventLoop.get(); - this.timer = new ThreadTimer(executor); + this.timer = ThreadTimer.create(executor); } @Override