Skip to content

Commit

Permalink
Merge pull request #23 from seime/threadpool
Browse files Browse the repository at this point in the history
Execute rules on a separate executor service to avoid congestion
  • Loading branch information
seaside1 authored Apr 5, 2022
2 parents 3aabd28 + 62cdeeb commit aa01aac
Show file tree
Hide file tree
Showing 2 changed files with 38 additions and 1 deletion.
Original file line number Diff line number Diff line change
Expand Up @@ -111,5 +111,6 @@ public synchronized void dispose() {
initFuture = null;
}
jRuleHandler.dispose();
jRuleEngine.dispose();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -33,8 +33,12 @@
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Consumer;

import org.eclipse.jdt.annotation.NonNull;
Expand Down Expand Up @@ -93,6 +97,10 @@ public class JRuleEngine implements PropertyChangeListener {

private static volatile JRuleEngine instance;

private ThreadPoolExecutor ruleExecutorService;
private final static int MIN_EXECUTORS = 2; // Should be made configurable
private final static int MAX_EXECUTORS = 10;// Should be made configurable

private JRuleConfig config;

private final Map<String, List<JRule>> itemToRules = new HashMap<>();
Expand All @@ -108,6 +116,21 @@ public class JRuleEngine implements PropertyChangeListener {
.getScheduledPool(ThreadPoolManager.THREAD_POOL_NAME_COMMON);

private JRuleEngine() {

ThreadFactory ruleExecutorThreadFactory = new ThreadFactory() {
private final AtomicLong threadIndex = new AtomicLong(0);

@Override
public Thread newThread(Runnable runnable) {
Thread thread = new Thread(runnable);
thread.setName("JRule-Executor-" + threadIndex.getAndIncrement());
return thread;
}
};

// Keep unused threads for 2 minutes before scaling back
ruleExecutorService = new ThreadPoolExecutor(MIN_EXECUTORS, MAX_EXECUTORS, 2L, TimeUnit.MINUTES,
new LinkedBlockingQueue<>(), ruleExecutorThreadFactory);
}

public static JRuleEngine get() {
Expand Down Expand Up @@ -475,7 +498,11 @@ private String getItemNameFromEvent(Event event) {
return null;
}

private synchronized void invokeRule(JRuleExecutionContext context, JRuleEvent event) {
private void invokeRule(JRuleExecutionContext context, JRuleEvent event) {
ruleExecutorService.submit(() -> invokeRuleInSeparateThread(context, event));
}

private void invokeRuleInSeparateThread(JRuleExecutionContext context, JRuleEvent event) {
JRuleLog.debug(logger, context.getLogName(), "Invoking rule for context: {}", context);
final JRule rule = context.getJrule();
final Method method = context.getMethod();
Expand Down Expand Up @@ -503,4 +530,13 @@ private synchronized static String getStackTraceAsString(Throwable throwable) {
public void setConfig(@NonNull JRuleConfig config) {
this.config = config;
}

public void dispose() {
ruleExecutorService.shutdown();
try {
ruleExecutorService.awaitTermination(2, TimeUnit.SECONDS);
} catch (InterruptedException e) {
logWarn("Not all rules ran to completion before rule engine shutdown");
}
}
}

0 comments on commit aa01aac

Please sign in to comment.