diff --git a/Util/src/main/java/io/deephaven/util/ExpandingThreadPoolExecutorFactory.java b/Util/src/main/java/io/deephaven/util/ExpandingThreadPoolExecutorFactory.java deleted file mode 100644 index 19305619d32..00000000000 --- a/Util/src/main/java/io/deephaven/util/ExpandingThreadPoolExecutorFactory.java +++ /dev/null @@ -1,117 +0,0 @@ -/** - * Copyright (c) 2016-2022 Deephaven Data Labs and Patent Pending - */ -package io.deephaven.util; - -import io.deephaven.base.log.LogOutput; -import io.deephaven.base.log.LogOutputAppendable; -import io.deephaven.io.logger.Logger; - -import java.util.concurrent.RejectedExecutionHandler; -import java.util.concurrent.SynchronousQueue; -import java.util.concurrent.ThreadPoolExecutor; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicInteger; - -/** - * Creates a ThreadPoolExecutor which can then be used to submit or execute tasks. This is intended for cases where a - * relatively small number of threads can handle most circumstances, but occasional abnormal events may exceed - * expectations. The executor has the following characteristics: - * - * If the executor has been shut down, any excess events will be discarded. - * - * To create one of these executors, use {@link ExpandingThreadPoolExecutorFactory#createThreadPoolExecutor}. - */ -public class ExpandingThreadPoolExecutorFactory { - - // Stop anything from creating one of these - only use the createThreadPoolExecutor method - private ExpandingThreadPoolExecutorFactory() {} - - /** - * Class to handle rejection events from a ThreadPoolExecutor by creating a new Thread to run the task, unless the - * executor has been shut down, in which case the task is discarded. - */ - private static class RejectedExecutionPolicy implements RejectedExecutionHandler, LogOutputAppendable { - final Logger log; - final String executorName; - final String threadName; - private final AtomicInteger executorThreadNumber; - - /** - * Creates a {@code RejectedExecutionPolicy}. - * - * @param log a Logger - * @param executorName a name to be used when logging thread startup messages - * @param threadName the name prefix for new threads - */ - private RejectedExecutionPolicy(final Logger log, - final String executorName, - final String threadName, - final AtomicInteger executorThreadNumber) { - this.log = log; - this.executorName = executorName; - this.threadName = threadName; - this.executorThreadNumber = executorThreadNumber; - } - - /** - * Executes task r in a new thread, unless the executor has been shut down, in which case the task is discarded. - * - * @param r the runnable task requested to be executed - * @param e the executor attempting to execute this task - */ - @Override - public void rejectedExecution(Runnable r, ThreadPoolExecutor e) { - if (!e.isShutdown()) { - final String newThreadName = threadName + executorThreadNumber.getAndIncrement(); - log.warn().append("Executor has run out of threads for ").append(this).append(", creating new thread ") - .append(newThreadName).endl(); - newDaemonThread(r, newThreadName).start(); - } - } - - @Override - public LogOutput append(LogOutput logOutput) { - return logOutput.append("executor ").append(executorName); - } - } - - private static Thread newDaemonThread(Runnable r, final String name) { - final Thread t = new Thread(r, name); - t.setDaemon(true); - return t; - } - - /** - * Create a {@link ThreadPoolExecutor} with the characteristics defined in - * {@link ExpandingThreadPoolExecutorFactory}. - * - * @param log a Logger to log messages - * @param corePoolSize the core pool size (the executor will use this value for the initial core and maximum pool - * sizes) - * @param keepAliveMinutes the number of minutes to keep alive core threads - * @param executorName the name of the executor, used when logging dynamic thread creation - * @param poolThreadNamePrefix the prefix for thread pool threads - * @param dynamicThreadNamePrefix the prefix for dynamic (overflow) threads created when the maximum number of pool - * threads is exceeded - */ - public static ThreadPoolExecutor createThreadPoolExecutor(final Logger log, - final int corePoolSize, - final int keepAliveMinutes, - final String executorName, - final String poolThreadNamePrefix, - final String dynamicThreadNamePrefix) { - final AtomicInteger executorThreadNumber = new AtomicInteger(1); - return new ThreadPoolExecutor(corePoolSize, - corePoolSize, - keepAliveMinutes, - TimeUnit.MINUTES, - new SynchronousQueue<>(), - r -> newDaemonThread(r, poolThreadNamePrefix + executorThreadNumber.getAndIncrement()), - new RejectedExecutionPolicy(log, executorName, dynamicThreadNamePrefix, executorThreadNumber)); - } -} diff --git a/Util/src/main/java/io/deephaven/util/thread/NamingThreadFactory.java b/Util/src/main/java/io/deephaven/util/thread/NamingThreadFactory.java index fbd9b059855..910c28a38c1 100644 --- a/Util/src/main/java/io/deephaven/util/thread/NamingThreadFactory.java +++ b/Util/src/main/java/io/deephaven/util/thread/NamingThreadFactory.java @@ -10,20 +10,42 @@ public class NamingThreadFactory implements ThreadFactory { private final AtomicInteger threadCounter = new AtomicInteger(0); - public final String name; - private final Class clazz; - private boolean daemon; + private final String name; + private final Class clazz; + private final boolean daemon; private final ThreadGroup threadGroup; - public NamingThreadFactory(final Class clazz, final String name) { - this(clazz, name, false); + /** + * Creates a thread factory using the provided class and name as part of the thread name. All created threads will + * be daemon threads. + * + * @param clazz a class to use when naming each thread + * @param name a name component to add after the class name when naming each thread + */ + public NamingThreadFactory(final Class clazz, final String name) { + this(clazz, name, true); } - public NamingThreadFactory(final Class clazz, final String name, boolean daemon) { + /** + * Creates a thread factory using the provided class and name as part of the thread name. + * + * @param clazz a class to use when naming each thread + * @param name a name component to add after the class name when naming each thread + * @param daemon true to make each thread a daemon thread + */ + public NamingThreadFactory(final Class clazz, final String name, boolean daemon) { this(null, clazz, name, daemon); } - public NamingThreadFactory(ThreadGroup threadGroup, final Class clazz, final String name, boolean daemon) { + /** + * Creates a thread factory using the provided class and name as part of the thread name. + * + * @param threadGroup a thread group to add each thread to + * @param clazz a class to use when naming each thread + * @param name a name component to add after the class name when naming each thread + * @param daemon true to make each thread a daemon thread + */ + public NamingThreadFactory(ThreadGroup threadGroup, final Class clazz, final String name, boolean daemon) { this.threadGroup = threadGroup; this.clazz = clazz; this.name = name; diff --git a/Util/src/main/java/io/deephaven/util/thread/ThreadInitializationFactory.java b/Util/src/main/java/io/deephaven/util/thread/ThreadInitializationFactory.java new file mode 100644 index 00000000000..2d5b963f4a2 --- /dev/null +++ b/Util/src/main/java/io/deephaven/util/thread/ThreadInitializationFactory.java @@ -0,0 +1,45 @@ +package io.deephaven.util.thread; + +import io.deephaven.configuration.Configuration; + +import java.lang.reflect.InvocationTargetException; +import java.util.Arrays; +import java.util.List; +import java.util.stream.Collectors; + +/** + * Extension point to allow threads that will run user code from within the platform to be controlled by configuration. + */ +public interface ThreadInitializationFactory { + /* private */ String[] CONFIGURED_INITIALIZATION_TYPES = + Configuration.getInstance().getStringArrayFromProperty("thread.initialization"); + /* private */ List INITIALIZERS = Arrays.stream(CONFIGURED_INITIALIZATION_TYPES) + .filter(str -> !str.isBlank()) + .map(type -> { + try { + // noinspection unchecked + Class clazz = + (Class) Class.forName(type); + return clazz.getDeclaredConstructor().newInstance(); + } catch (ClassNotFoundException | NoSuchMethodException | InvocationTargetException + | InstantiationException | IllegalAccessException e) { + throw new IllegalArgumentException( + "Error instantiating initializer " + type + ", please check configuration"); + } + }) + .collect(Collectors.toUnmodifiableList()); + + /** + * Chains configured initializers to run before/around any given runnable, returning a runnable intended to be run + * by a new thread. + */ + static Runnable wrapRunnable(Runnable runnable) { + Runnable acc = runnable; + for (ThreadInitializationFactory INITIALIZER : INITIALIZERS) { + acc = INITIALIZER.createInitializer(acc); + } + return acc; + } + + Runnable createInitializer(Runnable runnable); +} diff --git a/engine/table/src/main/java/io/deephaven/engine/table/impl/OperationInitializationThreadPool.java b/engine/table/src/main/java/io/deephaven/engine/table/impl/OperationInitializationThreadPool.java index e967bcab2fd..97e722f3a08 100644 --- a/engine/table/src/main/java/io/deephaven/engine/table/impl/OperationInitializationThreadPool.java +++ b/engine/table/src/main/java/io/deephaven/engine/table/impl/OperationInitializationThreadPool.java @@ -6,6 +6,7 @@ import io.deephaven.chunk.util.pools.MultiChunkPool; import io.deephaven.configuration.Configuration; import io.deephaven.util.thread.NamingThreadFactory; +import io.deephaven.util.thread.ThreadInitializationFactory; import org.jetbrains.annotations.NotNull; import java.util.concurrent.ExecutorService; @@ -39,11 +40,11 @@ public static boolean isInitializationThread() { true) { @Override public Thread newThread(@NotNull Runnable r) { - return super.newThread(() -> { + return super.newThread(ThreadInitializationFactory.wrapRunnable(() -> { isInitializationThread.set(true); MultiChunkPool.enableDedicatedPoolForThisThread(); r.run(); - }); + })); } }; executorService = Executors.newFixedThreadPool(NUM_THREADS, threadFactory); diff --git a/engine/table/src/main/java/io/deephaven/engine/table/impl/SparseSelect.java b/engine/table/src/main/java/io/deephaven/engine/table/impl/SparseSelect.java index 4b2c1d4c50a..3cfc08258db 100644 --- a/engine/table/src/main/java/io/deephaven/engine/table/impl/SparseSelect.java +++ b/engine/table/src/main/java/io/deephaven/engine/table/impl/SparseSelect.java @@ -61,7 +61,7 @@ public class SparseSelect { private final static ExecutorService executor = SPARSE_SELECT_THREADS == 1 ? null : Executors.newFixedThreadPool(SPARSE_SELECT_THREADS, - new NamingThreadFactory(SparseSelect.class, "copyThread", true)); + new NamingThreadFactory(SparseSelect.class, "copyThread")); private SparseSelect() {} // static use only diff --git a/engine/table/src/main/java/io/deephaven/engine/table/impl/locations/util/ExecutorTableDataRefreshService.java b/engine/table/src/main/java/io/deephaven/engine/table/impl/locations/util/ExecutorTableDataRefreshService.java index 12d3e010ac5..a36b49e7ef2 100644 --- a/engine/table/src/main/java/io/deephaven/engine/table/impl/locations/util/ExecutorTableDataRefreshService.java +++ b/engine/table/src/main/java/io/deephaven/engine/table/impl/locations/util/ExecutorTableDataRefreshService.java @@ -12,6 +12,7 @@ import io.deephaven.engine.table.impl.locations.impl.AbstractTableLocationProvider; import io.deephaven.engine.table.impl.locations.impl.SubscriptionAggregator; import io.deephaven.engine.table.impl.locations.TableDataException; +import io.deephaven.util.thread.NamingThreadFactory; import org.jetbrains.annotations.NotNull; import java.util.concurrent.Future; @@ -31,8 +32,6 @@ public class ExecutorTableDataRefreshService implements TableDataRefreshService private final long tableLocationProviderRefreshIntervalMillis; private final long tableLocationRefreshIntervalMillis; - private final AtomicInteger threadCount = new AtomicInteger(0); - private final ScheduledThreadPoolExecutor scheduler; private final Value providerSubscriptions; @@ -50,8 +49,9 @@ public ExecutorTableDataRefreshService(@NotNull final String name, this.tableLocationRefreshIntervalMillis = Require.gtZero(tableLocationRefreshIntervalMillis, "tableLocationRefreshIntervalMillis"); + NamingThreadFactory threadFactory = new NamingThreadFactory(TableDataRefreshService.class, "refreshThread"); scheduler = - new ScheduledThreadPoolExecutor(threadPoolSize, this::makeThread, new ThreadPoolExecutor.AbortPolicy()); + new ScheduledThreadPoolExecutor(threadPoolSize, threadFactory, new ThreadPoolExecutor.AbortPolicy()); scheduler.setRemoveOnCancelPolicy(true); providerSubscriptions = Stats.makeItem(NAME_PREFIX + name, "providerSubscriptions", Counter.FACTORY).getValue(); @@ -62,13 +62,6 @@ public ExecutorTableDataRefreshService(@NotNull final String name, .makeItem(NAME_PREFIX + name, "locationSubscriptionRefreshDurationNanos", State.FACTORY).getValue(); } - private Thread makeThread(final Runnable runnable) { - final Thread thread = - new Thread(runnable, NAME_PREFIX + name + "-refreshThread-" + threadCount.incrementAndGet()); - thread.setDaemon(true); - return thread; - } - @Override public void submitOneTimeAsyncTask(@NotNull final Runnable task) { scheduler.submit(task); diff --git a/engine/table/src/main/java/io/deephaven/engine/util/DaemonThreadFactory.java b/engine/table/src/main/java/io/deephaven/engine/util/DaemonThreadFactory.java deleted file mode 100644 index abbe6f5b307..00000000000 --- a/engine/table/src/main/java/io/deephaven/engine/util/DaemonThreadFactory.java +++ /dev/null @@ -1,19 +0,0 @@ -/** - * Copyright (c) 2016-2022 Deephaven Data Labs and Patent Pending - */ -package io.deephaven.engine.util; - -import java.util.concurrent.Executors; -import java.util.concurrent.ThreadFactory; - -public class DaemonThreadFactory implements ThreadFactory { - - private final ThreadFactory wrappedFactory = Executors.defaultThreadFactory(); - - @Override - public Thread newThread(Runnable r) { - Thread t = wrappedFactory.newThread(r); - t.setDaemon(true); - return t; - } -} diff --git a/engine/updategraph/src/main/java/io/deephaven/engine/updategraph/UpdateGraphProcessor.java b/engine/updategraph/src/main/java/io/deephaven/engine/updategraph/UpdateGraphProcessor.java index ed827cbf268..831757e5d31 100644 --- a/engine/updategraph/src/main/java/io/deephaven/engine/updategraph/UpdateGraphProcessor.java +++ b/engine/updategraph/src/main/java/io/deephaven/engine/updategraph/UpdateGraphProcessor.java @@ -33,6 +33,9 @@ import io.deephaven.util.locks.AwareFunctionalLock; import io.deephaven.util.process.ProcessEnvironment; import io.deephaven.util.thread.NamingThreadFactory; +import io.deephaven.util.thread.ThreadDump; +import io.deephaven.util.thread.ThreadInitializationFactory; +import org.apache.commons.lang3.exception.ExceptionUtils; import org.jetbrains.annotations.NotNull; import org.jetbrains.annotations.Nullable; @@ -262,17 +265,14 @@ public synchronized void take(final AccumulatedCycleStats out) { notificationProcessor = makeNotificationProcessor(); jvmIntrospectionContext = new JvmIntrospectionContext(); - refreshThread = new Thread("UpdateGraphProcessor." + name() + ".refreshThread") { - @Override - public void run() { - configureRefreshThread(); - // noinspection InfiniteLoopStatement - while (true) { - Assert.eqFalse(ALLOW_UNIT_TEST_MODE, "ALLOW_UNIT_TEST_MODE"); - refreshTablesAndFlushNotifications(); - } + refreshThread = new Thread(ThreadInitializationFactory.wrapRunnable(() -> { + configureRefreshThread(); + // noinspection InfiniteLoopStatement + while (true) { + Assert.eqFalse(ALLOW_UNIT_TEST_MODE, "ALLOW_UNIT_TEST_MODE"); + refreshTablesAndFlushNotifications(); } - }; + }), "UpdateGraphProcessor." + name() + ".refreshThread"); refreshThread.setDaemon(true); final int updateThreads = @@ -1800,10 +1800,10 @@ private UpdateGraphProcessorThreadFactory(@NotNull final ThreadGroup threadGroup @Override public Thread newThread(@NotNull final Runnable r) { - return super.newThread(() -> { + return super.newThread(ThreadInitializationFactory.wrapRunnable(() -> { configureRefreshThread(); r.run(); - }); + })); } } @@ -1835,7 +1835,7 @@ private ExecutorService makeUnitTestRefreshExecutor() { private class UnitTestRefreshThreadFactory extends NamingThreadFactory { private UnitTestRefreshThreadFactory() { - super(UpdateGraphProcessor.class, "unitTestRefresh", true); + super(UpdateGraphProcessor.class, "unitTestRefresh"); } @Override diff --git a/props/configs/src/main/resources/dh-defaults.prop b/props/configs/src/main/resources/dh-defaults.prop index fbd9c41b0d1..052d9d200c6 100644 --- a/props/configs/src/main/resources/dh-defaults.prop +++ b/props/configs/src/main/resources/dh-defaults.prop @@ -45,6 +45,8 @@ default.processEnvironmentFactory=io.deephaven.util.process.DefaultProcessEnviro OperationInitializationThreadPool.threads=1 +deephaven.console.type=python + # Default session duration is 5 minutes http.session.durationMs=300000 @@ -63,3 +65,7 @@ client.configuration.list=java.version,deephaven.version,barrage.version,http.se # jar, and a class that is found in that jar. Any such keys will be made available to the client.configuration.list # as .version. client.version.list=deephaven=io.deephaven.engine.table.Table,barrage=io.deephaven.barrage.flatbuf.BarrageMessageWrapper + + +# Specifies additional setup to run on threads that can perform table operations with user code. Comma-separated list, instances must be of type io.deephaven.util.thread.ThreadInitializationFactory +thread.initialization=io.deephaven.server.console.python.DebuggingInitializer diff --git a/props/test-configs/src/main/resources/dh-tests.prop b/props/test-configs/src/main/resources/dh-tests.prop index af870cc538b..a2887ce51ae 100644 --- a/props/test-configs/src/main/resources/dh-tests.prop +++ b/props/test-configs/src/main/resources/dh-tests.prop @@ -98,4 +98,8 @@ http.session.durationMs=300000 AuthHandlers=io.deephaven.auth.AnonymousAuthenticationHandler authentication.client.configuration.list= client.version.list= + authentication.anonymous.warn=false + +deephaven.console.type=none +thread.initialization= diff --git a/py/server/deephaven_internal/java_threads.py b/py/server/deephaven_internal/java_threads.py new file mode 100644 index 00000000000..d18c0bb5b0e --- /dev/null +++ b/py/server/deephaven_internal/java_threads.py @@ -0,0 +1,33 @@ +import threading + +def create_thread_entry(thread_name): + """ + Helper to call from the JVM into python to set up py thread state exactly once per jvm thread, and support debugging + """ + # First, ensure that this Java thread has a python _DummyThread instance registered, which will have the same + # lifetime as the pythreadstate (and so, the tracing). This ensures that if debugging is enabled after this thread + # was created, it will correctly be able to trace this thread. + thread = threading.current_thread() + + # Assign the java thread name to the python thread + thread.name = 'java-' + thread_name + # Then, if pydevd has already been initialized, we should attempt to make ourselves known to it. + + # Return a def to Java with a particular name that will call back into the Java stack + def JavaThread(runnable): + try: + # Test each of our known debugger impls + for name in ['pydevd', 'pydevd_pycharm']: + debugger = __import__(name) + + # We don't want to be the first one to call settrace(), so check to see if setup completed on another + # thread before attempting it here + if debugger.SetupHolder.setup is not None: + debugger.settrace(suspend=False) + except ImportError: + # Debugger hasn't started yet (or we don't know which one is in use), so registering our thread + # above should be sufficient + pass + + runnable.run() + return JavaThread \ No newline at end of file diff --git a/server/src/main/java/io/deephaven/server/console/python/DebuggingInitializer.java b/server/src/main/java/io/deephaven/server/console/python/DebuggingInitializer.java new file mode 100644 index 00000000000..97867acf324 --- /dev/null +++ b/server/src/main/java/io/deephaven/server/console/python/DebuggingInitializer.java @@ -0,0 +1,44 @@ +package io.deephaven.server.console.python; + +import io.deephaven.configuration.Configuration; +import io.deephaven.util.thread.ThreadInitializationFactory; +import org.jpy.PyLib; +import org.jpy.PyModule; +import org.jpy.PyObject; + +import java.io.Closeable; + +/** + * If python is configured as the script language for this server, ensures that threads which may invoke python code + * will be able to be debugged. If python is disabled, this does nothing. + */ +public class DebuggingInitializer implements ThreadInitializationFactory { + @Override + public Runnable createInitializer(Runnable runnable) { + if (!"python".equals(Configuration.getInstance().getStringWithDefault("deephaven.console.type", null))) { + // python not enabled, don't accidentally start it + return runnable; + } + + return () -> { + DeephavenModule py_deephaven = (DeephavenModule) PyModule.importModule("deephaven_internal.java_threads") + .createProxy(PyLib.CallableKind.FUNCTION, DeephavenModule.class); + // First call in to create a custom function that has the same name as the Java thread (plus a prefix) + PyObject runnableResult = py_deephaven.create_thread_entry(Thread.currentThread().getName()); + // runnable.run(); + // Invoke that function directly from Java, so that we have only this one initial frame + runnableResult.call("__call__", runnable); + }; + } + + interface DeephavenModule extends Closeable { + /** + * Creates a new function that will initialize a thread in python, including creating a simple frame + * + * @param threadName the name of the java thread + * @return a callable PyObject + */ + PyObject create_thread_entry(String threadName); + } + +} diff --git a/server/src/main/java/io/deephaven/server/runner/DeephavenApiServerModule.java b/server/src/main/java/io/deephaven/server/runner/DeephavenApiServerModule.java index 133b48f14ce..4a817754152 100644 --- a/server/src/main/java/io/deephaven/server/runner/DeephavenApiServerModule.java +++ b/server/src/main/java/io/deephaven/server/runner/DeephavenApiServerModule.java @@ -29,6 +29,7 @@ import io.deephaven.server.util.Scheduler; import io.deephaven.util.process.ProcessEnvironment; import io.deephaven.util.thread.NamingThreadFactory; +import io.deephaven.util.thread.ThreadInitializationFactory; import io.grpc.BindableService; import io.grpc.ServerInterceptor; import org.jetbrains.annotations.NotNull; @@ -83,16 +84,10 @@ static Set primeInterceptors() { @Provides @Singleton public ScriptSession provideScriptSession(Map> scriptTypes) { - final String DEEPHAVEN_CONSOLE_TYPE = "deephaven.console.type"; - boolean configuredConsole = Configuration.getInstance().hasProperty(DEEPHAVEN_CONSOLE_TYPE); + // Check which script language is configured + String scriptSessionType = Configuration.getInstance().getProperty("deephaven.console.type"); - if (!configuredConsole && scriptTypes.size() == 1) { - // if there is only one; use it - return scriptTypes.values().iterator().next().get(); - } - - // otherwise, assume we want python... - String scriptSessionType = Configuration.getInstance().getStringWithDefault(DEEPHAVEN_CONSOLE_TYPE, "python"); + // Emit an error if the selected language isn't provided if (!scriptTypes.containsKey(scriptSessionType)) { throw new IllegalArgumentException("Console type not found: " + scriptSessionType); } @@ -155,15 +150,15 @@ public static UpdateGraphProcessor provideUpdateGraphProcessor() { private static class ThreadFactory extends NamingThreadFactory { public ThreadFactory(final String name) { - super(DeephavenApiServer.class, name, true); + super(DeephavenApiServer.class, name); } @Override public Thread newThread(final @NotNull Runnable r) { - return super.newThread(() -> { + return super.newThread(ThreadInitializationFactory.wrapRunnable(() -> { MultiChunkPool.enableDedicatedPoolForThisThread(); r.run(); - }); + })); } } }