Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[906] Lower memory footprint and other optimisations #907

Closed
wants to merge 11 commits into from
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
package io.smallrye.faulttolerance.core.apiimpl;

import java.util.Collection;
import java.util.Collections;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
Expand All @@ -16,7 +15,7 @@
import io.smallrye.faulttolerance.core.util.Callbacks;

public class BasicCircuitBreakerMaintenanceImpl implements CircuitBreakerMaintenance {
private final Set<String> knownNames = Collections.newSetFromMap(new ConcurrentHashMap<>());
private final Set<String> knownNames = ConcurrentHashMap.newKeySet();
private final ConcurrentMap<String, CircuitBreaker<?>> registry = new ConcurrentHashMap<>();
private final ConcurrentMap<String, Collection<Consumer<CircuitBreakerState>>> stateChangeCallbacks = new ConcurrentHashMap<>();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,16 +3,20 @@
import static io.smallrye.faulttolerance.core.timer.TimerLogger.LOG;
import static io.smallrye.faulttolerance.core.util.Preconditions.checkNotNull;

import java.lang.invoke.MethodHandles;
import java.lang.invoke.VarHandle;
import java.util.Comparator;
import java.util.NoSuchElementException;
import java.util.Set;
import java.util.SortedSet;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentSkipListSet;
import java.util.concurrent.Executor;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.LockSupport;
import java.util.function.Consumer;

import io.smallrye.faulttolerance.core.util.RunnableWrapper;

Expand All @@ -24,6 +28,7 @@
// TODO implement a hashed wheel?
public final class ThreadTimer implements Timer {
private static final AtomicInteger COUNTER = new AtomicInteger(0);
private static final Set<ThreadTimer> TIMERS = ConcurrentHashMap.newKeySet();

private static final Comparator<Task> TASK_COMPARATOR = (o1, o2) -> {
if (o1 == o2) {
Expand All @@ -33,28 +38,31 @@ public final class ThreadTimer implements Timer {

// must _not_ return 0 if start times are equal, because that isn't consistent
// with `equals` (see also above)
return o1.startTime <= o2.startTime ? -1 : 1;
return o1.startTime < o2.startTime ? -1
: o1.startTime > o2.startTime ? 1
: o1.hashCode() <= o2.hashCode() ? -1 : 1;
};

private final String name;

private final SortedSet<Task> tasks;
final SortedSet<Task> tasks = new ConcurrentSkipListSet<>(TASK_COMPARATOR);

private final Thread thread;

private final AtomicBoolean running = new AtomicBoolean(true);

public final Executor defaultExecutor;

/**
* @param defaultExecutor default {@link Executor} used for running scheduled tasks, unless an executor
* is provided when {@linkplain #schedule(long, Runnable, Executor) scheduling} a task
*/
public ThreadTimer(Executor defaultExecutor) {
checkNotNull(defaultExecutor, "Executor must be set");
this.defaultExecutor = checkNotNull(defaultExecutor, "Executor must be set");

this.name = "SmallRye Fault Tolerance Timer " + COUNTER.incrementAndGet();
LOG.createdTimer(name);

this.tasks = new ConcurrentSkipListSet<>(TASK_COMPARATOR);
this.thread = new Thread(() -> {
while (running.get()) {
try {
Expand All @@ -78,34 +86,28 @@ public ThreadTimer(Executor defaultExecutor) {
// and yet `taskStartTime` is _before_ `currentTime`
if (taskStartTime - currentTime <= 0) {
tasks.remove(task);
if (task.state.compareAndSet(Task.STATE_NEW, Task.STATE_RUNNING)) {
Executor executorForTask = task.executorOverride;
if (STATE.compareAndSet(task, Task.STATE_NEW, Task.STATE_RUNNING)) {
Executor executorForTask = task.executorOverride();
if (executorForTask == null) {
executorForTask = defaultExecutor;
}

executorForTask.execute(() -> {
LOG.runningTimerTask(task);
try {
task.runnable.run();
} finally {
task.state.set(Task.STATE_FINISHED);
}
});
executorForTask.execute(task);
}
} else {
// this is OK even if another timer is scheduled during the sleep (even if that timer should
// fire sooner than `taskStartTime`), because `schedule` always calls` LockSupport.unpark`
LockSupport.parkNanos(taskStartTime - currentTime);
}
}
} catch (Exception e) {
} catch (Throwable e) {
// can happen e.g. when the executor is shut down sooner than the timer
LOG.unexpectedExceptionInTimerLoop(e);
}
}
}, name);
thread.start();
TIMERS.add(this);
}

@Override
Expand All @@ -116,7 +118,13 @@ public TimerTask schedule(long delayInMillis, Runnable task) {
@Override
public TimerTask schedule(long delayInMillis, Runnable task, Executor executor) {
long startTime = System.nanoTime() + TimeUnit.MILLISECONDS.toNanos(delayInMillis);
Task timerTask = new Task(startTime, RunnableWrapper.INSTANCE.wrap(task), tasks::remove, executor);
Task timerTask = executor == null || executor == defaultExecutor
? new Task(startTime, RunnableWrapper.INSTANCE.wrap(task))
: new Task(startTime, RunnableWrapper.INSTANCE.wrap(task)){
@Override Executor executorOverride (){
return executor;
}
};
tasks.add(timerTask);
LockSupport.unpark(thread);
LOG.scheduledTimerTask(timerTask, delayInMillis);
Expand All @@ -130,43 +138,88 @@ public void shutdown() throws InterruptedException {
thread.interrupt();
thread.join();
}
TIMERS.remove(this);
}

private static final class Task implements TimerTask {
static final int STATE_NEW = 0; // was scheduled, but isn't running yet
static final int STATE_RUNNING = 1; // running on the executor
static final int STATE_FINISHED = 2; // finished running
static final int STATE_CANCELLED = 3; // cancelled before it could be executed
private static class Task implements TimerTask, Runnable {
static final byte STATE_NEW = 0; // was scheduled, but isn't running yet
static final byte STATE_RUNNING = 1; // running on the executor
static final byte STATE_FINISHED = 2; // finished running
static final byte STATE_CANCELLED = 3; // cancelled before it could be executed

final long startTime; // in nanos, to be compared with System.nanoTime()
final Runnable runnable;
final Executor executorOverride; // may be null, which means that the timer's executor shall be used
final AtomicInteger state = new AtomicInteger(STATE_NEW);

private final Consumer<TimerTask> onCancel;
volatile byte state = STATE_NEW;

Task(long startTime, Runnable runnable, Consumer<TimerTask> onCancel, Executor executorOverride) {
Task(long startTime, Runnable runnable) {
this.startTime = startTime;
this.runnable = checkNotNull(runnable, "Runnable task must be set");
this.onCancel = checkNotNull(onCancel, "Cancellation callback must be set");
this.executorOverride = executorOverride;
}

Executor executorOverride() {
return null; // may be null, which means that the timer's executor shall be used
}

@Override
public boolean isDone() {
int state = this.state.get();
return state == STATE_FINISHED || state == STATE_CANCELLED;
byte s = this.state;
return s == STATE_FINISHED || s == STATE_CANCELLED;
}

@Override
public boolean cancel() {
if (runnable instanceof Future){ // wrapped FutureTask
try {
((Future<?>) runnable).cancel(false);
} catch (Throwable ignore){
// best effort; don't break our mechanics
}
}

// can't cancel if it's already running
if (state.compareAndSet(STATE_NEW, STATE_CANCELLED)) {
if (STATE.compareAndSet(this, STATE_NEW, STATE_CANCELLED)) {
LOG.cancelledTimerTask(this);
onCancel.accept(this);
for (ThreadTimer next : TIMERS){
next.tasks.remove(this);
}
return true;
}
return false;
}

@Override
public void run() {
LOG.runningTimerTask(this);
try {
runnable.run();
} finally {
STATE.setRelease(this, Task.STATE_FINISHED);
}
}

@Override
public String toString() {
return "TTask:"+state+':'+runnable+'@'+startTime;
}
}

public int size() {
return tasks.size();
}

/** For metrics and standalone-shutdown */
public static Set<ThreadTimer> getThreadTimerRegistry() {
return TIMERS;
}

// VarHandle mechanics
private static final VarHandle STATE;
static {
try {
MethodHandles.Lookup l = MethodHandles.lookup();
STATE = l.findVarHandle(Task.class, "state", byte.class);
} catch (ReflectiveOperationException e) {
throw new ExceptionInInitializerError(e);
}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -36,5 +36,5 @@ interface TimerLogger extends BasicLogger {

@Message(id = 11000, value = "Unexpected exception in timer loop, ignoring")
@LogMessage(level = Logger.Level.WARN)
void unexpectedExceptionInTimerLoop(@Cause Exception e);
void unexpectedExceptionInTimerLoop(@Cause Throwable e);
}
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

import org.junit.jupiter.api.AfterEach;
Expand Down Expand Up @@ -67,4 +68,54 @@ public void basicUsage() throws InterruptedException {

assertThat(queue).containsExactly("bar", "quux", "foo");
}

@Test
void simple() throws InterruptedException {
var tt = new ThreadTimer(Runnable::run);
assertThat(ThreadTimer.getThreadTimerRegistry()).contains(tt);
var q = new LinkedBlockingQueue<Integer>();
{
TimerTask task1 = tt.schedule(50, ()->q.add(1));
assertThat(tt.size()).isEqualTo(1);
assertThat(task1.isDone()).isFalse();
assertThat(task1.cancel()).isTrue();
assertThat(task1.cancel()).isFalse();// already canceled
assertThat(task1.isDone()).isTrue();
assertThat(q).hasSize(0);
Thread.sleep(60);
assertThat(q).hasSize(0);
assertThat(tt.size()).isEqualTo(0);
}
{
TimerTask task2 = tt.schedule(0, ()->q.add(2));
Thread.sleep(20);
assertThat(task2.isDone()).isTrue();
assertThat(task2.cancel()).isFalse();// already done
assertThat(task2.isDone()).isTrue();
assertThat(q).hasSize(1).containsExactly(2);
assertThat(tt.size()).isEqualTo(0);
}
tt.shutdown();
assertThat(ThreadTimer.getThreadTimerRegistry()).doesNotContain(tt);
}

@Test
void big() throws InterruptedException{
var tt = new ThreadTimer(Runnable::run);

var rt = Runtime.getRuntime();
long mem0 = rt.totalMemory() - rt.freeMemory();
System.out.println("Initial memory: "+mem0);
long t = System.nanoTime();
for (int i=0; i<10_000_000; i++){
tt.schedule(999_000, ()->{});
}
t = System.nanoTime() - t;
assertThat(tt.size()).isEqualTo(10_000_000);
long mem = rt.totalMemory() - rt.freeMemory();
System.out.println("Used memory: "+(mem-mem0));
System.out.println("Time: "+(t/1000/1000.0));

tt.shutdown();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@

import static org.assertj.core.api.Assertions.assertThat;

import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.Queue;
Expand Down Expand Up @@ -35,7 +34,7 @@ private interface Invocation {
private static void test(int parallelRequests, Map<String, Integer> expectedResponses, Invocation invocation)
throws InterruptedException {

Set<String> violations = Collections.newSetFromMap(new ConcurrentHashMap<>());
Set<String> violations = ConcurrentHashMap.newKeySet();
Queue<String> seenResponses = new ConcurrentLinkedQueue<>();

ExecutorService executor = Executors.newFixedThreadPool(parallelRequests);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@
import static org.awaitility.Awaitility.await;

import java.io.IOException;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.Queue;
Expand Down Expand Up @@ -63,7 +62,7 @@ private interface Invocation {

private static void test(int parallelRequests, Map<String, Range> expectedResponses, Invocation invocation)
throws InterruptedException {
Set<String> violations = Collections.newSetFromMap(new ConcurrentHashMap<>());
Set<String> violations = ConcurrentHashMap.newKeySet();
Queue<String> seenResponses = new ConcurrentLinkedQueue<>();

ExecutorService executor = Executors.newFixedThreadPool(parallelRequests);
Expand Down