serialTableOperationsSafe = ThreadLocal.withInitial(() -> false);
+
+ final LogicalClockImpl logicalClock = new LogicalClockImpl();
+
+ /**
+ * Encapsulates locking support.
+ */
+ private final UpdateGraphLock lock;
+
+ /**
+ * When UpdateGraph.printDependencyInformation is set to true, the UpdateGraph will print debug information for each
+ * notification that has dependency information; as well as which notifications have been completed and are
+ * outstanding.
+ */
+ private final boolean printDependencyInformation =
+ Configuration.getInstance().getBooleanWithDefault("UpdateGraph.printDependencyInformation", false);
+
+ private final String name;
+
+ final UpdatePerformanceTracker updatePerformanceTracker;
+
+ /**
+ * The BaseUpdateGraph is an abstract class that is suitable for extension by UpdateGraphs that process a set of
+ * sources and then the resulting {@link io.deephaven.engine.updategraph.NotificationQueue.Notification
+ * Notifications} using a {@link NotificationProcessor}.
+ *
+ * @param name the name of the update graph, which must be unique
+ * @param allowUnitTestMode is unit test mode allowed, used for configuring the lock
+ * @param log the logger for this update graph
+ * @param minimumCycleDurationToLogNanos the minimum cycle time, in nanoseconds, that results in cycle times being
+ * logged to at an INFO level
+ */
+ public BaseUpdateGraph(
+ final String name,
+ final boolean allowUnitTestMode,
+ final Logger log,
+ long minimumCycleDurationToLogNanos) {
+ this.name = name;
+ this.log = log;
+ this.minimumCycleDurationToLogNanos = minimumCycleDurationToLogNanos;
+ notificationProcessor = PoisonedNotificationProcessor.INSTANCE;
+ jvmIntrospectionContext = new JvmIntrospectionContext();
+ lock = UpdateGraphLock.create(this, allowUnitTestMode);
+ updatePerformanceTracker = new UpdatePerformanceTracker(this);
+ }
+
+ public String getName() {
+ return name;
+ }
+
+ public UpdateGraph getUpdateGraph() {
+ return this;
+ }
+
+ @Override
+ public String toString() {
+ return new LogOutputStringImpl().append(this).toString();
+ }
+
+ @Override
+ public LogicalClock clock() {
+ return logicalClock;
+ }
+ // region Accessors for the shared and exclusive locks
+
+ /**
+ *
+ * Get the shared lock for this {@link UpdateGraph}.
+ *
+ * Using this lock will prevent run processing from proceeding concurrently, but will allow other read-only
+ * processing to proceed.
+ *
+ * The shared lock implementation is expected to support reentrance.
+ *
+ * This lock does not support {@link java.util.concurrent.locks.Lock#newCondition()}. Use the exclusive
+ * lock if you need to wait on events that are driven by run processing.
+ *
+ * @return The shared lock for this {@link UpdateGraph}
+ */
+ public AwareFunctionalLock sharedLock() {
+ return lock.sharedLock();
+ }
+
+ /**
+ *
+ * Get the exclusive lock for this {@link UpdateGraph}.
+ *
+ * Using this lock will prevent run or read-only processing from proceeding concurrently.
+ *
+ * The exclusive lock implementation is expected to support reentrance.
+ *
+ * Note that using the exclusive lock while the shared lock is held by the current thread will result in exceptions,
+ * as lock upgrade is not supported.
+ *
+ * This lock does support {@link java.util.concurrent.locks.Lock#newCondition()}.
+ *
+ * @return The exclusive lock for this {@link UpdateGraph}
+ */
+ public AwareFunctionalLock exclusiveLock() {
+ return lock.exclusiveLock();
+ }
+
+ // endregion Accessors for the shared and exclusive locks
+
+ /**
+ * Test if this thread is part of our run thread executor service.
+ *
+ * @return whether this is one of our run threads.
+ */
+ @Override
+ public boolean currentThreadProcessesUpdates() {
+ return isUpdateThread.get();
+ }
+
+ @Override
+ public boolean serialTableOperationsSafe() {
+ return serialTableOperationsSafe.get();
+ }
+
+ @Override
+ public boolean setSerialTableOperationsSafe(final boolean newValue) {
+ final boolean old = serialTableOperationsSafe.get();
+ serialTableOperationsSafe.set(newValue);
+ return old;
+ }
+
+
+ /**
+ * Add a table to the list of tables to run and mark it as {@link DynamicNode#setRefreshing(boolean) refreshing} if
+ * it was a {@link DynamicNode}.
+ *
+ * @param updateSource The table to be added to the run list
+ */
+ @Override
+ public void addSource(@NotNull final Runnable updateSource) {
+ if (!running) {
+ throw new IllegalStateException("UpdateGraph is no longer running");
+ }
+
+ if (updateSource instanceof DynamicNode) {
+ ((DynamicNode) updateSource).setRefreshing(true);
+ }
+
+ sources.add(updateSource);
+ }
+
+
+ @Override
+ public void removeSource(@NotNull final Runnable updateSource) {
+ sources.remove(updateSource);
+ }
+
+ /**
+ * Remove a collection of sources from the list of refreshing sources.
+ *
+ * @implNote This will not set the sources as {@link DynamicNode#setRefreshing(boolean) non-refreshing}.
+ * @param sourcesToRemove The sources to remove from the list of refreshing sources
+ */
+ public void removeSources(final Collection sourcesToRemove) {
+ sources.removeAll(sourcesToRemove);
+ }
+
+ /**
+ * Return the number of valid sources.
+ *
+ * @return the number of valid sources
+ */
+ public int sourceCount() {
+ return sources.size();
+ }
+
+ /**
+ * Enqueue a notification to be flushed according to its priority. Non-terminal notifications should only be
+ * enqueued during the updating phase of a cycle. That is, they should be enqueued from an update source or
+ * subsequent notification delivery.
+ *
+ * @param notification The notification to enqueue
+ * @see NotificationQueue.Notification#isTerminal()
+ * @see LogicalClock.State
+ */
+ @Override
+ public void addNotification(@NotNull final Notification notification) {
+ if (notification.isTerminal()) {
+ synchronized (terminalNotifications) {
+ terminalNotifications.offer(notification);
+ }
+ } else {
+ logDependencies().append(Thread.currentThread().getName()).append(": Adding notification ")
+ .append(notification).endl();
+ synchronized (pendingNormalNotifications) {
+ Assert.eq(logicalClock.currentState(), "logicalClock.currentState()",
+ LogicalClock.State.Updating, "LogicalClock.State.Updating");
+ pendingNormalNotifications.offer(notification);
+ }
+ notificationProcessor.onNotificationAdded();
+ }
+ }
+
+ @Override
+ public boolean maybeAddNotification(@NotNull final Notification notification, final long deliveryStep) {
+ if (notification.isTerminal()) {
+ throw new IllegalArgumentException("Notification must not be terminal");
+ }
+ logDependencies().append(Thread.currentThread().getName()).append(": Adding notification ").append(notification)
+ .append(" if step is ").append(deliveryStep).endl();
+ final boolean added;
+ synchronized (pendingNormalNotifications) {
+ // Note that the clock is advanced to idle under the pendingNormalNotifications lock, after which point no
+ // further normal notifications will be processed on this cycle.
+ final long logicalClockValue = logicalClock.currentValue();
+ if (LogicalClock.getState(logicalClockValue) == LogicalClock.State.Updating
+ && LogicalClock.getStep(logicalClockValue) == deliveryStep) {
+ pendingNormalNotifications.offer(notification);
+ added = true;
+ } else {
+ added = false;
+ }
+ }
+ if (added) {
+ notificationProcessor.onNotificationAdded();
+ }
+ return added;
+ }
+
+ @Override
+ public boolean satisfied(final long step) {
+ StepUpdater.checkForOlderStep(step, sourcesLastSatisfiedStep);
+ return sourcesLastSatisfiedStep == step;
+ }
+
+ /**
+ * Enqueue a collection of notifications to be flushed.
+ *
+ * @param notifications The notification to enqueue
+ *
+ * @see #addNotification(Notification)
+ */
+ @Override
+ public void addNotifications(@NotNull final Collection extends Notification> notifications) {
+ synchronized (pendingNormalNotifications) {
+ synchronized (terminalNotifications) {
+ notifications.forEach(this::addNotification);
+ }
+ }
+ }
+
+ /**
+ * @return Whether this UpdateGraph has a mechanism that supports refreshing
+ */
+ @Override
+ public boolean supportsRefreshing() {
+ return true;
+ }
+
+ /**
+ * Reset state at the beginning or end of a unit test.
+ *
+ * @param after if this is done after a test, in which case the liveness scope is popped
+ * @param errors the list of errors generated during reset
+ */
+ @TestUseOnly
+ void resetForUnitTests(final boolean after, final List errors) {
+ sources.clear();
+ notificationProcessor.shutdown();
+ synchronized (pendingNormalNotifications) {
+ pendingNormalNotifications.clear();
+ }
+ isUpdateThread.remove();
+ synchronized (terminalNotifications) {
+ terminalNotifications.clear();
+ }
+ logicalClock.resetForUnitTests();
+ sourcesLastSatisfiedStep = logicalClock.currentStep();
+
+ refreshScope = null;
+ if (after) {
+ LivenessManager stackTop;
+ while ((stackTop = LivenessScopeStack.peek()) instanceof LivenessScope) {
+ LivenessScopeStack.pop((LivenessScope) stackTop);
+ }
+ CleanupReferenceProcessorInstance.resetAllForUnitTests();
+ }
+
+ ensureUnlocked("unit test reset thread", errors);
+ }
+
+ @TestUseOnly
+ void resetLock() {
+ lock.reset();
+ }
+
+ /**
+ * Flush all non-terminal notifications, complete the logical clock update cycle, then flush all terminal
+ * notifications.
+ *
+ * @param check whether to check that update sources have not yet been satisfied (false in unit test mode)
+ */
+ void flushNotificationsAndCompleteCycle(boolean check) {
+ // We cannot proceed with normal notifications, nor are we satisfied, until all update source refresh
+ // notifications have been processed. Note that non-update source notifications that require dependency
+ // satisfaction are delivered first to the pendingNormalNotifications queue, and hence will not be processed
+ // until we advance to the flush* methods.
+ // TODO: If and when we properly integrate update sources into the dependency tracking system, we can
+ // discontinue this distinct phase, along with the requirement to treat the UpdateGraph itself as a Dependency.
+ // Until then, we must delay the beginning of "normal" notification processing until all update sources are
+ // done. See IDS-8039.
+ notificationProcessor.doAllWork();
+
+ updateSourcesLastSatisfiedStep(check);
+
+ flushNormalNotificationsAndCompleteCycle();
+ flushTerminalNotifications();
+ synchronized (pendingNormalNotifications) {
+ Assert.assertion(pendingNormalNotifications.isEmpty(), "pendingNormalNotifications.isEmpty()");
+ }
+ }
+
+ void updateSourcesLastSatisfiedStep(boolean check) {
+ if (check && sourcesLastSatisfiedStep >= logicalClock.currentStep()) {
+ throw new IllegalStateException("Already marked sources as satisfied!");
+ }
+ sourcesLastSatisfiedStep = logicalClock.currentStep();
+ }
+
+ /**
+ * Flush all non-terminal {@link Notification notifications} from the queue.
+ */
+ private void flushNormalNotificationsAndCompleteCycle() {
+ final IntrusiveDoublyLinkedQueue pendingToEvaluate =
+ new IntrusiveDoublyLinkedQueue<>(IntrusiveDoublyLinkedNode.Adapter.getInstance());
+ while (true) {
+ final int outstandingCountAtStart = notificationProcessor.outstandingNotificationsCount();
+ notificationProcessor.beforeNotificationsDrained();
+ synchronized (pendingNormalNotifications) {
+ pendingToEvaluate.transferAfterTailFrom(pendingNormalNotifications);
+ if (outstandingCountAtStart == 0 && pendingToEvaluate.isEmpty()) {
+ // We complete the cycle here before releasing the lock on pendingNotifications, so that
+ // maybeAddNotification can detect scenarios where the notification cannot be delivered on the
+ // desired step.
+ logicalClock.completeUpdateCycle();
+ break;
+ }
+ }
+ logDependencies().append(Thread.currentThread().getName())
+ .append(": Notification queue size=").append(pendingToEvaluate.size())
+ .append(", outstanding=").append(outstandingCountAtStart)
+ .endl();
+
+ boolean nothingBecameSatisfied = true;
+ for (final Iterator it = pendingToEvaluate.iterator(); it.hasNext();) {
+ final Notification notification = it.next();
+
+ Assert.eqFalse(notification.isTerminal(), "notification.isTerminal()");
+ Assert.eqFalse(notification.mustExecuteWithUpdateGraphLock(),
+ "notification.mustExecuteWithUpdateGraphLock()");
+
+ final boolean satisfied = notification.canExecute(sourcesLastSatisfiedStep);
+ if (satisfied) {
+ nothingBecameSatisfied = false;
+ it.remove();
+ logDependencies().append(Thread.currentThread().getName())
+ .append(": Submitting to notification processor ").append(notification).endl();
+ notificationProcessor.submit(notification);
+ } else {
+ logDependencies().append(Thread.currentThread().getName()).append(": Unmet dependencies for ")
+ .append(notification).endl();
+ }
+ }
+ if (outstandingCountAtStart == 0 && nothingBecameSatisfied) {
+ throw new IllegalStateException(
+ "No outstanding notifications, yet the notification queue is not empty!");
+ }
+ if (notificationProcessor.outstandingNotificationsCount() > 0) {
+ notificationProcessor.doWork();
+ }
+ }
+ synchronized (pendingNormalNotifications) {
+ Assert.eqZero(pendingNormalNotifications.size() + pendingToEvaluate.size(),
+ "pendingNormalNotifications.size() + pendingToEvaluate.size()");
+ }
+ }
+
+ /**
+ * Flush all {@link Notification#isTerminal() terminal} {@link Notification notifications} from the queue.
+ *
+ * @implNote Any notification that may have been queued while the clock's state is Updating must be invoked during
+ * this cycle's Idle phase.
+ */
+ private void flushTerminalNotifications() {
+ synchronized (terminalNotifications) {
+ for (final Iterator it = terminalNotifications.iterator(); it.hasNext();) {
+ final Notification notification = it.next();
+ Assert.assertion(notification.isTerminal(), "notification.isTerminal()");
+
+ if (!notification.mustExecuteWithUpdateGraphLock()) {
+ it.remove();
+ // for the single threaded queue case; this enqueues the notification;
+ // for the executor service case, this causes the notification to be kicked off
+ notificationProcessor.submit(notification);
+ }
+ }
+ }
+
+ // run the notifications that must be run on this thread
+ while (true) {
+ final Notification notificationForThisThread;
+ synchronized (terminalNotifications) {
+ notificationForThisThread = terminalNotifications.poll();
+ }
+ if (notificationForThisThread == null) {
+ break;
+ }
+ runNotification(notificationForThisThread);
+ }
+
+ // We can not proceed until all of the terminal notifications have executed.
+ notificationProcessor.doAllWork();
+ }
+
+ /**
+ * Abstract away the details of satisfied notification processing.
+ */
+ interface NotificationProcessor {
+
+ /**
+ * Submit a satisfied notification for processing.
+ *
+ * @param notification The notification
+ */
+ void submit(@NotNull NotificationQueue.Notification notification);
+
+ /**
+ * Submit a queue of satisfied notification for processing.
+ *
+ * @param notifications The queue of notifications to
+ * {@link IntrusiveDoublyLinkedQueue#transferAfterTailFrom(IntrusiveDoublyLinkedQueue) transfer} from.
+ * Will become empty as a result of successful completion
+ */
+ void submitAll(@NotNull IntrusiveDoublyLinkedQueue notifications);
+
+ /**
+ * Query the number of outstanding notifications submitted to this processor.
+ *
+ * @return The number of outstanding notifications
+ */
+ int outstandingNotificationsCount();
+
+ /**
+ *
+ * Do work (or in the multi-threaded case, wait for some work to have happened).
+ *
+ * Caller must know that work is outstanding.
+ */
+ void doWork();
+
+ /**
+ * Do all outstanding work.
+ */
+ void doAllWork();
+
+ /**
+ * Shutdown this notification processor (for unit tests).
+ */
+ void shutdown();
+
+ /**
+ * Called after a pending notification is added.
+ */
+ void onNotificationAdded();
+
+ /**
+ * Called before pending notifications are drained.
+ */
+ void beforeNotificationsDrained();
+ }
+
+ void runNotification(@NotNull final Notification notification) {
+ logDependencies().append(Thread.currentThread().getName()).append(": Executing ").append(notification).endl();
+
+ final LivenessScope scope;
+ final boolean releaseScopeOnClose;
+ if (notification.isTerminal()) {
+ // Terminal notifications can't create new notifications, so they have no need to participate in a shared
+ // run scope.
+ scope = new LivenessScope();
+ releaseScopeOnClose = true;
+ } else {
+ // Non-terminal notifications must use a shared run scope.
+ Assert.neqNull(refreshScope, "refreshScope");
+ scope = refreshScope == LivenessScopeStack.peek() ? null : refreshScope;
+ releaseScopeOnClose = false;
+ }
+
+ try (final SafeCloseable ignored = scope == null ? null : LivenessScopeStack.open(scope, releaseScopeOnClose)) {
+ notification.run();
+ logDependencies().append(Thread.currentThread().getName()).append(": Completed ").append(notification)
+ .endl();
+ } catch (final Exception e) {
+ log.error().append(Thread.currentThread().getName())
+ .append(": Exception while executing UpdateGraph notification: ").append(notification)
+ .append(": ").append(e).endl();
+ ProcessEnvironment.getGlobalFatalErrorReporter()
+ .report("Exception while processing UpdateGraph notification", e);
+ }
+ }
+
+ class QueueNotificationProcessor implements NotificationProcessor {
+
+ final IntrusiveDoublyLinkedQueue satisfiedNotifications =
+ new IntrusiveDoublyLinkedQueue<>(IntrusiveDoublyLinkedNode.Adapter.getInstance());
+
+ @Override
+ public void submit(@NotNull final Notification notification) {
+ satisfiedNotifications.offer(notification);
+ }
+
+ @Override
+ public void submitAll(@NotNull IntrusiveDoublyLinkedQueue notifications) {
+ satisfiedNotifications.transferAfterTailFrom(notifications);
+ }
+
+ @Override
+ public int outstandingNotificationsCount() {
+ return satisfiedNotifications.size();
+ }
+
+ @Override
+ public void doWork() {
+ Notification satisfiedNotification;
+ while ((satisfiedNotification = satisfiedNotifications.poll()) != null) {
+ runNotification(satisfiedNotification);
+ }
+ }
+
+ @Override
+ public void doAllWork() {
+ doWork();
+ }
+
+ @Override
+ public void shutdown() {
+ satisfiedNotifications.clear();
+ }
+
+ @Override
+ public void onNotificationAdded() {}
+
+ @Override
+ public void beforeNotificationsDrained() {}
+ }
+
+
+ static LogEntry appendAsMillisFromNanos(final LogEntry entry, final long nanos) {
+ if (nanos > 0) {
+ return entry.appendDouble(nanos / 1_000_000.0, 3);
+ }
+ return entry.append(0);
+ }
+
+ /**
+ * Iterate over all monitored tables and run them.
+ */
+ void refreshTablesAndFlushNotifications() {
+ final long startTimeNanos = System.nanoTime();
+
+ currentCycleLockWaitTotalNanos = 0;
+ jvmIntrospectionContext.startSample();
+
+ if (sources.isEmpty()) {
+ exclusiveLock().doLocked(this::flushTerminalNotifications);
+ } else {
+ refreshAllTables();
+ }
+
+ jvmIntrospectionContext.endSample();
+ final long cycleTimeNanos = System.nanoTime() - startTimeNanos;
+ computeStatsAndLogCycle(cycleTimeNanos);
+ }
+
+ private void computeStatsAndLogCycle(final long cycleTimeNanos) {
+ final long safePointPauseTimeMillis = jvmIntrospectionContext.deltaSafePointPausesTimeMillis();
+ accumulatedCycleStats.accumulate(
+ isCycleOnBudget(cycleTimeNanos),
+ cycleTimeNanos,
+ jvmIntrospectionContext.deltaSafePointPausesCount(),
+ safePointPauseTimeMillis);
+ if (cycleTimeNanos >= minimumCycleDurationToLogNanos) {
+ if (suppressedCycles > 0) {
+ logSuppressedCycles();
+ }
+ final double cycleTimeMillis = cycleTimeNanos / 1_000_000.0;
+ LogEntry entry = log.info().append(getName())
+ .append(": Update Graph Processor cycleTime=").appendDouble(cycleTimeMillis, 3);
+ if (jvmIntrospectionContext.hasSafePointData()) {
+ final long safePointSyncTimeMillis = jvmIntrospectionContext.deltaSafePointSyncTimeMillis();
+ entry = entry
+ .append("ms, safePointTime=")
+ .append(safePointPauseTimeMillis)
+ .append("ms, safePointTimePct=");
+ if (safePointPauseTimeMillis > 0 && cycleTimeMillis > 0.0) {
+ final double safePointTimePct = 100.0 * safePointPauseTimeMillis / cycleTimeMillis;
+ entry = entry.appendDouble(safePointTimePct, 2);
+ } else {
+ entry = entry.append("0");
+ }
+ entry = entry.append("%, safePointSyncTime=").append(safePointSyncTimeMillis);
+ }
+ entry = entry.append("ms, lockWaitTime=");
+ entry = appendAsMillisFromNanos(entry, currentCycleLockWaitTotalNanos);
+ entry.append("ms").endl();
+ return;
+ }
+ if (cycleTimeNanos > 0) {
+ ++suppressedCycles;
+ suppressedCyclesTotalNanos += cycleTimeNanos;
+ suppressedCyclesTotalSafePointTimeMillis += safePointPauseTimeMillis;
+ if (suppressedCyclesTotalNanos >= minimumCycleDurationToLogNanos) {
+ logSuppressedCycles();
+ }
+ }
+ }
+
+ /**
+ * Is the provided cycle time on budget?
+ *
+ * @param cycleTimeNanos the cycle time, in nanoseconds
+ *
+ * @return true if the cycle time is within the desired budget
+ */
+ public boolean isCycleOnBudget(long cycleTimeNanos) {
+ return true;
+ }
+
+ private void logSuppressedCycles() {
+ LogEntry entry = log.info()
+ .append("Minimal Update Graph Processor cycle times: ")
+ .appendDouble((double) (suppressedCyclesTotalNanos) / 1_000_000.0, 3).append("ms / ")
+ .append(suppressedCycles).append(" cycles = ")
+ .appendDouble(
+ (double) suppressedCyclesTotalNanos / (double) suppressedCycles / 1_000_000.0, 3)
+ .append("ms/cycle average)");
+ if (jvmIntrospectionContext.hasSafePointData()) {
+ entry = entry
+ .append(", safePointTime=")
+ .append(suppressedCyclesTotalSafePointTimeMillis)
+ .append("ms");
+ }
+ entry.endl();
+ suppressedCycles = suppressedCyclesTotalNanos = 0;
+ suppressedCyclesTotalSafePointTimeMillis = 0;
+ }
+
+
+ void maybeFlushUpdatePerformance(final long nowNanos, final long checkTime) {
+ if (checkTime >= nextUpdatePerformanceTrackerFlushTimeNanos) {
+ nextUpdatePerformanceTrackerFlushTimeNanos =
+ nowNanos + MILLISECONDS.toNanos(UpdatePerformanceTracker.REPORT_INTERVAL_MILLIS);
+ try {
+ updatePerformanceTracker.flush();
+ } catch (Exception err) {
+ log.error().append("Error flushing UpdatePerformanceTracker: ").append(err).endl();
+ }
+ }
+ }
+
+ /**
+ * In unit tests it can be convenient to force the update performance tracker to flush, without waiting for the
+ * complete REPORT_INTERVAL_MILLIS to elapse.
+ */
+ @TestUseOnly
+ public void resetNextFlushTime() {
+ nextUpdatePerformanceTrackerFlushTimeNanos = 0;
+ }
+
+ /**
+ * Refresh all the update sources within an {@link LogicalClock update cycle} after the UpdateGraph has been locked.
+ * At the end of the updates all {@link Notification notifications} will be flushed.
+ */
+ void refreshAllTables() {
+ doRefresh(() -> sources.forEach((final UpdateSourceRefreshNotification updateSourceNotification,
+ final Runnable unused) -> notificationProcessor.submit(updateSourceNotification)));
+ }
+
+ /**
+ * Perform a run cycle, using {@code refreshFunction} to ensure the desired update sources are refreshed at the
+ * start.
+ *
+ * @param refreshFunction Function to submit one or more {@link UpdateSourceRefreshNotification update source
+ * refresh notifications} to the {@link NotificationProcessor notification processor} or run them directly.
+ */
+ private void doRefresh(@NotNull final Runnable refreshFunction) {
+ final long lockStartTimeNanos = System.nanoTime();
+ exclusiveLock().doLocked(() -> {
+ currentCycleLockWaitTotalNanos += System.nanoTime() - lockStartTimeNanos;
+ if (!running) {
+ return;
+ }
+ synchronized (pendingNormalNotifications) {
+ Assert.eqZero(pendingNormalNotifications.size(), "pendingNormalNotifications.size()");
+ }
+ Assert.eqNull(refreshScope, "refreshScope");
+ refreshScope = new LivenessScope();
+ final long updatingCycleValue = logicalClock.startUpdateCycle();
+ logDependencies().append("Beginning UpdateGraph cycle step=")
+ .append(logicalClock.currentStep()).endl();
+ try (final SafeCloseable ignored = LivenessScopeStack.open(refreshScope, true)) {
+ refreshFunction.run();
+ flushNotificationsAndCompleteCycle(true);
+ } finally {
+ logicalClock.ensureUpdateCycleCompleted(updatingCycleValue);
+ refreshScope = null;
+ }
+ logDependencies().append("Completed UpdateGraph cycle step=")
+ .append(logicalClock.currentStep()).endl();
+ });
+ }
+
+ /**
+ * Re-usable class for adapting update sources to {@link Notification}s.
+ */
+ static final class UpdateSourceRefreshNotification extends AbstractNotification
+ implements SimpleReference {
+
+ private final WeakReference updateSourceRef;
+
+ private UpdateSourceRefreshNotification(@NotNull final Runnable updateSource) {
+ super(false);
+ updateSourceRef = new WeakReference<>(updateSource);
+ }
+
+ @Override
+ public LogOutput append(@NotNull final LogOutput logOutput) {
+ return logOutput.append("UpdateSourceRefreshNotification{").append(System.identityHashCode(this))
+ .append(", for UpdateSource{").append(System.identityHashCode(get())).append("}}");
+ }
+
+ @Override
+ public boolean canExecute(final long step) {
+ return true;
+ }
+
+ @Override
+ public void run() {
+ final Runnable updateSource = updateSourceRef.get();
+ if (updateSource == null) {
+ return;
+ }
+ updateSource.run();
+ }
+
+ @Override
+ public Runnable get() {
+ // NB: Arguably we should make get() and clear() synchronized.
+ return updateSourceRef.get();
+ }
+
+ @Override
+ public void clear() {
+ updateSourceRef.clear();
+ }
+ }
+
+ public LogEntry logDependencies() {
+ if (printDependencyInformation) {
+ return log.info();
+ } else {
+ return LogEntry.NULL;
+ }
+ }
+
+ /**
+ * Ensure the lock is not held by the current thread.
+ *
+ * @param callerDescription the description of the caller
+ * @param errors an optional list to populate with errors when the lock is held.
+ */
+ @TestUseOnly
+ void ensureUnlocked(@NotNull final String callerDescription, @Nullable final List errors) {
+ if (exclusiveLock().isHeldByCurrentThread()) {
+ if (errors != null) {
+ errors.add(callerDescription + ": UpdateGraph exclusive lock is still held");
+ }
+ while (exclusiveLock().isHeldByCurrentThread()) {
+ exclusiveLock().unlock();
+ }
+ }
+ if (sharedLock().isHeldByCurrentThread()) {
+ if (errors != null) {
+ errors.add(callerDescription + ": UpdateGraph shared lock is still held");
+ }
+ while (sharedLock().isHeldByCurrentThread()) {
+ sharedLock().unlock();
+ }
+ }
+ }
+
+ public void takeAccumulatedCycleStats(AccumulatedCycleStats updateGraphAccumCycleStats) {
+ accumulatedCycleStats.take(updateGraphAccumCycleStats);
+ }
+
+ public static UpdateGraph getInstance(final String name) {
+ return INSTANCES.get(name);
+ }
+
+
+
+ /**
+ * Remove a named UpdateGraph.
+ *
+ *
+ * In addition to removing the UpdateGraph from the instances, an attempt is made to {@link #stop()} it.
+ *
+ *
+ * @param name the name of the UpdateGraph to remove
+ * @return true if the update graph was found
+ */
+ public static boolean removeInstance(final String name) {
+ final UpdateGraph graph;
+ synchronized (INSTANCES) {
+ graph = INSTANCES.removeKey(name);
+ if (graph == null) {
+ return false;
+ }
+ }
+ graph.stop();
+ return true;
+ }
+
+ /**
+ * Builds and caches a new UpdateGraph named {@code name} and provided by {@code construct}. It is an error if there
+ * is already an UpdateGraph with the same name.
+ *
+ * @param name the name of the new update graph
+ * @param construct A {@link Supplier} to construct an UpdateGraph if no update graph with the name already exists.
+ * The Supplier must provide an update graph with the given name.
+ *
+ * @throws IllegalStateException if an UpdateGraph with the provided name already exists
+ */
+ public static T buildOrThrow(final String name, final Supplier construct) {
+ synchronized (INSTANCES) {
+ if (INSTANCES.containsKey(name)) {
+ throw new IllegalStateException(
+ String.format("UpdateGraph with name %s already exists", name));
+ }
+ final T newGraph = construct.get();
+ Assert.equals(newGraph.getName(), "newGraph.getName()", name, "name");
+ INSTANCES.put(name, newGraph);
+ return newGraph;
+ }
+ }
+
+ /**
+ * Returns an existing UpdateGraph with the provided {@code name} if one exists, else returns a new named
+ * UpdateGraph supplied by {@code construct}.
+ *
+ * @param construct A {@link Supplier} to construct an UpdateGraph if no update graph with the name already exists.
+ * The Supplier must provide an update graph with the given name.
+ *
+ * @return the UpdateGraph
+ */
+ public static T existingOrBuild(final String name, Supplier construct) {
+ return INSTANCES.putIfAbsent(name, (nameToInsert) -> {
+ final T newGraph = construct.get();
+ Assert.equals(newGraph.getName(), "newGraph.getName()", nameToInsert, "name");
+ return newGraph;
+ }).cast();
+ }
+}
diff --git a/engine/table/src/main/java/io/deephaven/engine/updategraph/impl/EventDrivenUpdateGraph.java b/engine/table/src/main/java/io/deephaven/engine/updategraph/impl/EventDrivenUpdateGraph.java
new file mode 100644
index 00000000000..701a0d17878
--- /dev/null
+++ b/engine/table/src/main/java/io/deephaven/engine/updategraph/impl/EventDrivenUpdateGraph.java
@@ -0,0 +1,143 @@
+package io.deephaven.engine.updategraph.impl;
+
+import io.deephaven.base.log.LogOutput;
+import io.deephaven.engine.context.ExecutionContext;
+import io.deephaven.internal.log.LoggerFactory;
+import io.deephaven.io.logger.Logger;
+import io.deephaven.util.SafeCloseable;
+import org.jetbrains.annotations.NotNull;
+
+/**
+ * An EventDrivenUpdateGraph provides an isolated refresh processor.
+ *
+ *
+ * As with a {@link PeriodicUpdateGraph}, the EventDrivenUpdateGraph contains a set of sources, but it is refreshed only
+ * when a call to {@link #requestRefresh()} is made. All sources are synchronously refreshed on that thread, and then
+ * the resultant notifications are also synchronously processed.
+ *
+ */
+public class EventDrivenUpdateGraph extends BaseUpdateGraph {
+ private static final Logger log = LoggerFactory.getLogger(EventDrivenUpdateGraph.class);
+ private boolean started = false;
+
+ /**
+ * Create a builder for an EventDrivenUpdateGraph with the given name.
+ *
+ * @param name the name of the new EventDrivenUpdateGraph
+ * @return a builder for the EventDrivenUpdateGraph
+ */
+ public static EventDrivenUpdateGraph.Builder newBuilder(final String name) {
+ return new EventDrivenUpdateGraph.Builder(name);
+ }
+
+ private EventDrivenUpdateGraph(String name, long minimumCycleDurationToLogNanos) {
+ super(name, false, log, minimumCycleDurationToLogNanos);
+ notificationProcessor = new QueueNotificationProcessor();
+ }
+
+ @Override
+ public LogOutput append(@NotNull final LogOutput logOutput) {
+ return logOutput.append("EventDrivenUpdateGraph-").append(getName());
+ }
+
+ @Override
+ public int parallelismFactor() {
+ return 1;
+ }
+
+ /**
+ * Refresh all sources and execute the resulting notifications synchronously on this thread.
+ */
+ @Override
+ public void requestRefresh() {
+ maybeStart();
+ // do the work to refresh everything, on this thread
+ isUpdateThread.set(true);
+ try (final SafeCloseable ignored = ExecutionContext.newBuilder().setUpdateGraph(this).build().open()) {
+ refreshAllTables();
+ } finally {
+ isUpdateThread.remove();
+ }
+ final long nowNanos = System.nanoTime();
+ synchronized (this) {
+ maybeFlushUpdatePerformance(nowNanos, nowNanos);
+ }
+ }
+
+ /**
+ * We defer starting the update performance tracker until our first cycle. This is essential when we are the DEFAULT
+ * graph used for UPT publishing, as the UPT requires the publication graph to be in the BaseUpdateGraph map, which
+ * is not done until after our constructor completes.
+ */
+ private synchronized void maybeStart() {
+ if (started) {
+ return;
+ }
+ updatePerformanceTracker.start();
+ started = true;
+ }
+
+ @Override
+ public void stop() {
+ running = false;
+ // if we wait for the lock to be done, then we should have completed our cycle and will not execute again
+ exclusiveLock().doLocked(() -> {
+ });
+ }
+
+ /**
+ * Builds or retrieves a new EventDrivenUpdateGraph.
+ */
+ public static class Builder {
+ private final String name;
+ private long minimumCycleDurationToLogNanos = DEFAULT_MINIMUM_CYCLE_DURATION_TO_LOG_NANOSECONDS;
+
+ public Builder(String name) {
+ this.name = name;
+ }
+
+ /**
+ * Set the minimum duration of an update cycle that should be logged at the INFO level.
+ *
+ * @param minimumCycleDurationToLogNanos threshold to log a slow cycle
+ * @return this builder
+ */
+ public Builder minimumCycleDurationToLogNanos(long minimumCycleDurationToLogNanos) {
+ this.minimumCycleDurationToLogNanos = minimumCycleDurationToLogNanos;
+ return this;
+ }
+
+ /**
+ * Constructs and returns an EventDrivenUpdateGraph. It is an error to do so if an UpdateGraph already exists
+ * with the name provided to this builder.
+ *
+ * @return the new EventDrivenUpdateGraph
+ * @throws IllegalStateException if an UpdateGraph with the provided name already exists
+ */
+ public EventDrivenUpdateGraph build() {
+ return BaseUpdateGraph.buildOrThrow(name, this::construct);
+ }
+
+ /**
+ * Returns an existing EventDrivenUpdateGraph with the name provided to this Builder, if one exists, else
+ * returns a new EventDrivenUpdateGraph.
+ *
+ *
+ * If the options for the existing graph are different than the options specified in this Builder, this
+ * Builder's options are ignored.
+ *
+ *
+ * @return the EventDrivenUpdateGraph
+ * @throws ClassCastException if the existing graph is not an EventDrivenUpdateGraph
+ */
+ public EventDrivenUpdateGraph existingOrBuild() {
+ return BaseUpdateGraph.existingOrBuild(name, this::construct);
+ }
+
+ private EventDrivenUpdateGraph construct() {
+ return new EventDrivenUpdateGraph(
+ name,
+ minimumCycleDurationToLogNanos);
+ }
+ }
+}
diff --git a/engine/table/src/main/java/io/deephaven/engine/updategraph/impl/PeriodicUpdateGraph.java b/engine/table/src/main/java/io/deephaven/engine/updategraph/impl/PeriodicUpdateGraph.java
index 34e0462dde7..a86225c9522 100644
--- a/engine/table/src/main/java/io/deephaven/engine/updategraph/impl/PeriodicUpdateGraph.java
+++ b/engine/table/src/main/java/io/deephaven/engine/updategraph/impl/PeriodicUpdateGraph.java
@@ -6,42 +6,29 @@
import io.deephaven.UncheckedDeephavenException;
import io.deephaven.base.SleepUtil;
import io.deephaven.base.log.LogOutput;
-import io.deephaven.base.reference.SimpleReference;
import io.deephaven.base.verify.Assert;
import io.deephaven.chunk.util.pools.MultiChunkPool;
import io.deephaven.configuration.Configuration;
import io.deephaven.engine.context.ExecutionContext;
-import io.deephaven.engine.liveness.LivenessManager;
import io.deephaven.engine.liveness.LivenessScope;
import io.deephaven.engine.liveness.LivenessScopeStack;
-import io.deephaven.engine.table.impl.perf.PerformanceEntry;
-import io.deephaven.engine.table.impl.perf.UpdatePerformanceTracker;
-import io.deephaven.engine.table.impl.util.StepUpdater;
import io.deephaven.engine.updategraph.*;
-import io.deephaven.engine.util.reference.CleanupReferenceProcessorInstance;
import io.deephaven.engine.util.systemicmarking.SystemicObjectTracker;
-import io.deephaven.hash.KeyedObjectHashMap;
-import io.deephaven.hash.KeyedObjectKey;
-import io.deephaven.hotspot.JvmIntrospectionContext;
import io.deephaven.internal.log.LoggerFactory;
-import io.deephaven.io.log.LogEntry;
-import io.deephaven.io.log.impl.LogOutputStringImpl;
import io.deephaven.io.logger.Logger;
import io.deephaven.util.SafeCloseable;
import io.deephaven.util.annotations.TestUseOnly;
-import io.deephaven.util.datastructures.SimpleReferenceManager;
import io.deephaven.util.datastructures.linked.IntrusiveDoublyLinkedNode;
import io.deephaven.util.datastructures.linked.IntrusiveDoublyLinkedQueue;
import io.deephaven.util.function.ThrowingRunnable;
-import io.deephaven.util.locks.AwareFunctionalLock;
-import io.deephaven.util.process.ProcessEnvironment;
import io.deephaven.util.thread.NamingThreadFactory;
import io.deephaven.util.thread.ThreadInitializationFactory;
import org.jetbrains.annotations.NotNull;
-import org.jetbrains.annotations.Nullable;
-import java.lang.ref.WeakReference;
-import java.util.*;
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Random;
import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
@@ -62,9 +49,8 @@
* defined)
*
*/
-public class PeriodicUpdateGraph implements UpdateGraph {
+public class PeriodicUpdateGraph extends BaseUpdateGraph {
- public static final String DEFAULT_UPDATE_GRAPH_NAME = "DEFAULT";
public static final int NUM_THREADS_DEFAULT_UPDATE_GRAPH =
Configuration.getInstance().getIntegerWithDefault("PeriodicUpdateGraph.updateThreads", -1);
@@ -72,56 +58,8 @@ public static Builder newBuilder(final String name) {
return new Builder(name);
}
- /**
- * If the provided update graph is a {@link PeriodicUpdateGraph} then create a PerformanceEntry using the given
- * description. Otherwise, return null.
- *
- * @param updateGraph The update graph to create a performance entry for.
- * @param description The description for the performance entry.
- * @return The performance entry, or null if the update graph is not a {@link PeriodicUpdateGraph}.
- */
- @Nullable
- public static PerformanceEntry createUpdatePerformanceEntry(
- final UpdateGraph updateGraph,
- final String description) {
- if (updateGraph instanceof PeriodicUpdateGraph) {
- final PeriodicUpdateGraph pug = (PeriodicUpdateGraph) updateGraph;
- if (pug.updatePerformanceTracker != null) {
- return pug.updatePerformanceTracker.getEntry(description);
- }
- throw new IllegalStateException("Cannot create a performance entry for a PeriodicUpdateGraph that has "
- + "not been completely constructed.");
- }
- return null;
- }
-
- private static final KeyedObjectHashMap INSTANCES = new KeyedObjectHashMap<>(
- new KeyedObjectKey.BasicAdapter<>(PeriodicUpdateGraph::getName));
-
- private final Logger log = LoggerFactory.getLogger(PeriodicUpdateGraph.class);
-
- /**
- * Update sources that are part of this PeriodicUpdateGraph.
- */
- private final SimpleReferenceManager sources =
- new SimpleReferenceManager<>(UpdateSourceRefreshNotification::new);
-
- /**
- * Recorder for updates source satisfaction as a phase of notification processing.
- */
- private volatile long sourcesLastSatisfiedStep;
- /**
- * The queue of non-terminal notifications to process.
- */
- private final IntrusiveDoublyLinkedQueue pendingNormalNotifications =
- new IntrusiveDoublyLinkedQueue<>(IntrusiveDoublyLinkedNode.Adapter.getInstance());
-
- /**
- * The queue of terminal notifications to process.
- */
- private final IntrusiveDoublyLinkedQueue terminalNotifications =
- new IntrusiveDoublyLinkedQueue<>(IntrusiveDoublyLinkedNode.Adapter.getInstance());
+ private static final Logger log = LoggerFactory.getLogger(PeriodicUpdateGraph.class);
/**
* A flag indicating that an accelerated cycle has been requested.
@@ -129,7 +67,6 @@ public static PerformanceEntry createUpdatePerformanceEntry(
private final AtomicBoolean refreshRequested = new AtomicBoolean();
private final Thread refreshThread;
- private volatile boolean running = true;
/**
* {@link ScheduledExecutorService} used for scheduling the {@link #watchDogTimeoutProcedure}.
@@ -159,113 +96,8 @@ public static PerformanceEntry createUpdatePerformanceEntry(
public static final String DEFAULT_TARGET_CYCLE_DURATION_MILLIS_PROP =
"PeriodicUpdateGraph.targetCycleDurationMillis";
- public static final String MINIMUM_CYCLE_DURATION_TO_LOG_MILLIS_PROP =
- "PeriodicUpdateGraph.minimumCycleDurationToLogMillis";
private final long defaultTargetCycleDurationMillis;
private volatile long targetCycleDurationMillis;
- private final long minimumCycleDurationToLogNanos;
-
- /** when to next flush the performance tracker; initializes to zero to force a flush on start */
- private long nextUpdatePerformanceTrackerFlushTimeNanos;
-
- /**
- * How many cycles we have not logged, but were non-zero.
- */
- private long suppressedCycles;
- private long suppressedCyclesTotalNanos;
- private long suppressedCyclesTotalSafePointTimeMillis;
-
- /**
- * Accumulated UpdateGraph exclusive lock waits for the current cycle (or previous, if idle).
- */
- private long currentCycleLockWaitTotalNanos;
- /**
- * Accumulated delays due to intracycle yields for the current cycle (or previous, if idle).
- */
- private long currentCycleYieldTotalNanos;
- /**
- * Accumulated delays due to intracycle sleeps for the current cycle (or previous, if idle).
- */
- private long currentCycleSleepTotalNanos;
-
- public static class AccumulatedCycleStats {
- /**
- * Number of cycles run.
- */
- public int cycles = 0;
- /**
- * Number of cycles run not exceeding their time budget.
- */
- public int cyclesOnBudget = 0;
- /**
- * Accumulated safepoints over all cycles.
- */
- public int safePoints = 0;
- /**
- * Accumulated safepoint time over all cycles.
- */
- public long safePointPauseTimeMillis = 0L;
-
- public int[] cycleTimesMicros = new int[32];
- public static final int MAX_DOUBLING_LEN = 1024;
-
- synchronized void accumulate(
- final long targetCycleDurationMillis,
- final long cycleTimeNanos,
- final long safePoints,
- final long safePointPauseTimeMillis) {
- final boolean onBudget = targetCycleDurationMillis * 1000 * 1000 >= cycleTimeNanos;
- if (onBudget) {
- ++cyclesOnBudget;
- }
- this.safePoints += safePoints;
- this.safePointPauseTimeMillis += safePointPauseTimeMillis;
- if (cycles >= cycleTimesMicros.length) {
- final int newLen;
- if (cycleTimesMicros.length < MAX_DOUBLING_LEN) {
- newLen = cycleTimesMicros.length * 2;
- } else {
- newLen = cycleTimesMicros.length + MAX_DOUBLING_LEN;
- }
- cycleTimesMicros = Arrays.copyOf(cycleTimesMicros, newLen);
- }
- cycleTimesMicros[cycles] = (int) ((cycleTimeNanos + 500) / 1_000);
- ++cycles;
- }
-
- public synchronized void take(final AccumulatedCycleStats out) {
- out.cycles = cycles;
- out.cyclesOnBudget = cyclesOnBudget;
- out.safePoints = safePoints;
- out.safePointPauseTimeMillis = safePointPauseTimeMillis;
- if (out.cycleTimesMicros.length < cycleTimesMicros.length) {
- out.cycleTimesMicros = new int[cycleTimesMicros.length];
- }
- System.arraycopy(cycleTimesMicros, 0, out.cycleTimesMicros, 0, cycles);
- cycles = 0;
- cyclesOnBudget = 0;
- safePoints = 0;
- safePointPauseTimeMillis = 0;
- }
- }
-
- public final AccumulatedCycleStats accumulatedCycleStats = new AccumulatedCycleStats();
-
- /**
- * Abstracts away the processing of non-terminal notifications.
- */
- private NotificationProcessor notificationProcessor;
-
- /**
- * Facilitate GC Introspection during refresh cycles.
- */
- private final JvmIntrospectionContext jvmIntrospectionContext;
-
- /**
- * The {@link LivenessScope} that should be on top of the {@link LivenessScopeStack} for all run and notification
- * processing. Only non-null while some thread is in {@link #doRefresh(Runnable)}.
- */
- private volatile LivenessScope refreshScope;
/**
* The number of threads in our executor service for dispatching notifications. If 1, then we don't actually use the
@@ -273,50 +105,21 @@ public synchronized void take(final AccumulatedCycleStats out) {
*/
private final int updateThreads;
- /**
- * Is this one of the threads engaged in notification processing? (Either the solitary run thread, or one of the
- * pooled threads it uses in some configurations)
- */
- private final ThreadLocal isUpdateThread = ThreadLocal.withInitial(() -> false);
-
- private final ThreadLocal serialTableOperationsSafe = ThreadLocal.withInitial(() -> false);
-
private final long minimumInterCycleSleep =
Configuration.getInstance().getIntegerWithDefault("PeriodicUpdateGraph.minimumInterCycleSleep", 0);
private final boolean interCycleYield =
Configuration.getInstance().getBooleanWithDefault("PeriodicUpdateGraph.interCycleYield", false);
- private final LogicalClockImpl logicalClock = new LogicalClockImpl();
-
- /**
- * Encapsulates locking support.
- */
- private final UpdateGraphLock lock;
-
- /**
- * When PeriodicUpdateGraph.printDependencyInformation is set to true, the PeriodicUpdateGraph will print debug
- * information for each notification that has dependency information; as well as which notifications have been
- * completed and are outstanding.
- */
- private final boolean printDependencyInformation =
- Configuration.getInstance().getBooleanWithDefault("PeriodicUpdateGraph.printDependencyInformation", false);
-
- private final String name;
-
- private final UpdatePerformanceTracker updatePerformanceTracker;
-
public PeriodicUpdateGraph(
final String name,
final boolean allowUnitTestMode,
final long targetCycleDurationMillis,
final long minimumCycleDurationToLogNanos,
final int numUpdateThreads) {
- this.name = name;
+ super(name, allowUnitTestMode, log, minimumCycleDurationToLogNanos);
this.allowUnitTestMode = allowUnitTestMode;
this.defaultTargetCycleDurationMillis = targetCycleDurationMillis;
this.targetCycleDurationMillis = targetCycleDurationMillis;
- this.minimumCycleDurationToLogNanos = minimumCycleDurationToLogNanos;
- this.lock = UpdateGraphLock.create(this, this.allowUnitTestMode);
if (numUpdateThreads <= 0) {
this.updateThreads = Runtime.getRuntime().availableProcessors();
@@ -324,9 +127,6 @@ public PeriodicUpdateGraph(
this.updateThreads = numUpdateThreads;
}
- notificationProcessor = PoisonedNotificationProcessor.INSTANCE;
- jvmIntrospectionContext = new JvmIntrospectionContext();
-
refreshThread = new Thread(ThreadInitializationFactory.wrapRunnable(() -> {
configureRefreshThread();
while (running) {
@@ -343,31 +143,11 @@ public Thread newThread(@NotNull final Runnable r) {
return super.newThread(ThreadInitializationFactory.wrapRunnable(r));
}
});
-
- updatePerformanceTracker = new UpdatePerformanceTracker(this);
- }
-
- public String getName() {
- return name;
- }
-
- public UpdateGraph getUpdateGraph() {
- return this;
}
@Override
public LogOutput append(@NotNull final LogOutput logOutput) {
- return logOutput.append("PeriodicUpdateGraph-").append(name);
- }
-
- @Override
- public String toString() {
- return new LogOutputStringImpl().append(this).toString();
- }
-
- @Override
- public LogicalClock clock() {
- return logicalClock;
+ return logOutput.append("PeriodicUpdateGraph-").append(getName());
}
@NotNull
@@ -445,69 +225,6 @@ public int parallelismFactor() {
}
}
- // region Accessors for the shared and exclusive locks
-
- /**
- *
- * Get the shared lock for this {@link PeriodicUpdateGraph}.
- *
- * Using this lock will prevent run processing from proceeding concurrently, but will allow other read-only
- * processing to proceed.
- *
- * The shared lock implementation is expected to support reentrance.
- *
- * This lock does not support {@link java.util.concurrent.locks.Lock#newCondition()}. Use the exclusive
- * lock if you need to wait on events that are driven by run processing.
- *
- * @return The shared lock for this {@link PeriodicUpdateGraph}
- */
- public AwareFunctionalLock sharedLock() {
- return lock.sharedLock();
- }
-
- /**
- *
- * Get the exclusive lock for this {@link PeriodicUpdateGraph}.
- *
- * Using this lock will prevent run or read-only processing from proceeding concurrently.
- *
- * The exclusive lock implementation is expected to support reentrance.
- *
- * Note that using the exclusive lock while the shared lock is held by the current thread will result in exceptions,
- * as lock upgrade is not supported.
- *
- * This lock does support {@link java.util.concurrent.locks.Lock#newCondition()}.
- *
- * @return The exclusive lock for this {@link PeriodicUpdateGraph}
- */
- public AwareFunctionalLock exclusiveLock() {
- return lock.exclusiveLock();
- }
-
- // endregion Accessors for the shared and exclusive locks
-
- /**
- * Test if this thread is part of our run thread executor service.
- *
- * @return whether this is one of our run threads.
- */
- @Override
- public boolean currentThreadProcessesUpdates() {
- return isUpdateThread.get();
- }
-
- @Override
- public boolean serialTableOperationsSafe() {
- return serialTableOperationsSafe.get();
- }
-
- @Override
- public boolean setSerialTableOperationsSafe(final boolean newValue) {
- final boolean old = serialTableOperationsSafe.get();
- serialTableOperationsSafe.set(newValue);
- return old;
- }
-
/**
* Set the target duration of an update cycle, including the updating phase and the idle phase. This is also the
* target interval between the start of one cycle and the start of the next.
@@ -532,6 +249,11 @@ public long getTargetCycleDurationMillis() {
return targetCycleDurationMillis;
}
+ @Override
+ public boolean isCycleOnBudget(long cycleTimeNanos) {
+ return cycleTimeNanos <= MILLISECONDS.toNanos(targetCycleDurationMillis);
+ }
+
/**
* Resets the run cycle time to the default target configured via the {@link Builder} setting.
*
@@ -563,7 +285,7 @@ public void enableUnitTestMode() {
if (refreshThread.isAlive()) {
throw new IllegalStateException("PeriodicUpdateGraph.refreshThread is executing!");
}
- lock.reset();
+ resetLock();
unitTestMode = true;
unitTestRefreshThreadPool = makeUnitTestRefreshExecutor();
updatePerformanceTracker.enableUnitTestMode();
@@ -631,58 +353,32 @@ public void start() {
* Begins the process to stop all processing threads and forces ReferenceCounted sources to a reference count of
* zero.
*/
+ @Override
public void stop() {
running = false;
notificationProcessor.shutdown();
+ // ensure that any outstanding cycle has completed
+ exclusiveLock().doLocked(() -> {
+ });
}
/**
- * Add a table to the list of tables to run and mark it as {@link DynamicNode#setRefreshing(boolean) refreshing} if
- * it was a {@link DynamicNode}.
+ * {@inheritDoc}
*
* @implNote This will do nothing in {@link #enableUnitTestMode() unit test} mode other than mark the table as
* refreshing.
- * @param updateSource The table to be added to the run list
*/
@Override
- public void addSource(@NotNull final Runnable updateSource) {
- if (!running) {
- throw new IllegalStateException("PeriodicUpdateGraph is no longer running");
- }
-
- if (updateSource instanceof DynamicNode) {
- ((DynamicNode) updateSource).setRefreshing(true);
- }
-
- if (!allowUnitTestMode) {
+ public void addSource(@NotNull Runnable updateSource) {
+ if (allowUnitTestMode) {
// if we are in unit test mode we never want to start the UpdateGraph
- sources.add(updateSource);
- start();
+ if (updateSource instanceof DynamicNode) {
+ ((DynamicNode) updateSource).setRefreshing(true);
+ }
+ return;
}
- }
-
- @Override
- public void removeSource(@NotNull final Runnable updateSource) {
- sources.remove(updateSource);
- }
-
- /**
- * Remove a collection of sources from the list of refreshing sources.
- *
- * @implNote This will not set the sources as {@link DynamicNode#setRefreshing(boolean) non-refreshing}.
- * @param sourcesToRemove The sources to remove from the list of refreshing sources
- */
- public void removeSources(final Collection sourcesToRemove) {
- sources.removeAll(sourcesToRemove);
- }
-
- /**
- * Return the number of valid sources.
- *
- * @return the number of valid sources
- */
- public int sourceCount() {
- return sources.size();
+ super.addSource(updateSource);
+ start();
}
/**
@@ -699,20 +395,7 @@ public void addNotification(@NotNull final Notification notification) {
if (notificationAdditionDelay > 0) {
SleepUtil.sleep(notificationRandomizer.nextInt(notificationAdditionDelay));
}
- if (notification.isTerminal()) {
- synchronized (terminalNotifications) {
- terminalNotifications.offer(notification);
- }
- } else {
- logDependencies().append(Thread.currentThread().getName()).append(": Adding notification ")
- .append(notification).endl();
- synchronized (pendingNormalNotifications) {
- Assert.eq(logicalClock.currentState(), "logicalClock.currentState()",
- LogicalClock.State.Updating, "LogicalClock.State.Updating");
- pendingNormalNotifications.offer(notification);
- }
- notificationProcessor.onNotificationAdded();
- }
+ super.addNotification(notification);
}
@Override
@@ -720,50 +403,7 @@ public boolean maybeAddNotification(@NotNull final Notification notification, fi
if (notificationAdditionDelay > 0) {
SleepUtil.sleep(notificationRandomizer.nextInt(notificationAdditionDelay));
}
- if (notification.isTerminal()) {
- throw new IllegalArgumentException("Notification must not be terminal");
- }
- logDependencies().append(Thread.currentThread().getName()).append(": Adding notification ").append(notification)
- .append(" if step is ").append(deliveryStep).endl();
- final boolean added;
- synchronized (pendingNormalNotifications) {
- // Note that the clock is advanced to idle under the pendingNormalNotifications lock, after which point no
- // further normal notifications will be processed on this cycle.
- final long logicalClockValue = logicalClock.currentValue();
- if (LogicalClock.getState(logicalClockValue) == LogicalClock.State.Updating
- && LogicalClock.getStep(logicalClockValue) == deliveryStep) {
- pendingNormalNotifications.offer(notification);
- added = true;
- } else {
- added = false;
- }
- }
- if (added) {
- notificationProcessor.onNotificationAdded();
- }
- return added;
- }
-
- @Override
- public boolean satisfied(final long step) {
- StepUpdater.checkForOlderStep(step, sourcesLastSatisfiedStep);
- return sourcesLastSatisfiedStep == step;
- }
-
- /**
- * Enqueue a collection of notifications to be flushed.
- *
- * @param notifications The notification to enqueue
- *
- * @see #addNotification(Notification)
- */
- @Override
- public void addNotifications(@NotNull final Collection extends Notification> notifications) {
- synchronized (pendingNormalNotifications) {
- synchronized (terminalNotifications) {
- notifications.forEach(this::addNotification);
- }
- }
+ return super.maybeAddNotification(notification, deliveryStep);
}
/**
@@ -772,20 +412,15 @@ public void addNotifications(@NotNull final Collection extends Notification> n
*/
@Override
public void requestRefresh() {
+ if (!running) {
+ throw new IllegalStateException("Cannot request refresh when UpdateGraph is no longer running.");
+ }
refreshRequested.set(true);
synchronized (refreshRequested) {
refreshRequested.notify();
}
}
- /**
- * @return Whether this UpdateGraph has a mechanism that supports refreshing
- */
- @Override
- public boolean supportsRefreshing() {
- return true;
- }
-
/**
* Clear all monitored tables and enqueued notifications to support {@link #enableUnitTestMode() unit-tests}.
*
@@ -808,6 +443,7 @@ public void resetForUnitTests(final boolean after) {
* @param notificationStartDelay Maximum randomized notification start delay
* @param notificationAdditionDelay Maximum randomized notification addition delay
*/
+ @TestUseOnly
public void resetForUnitTests(boolean after,
final boolean randomizedNotifications, final int seed, final int maxRandomizedThreadCount,
final int notificationStartDelay, final int notificationAdditionDelay) {
@@ -815,34 +451,15 @@ public void resetForUnitTests(boolean after,
this.notificationRandomizer = new Random(seed);
this.notificationAdditionDelay = notificationAdditionDelay;
Assert.assertion(unitTestMode, "unitTestMode");
- sources.clear();
- notificationProcessor.shutdown();
- synchronized (pendingNormalNotifications) {
- pendingNormalNotifications.clear();
- }
- isUpdateThread.remove();
+
+ resetForUnitTests(after, errors);
+
if (randomizedNotifications) {
notificationProcessor = makeRandomizedNotificationProcessor(notificationRandomizer,
maxRandomizedThreadCount, notificationStartDelay);
} else {
notificationProcessor = makeNotificationProcessor();
}
- synchronized (terminalNotifications) {
- terminalNotifications.clear();
- }
- logicalClock.resetForUnitTests();
- sourcesLastSatisfiedStep = logicalClock.currentStep();
-
- refreshScope = null;
- if (after) {
- LivenessManager stackTop;
- while ((stackTop = LivenessScopeStack.peek()) instanceof LivenessScope) {
- LivenessScopeStack.pop((LivenessScope) stackTop);
- }
- CleanupReferenceProcessorInstance.resetAllForUnitTests();
- }
-
- ensureUnlocked("unit test reset thread", errors);
if (refreshThread.isAlive()) {
errors.add("UpdateGraph refreshThread isAlive");
@@ -872,7 +489,7 @@ public void resetForUnitTests(boolean after,
}
}
- lock.reset();
+ resetLock();
}
/**
@@ -924,10 +541,7 @@ private void startCycleForUnitTestsInternal(final boolean sourcesSatisfied) {
@TestUseOnly
public void markSourcesRefreshedForUnitTests() {
Assert.assertion(unitTestMode, "unitTestMode");
- if (sourcesLastSatisfiedStep >= logicalClock.currentStep()) {
- throw new IllegalStateException("Already marked sources as satisfied!");
- }
- sourcesLastSatisfiedStep = logicalClock.currentStep();
+ updateSourcesLastSatisfiedStep(true);
}
/**
@@ -950,8 +564,9 @@ public void completeCycleForUnitTests() {
private void completeCycleForUnitTests(boolean errorCaughtAndInFinallyBlock) {
Assert.assertion(unitTestMode, "unitTestMode");
if (!errorCaughtAndInFinallyBlock) {
- Assert.eq(sourcesLastSatisfiedStep, "sourcesLastSatisfiedStep", logicalClock.currentStep(),
- "logicalClock.currentStep()");
+ final long currentStep = logicalClock.currentStep();
+ final boolean satisfied = satisfied(currentStep);
+ Assert.assertion(satisfied, "satisfied()", currentStep, "currentStep");
}
try {
unitTestRefreshThreadPool.submit(this::completeCycleForUnitTestsInternal).get();
@@ -974,7 +589,7 @@ private void completeCycleForUnitTestsInternal() {
exclusiveLock().unlock();
isUpdateThread.remove();
}) {
- flushNotificationsAndCompleteCycle();
+ flushNotificationsAndCompleteCycle(false);
}
}
@@ -1157,7 +772,7 @@ public Runnable flushAllNormalNotificationsForUnitTests(@NotNull final BooleanSu
}
/**
- * If the run thread is waiting in {@link #flushNormalNotificationsAndCompleteCycle()} or
+ * If the run thread is waiting in flushNormalNotificationsAndCompleteCycle() or
* {@link #flushAllNormalNotificationsForUnitTests(BooleanSupplier, long)}, wake it up.
*/
@TestUseOnly
@@ -1166,210 +781,6 @@ public void wakeRefreshThreadForUnitTests() {
notificationProcessor.onNotificationAdded();
}
- /**
- * Flush all non-terminal notifications, complete the logical clock update cycle, then flush all terminal
- * notifications.
- */
- private void flushNotificationsAndCompleteCycle() {
- // We cannot proceed with normal notifications, nor are we satisfied, until all update source refresh
- // notifications have been processed. Note that non-update source notifications that require dependency
- // satisfaction are delivered first to the pendingNormalNotifications queue, and hence will not be processed
- // until we advance to the flush* methods.
- // TODO: If and when we properly integrate update sources into the dependency tracking system, we can
- // discontinue this distinct phase, along with the requirement to treat the UpdateGraph itself as a Dependency.
- // Until then, we must delay the beginning of "normal" notification processing until all update sources are
- // done. See IDS-8039.
- notificationProcessor.doAllWork();
- sourcesLastSatisfiedStep = logicalClock.currentStep();
-
- flushNormalNotificationsAndCompleteCycle();
- flushTerminalNotifications();
- synchronized (pendingNormalNotifications) {
- Assert.assertion(pendingNormalNotifications.isEmpty(), "pendingNormalNotifications.isEmpty()");
- }
- }
-
- /**
- * Flush all non-terminal {@link Notification notifications} from the queue.
- */
- private void flushNormalNotificationsAndCompleteCycle() {
- final IntrusiveDoublyLinkedQueue pendingToEvaluate =
- new IntrusiveDoublyLinkedQueue<>(IntrusiveDoublyLinkedNode.Adapter.getInstance());
- while (true) {
- final int outstandingCountAtStart = notificationProcessor.outstandingNotificationsCount();
- notificationProcessor.beforeNotificationsDrained();
- synchronized (pendingNormalNotifications) {
- pendingToEvaluate.transferAfterTailFrom(pendingNormalNotifications);
- if (outstandingCountAtStart == 0 && pendingToEvaluate.isEmpty()) {
- // We complete the cycle here before releasing the lock on pendingNotifications, so that
- // maybeAddNotification can detect scenarios where the notification cannot be delivered on the
- // desired step.
- logicalClock.completeUpdateCycle();
- break;
- }
- }
- logDependencies().append(Thread.currentThread().getName())
- .append(": Notification queue size=").append(pendingToEvaluate.size())
- .append(", outstanding=").append(outstandingCountAtStart)
- .endl();
-
- boolean nothingBecameSatisfied = true;
- for (final Iterator it = pendingToEvaluate.iterator(); it.hasNext();) {
- final Notification notification = it.next();
-
- Assert.eqFalse(notification.isTerminal(), "notification.isTerminal()");
- Assert.eqFalse(notification.mustExecuteWithUpdateGraphLock(),
- "notification.mustExecuteWithUpdateGraphLock()");
-
- final boolean satisfied = notification.canExecute(sourcesLastSatisfiedStep);
- if (satisfied) {
- nothingBecameSatisfied = false;
- it.remove();
- logDependencies().append(Thread.currentThread().getName())
- .append(": Submitting to notification processor ").append(notification).endl();
- notificationProcessor.submit(notification);
- } else {
- logDependencies().append(Thread.currentThread().getName()).append(": Unmet dependencies for ")
- .append(notification).endl();
- }
- }
- if (outstandingCountAtStart == 0 && nothingBecameSatisfied) {
- throw new IllegalStateException(
- "No outstanding notifications, yet the notification queue is not empty!");
- }
- if (notificationProcessor.outstandingNotificationsCount() > 0) {
- notificationProcessor.doWork();
- }
- }
- synchronized (pendingNormalNotifications) {
- Assert.eqZero(pendingNormalNotifications.size() + pendingToEvaluate.size(),
- "pendingNormalNotifications.size() + pendingToEvaluate.size()");
- }
- }
-
- /**
- * Flush all {@link Notification#isTerminal() terminal} {@link Notification notifications} from the queue.
- *
- * @implNote Any notification that may have been queued while the clock's state is Updating must be invoked during
- * this cycle's Idle phase.
- */
- private void flushTerminalNotifications() {
- synchronized (terminalNotifications) {
- for (final Iterator it = terminalNotifications.iterator(); it.hasNext();) {
- final Notification notification = it.next();
- Assert.assertion(notification.isTerminal(), "notification.isTerminal()");
-
- if (!notification.mustExecuteWithUpdateGraphLock()) {
- it.remove();
- // for the single threaded queue case; this enqueues the notification;
- // for the executor service case, this causes the notification to be kicked off
- notificationProcessor.submit(notification);
- }
- }
- }
-
- // run the notifications that must be run on this thread
- while (true) {
- final Notification notificationForThisThread;
- synchronized (terminalNotifications) {
- notificationForThisThread = terminalNotifications.poll();
- }
- if (notificationForThisThread == null) {
- break;
- }
- runNotification(notificationForThisThread);
- }
-
- // We can not proceed until all of the terminal notifications have executed.
- notificationProcessor.doAllWork();
- }
-
- /**
- * Abstract away the details of satisfied notification processing.
- */
- private interface NotificationProcessor {
-
- /**
- * Submit a satisfied notification for processing.
- *
- * @param notification The notification
- */
- void submit(@NotNull NotificationQueue.Notification notification);
-
- /**
- * Submit a queue of satisfied notification for processing.
- *
- * @param notifications The queue of notifications to
- * {@link IntrusiveDoublyLinkedQueue#transferAfterTailFrom(IntrusiveDoublyLinkedQueue) transfer} from.
- * Will become empty as a result of successful completion
- */
- void submitAll(@NotNull IntrusiveDoublyLinkedQueue notifications);
-
- /**
- * Query the number of outstanding notifications submitted to this processor.
- *
- * @return The number of outstanding notifications
- */
- int outstandingNotificationsCount();
-
- /**
- *
- * Do work (or in the multi-threaded case, wait for some work to have happened).
- *
- * Caller must know that work is outstanding.
- */
- void doWork();
-
- /**
- * Do all outstanding work.
- */
- void doAllWork();
-
- /**
- * Shutdown this notification processor (for unit tests).
- */
- void shutdown();
-
- /**
- * Called after a pending notification is added.
- */
- void onNotificationAdded();
-
- /**
- * Called before pending notifications are drained.
- */
- void beforeNotificationsDrained();
- }
-
- private void runNotification(@NotNull final Notification notification) {
- logDependencies().append(Thread.currentThread().getName()).append(": Executing ").append(notification).endl();
-
- final LivenessScope scope;
- final boolean releaseScopeOnClose;
- if (notification.isTerminal()) {
- // Terminal notifications can't create new notifications, so they have no need to participate in a shared
- // run scope.
- scope = new LivenessScope();
- releaseScopeOnClose = true;
- } else {
- // Non-terminal notifications must use a shared run scope.
- Assert.neqNull(refreshScope, "refreshScope");
- scope = refreshScope == LivenessScopeStack.peek() ? null : refreshScope;
- releaseScopeOnClose = false;
- }
-
- try (final SafeCloseable ignored = scope == null ? null : LivenessScopeStack.open(scope, releaseScopeOnClose)) {
- notification.run();
- logDependencies().append(Thread.currentThread().getName()).append(": Completed ").append(notification)
- .endl();
- } catch (final Exception e) {
- log.error().append(Thread.currentThread().getName())
- .append(": Exception while executing PeriodicUpdateGraph notification: ").append(notification)
- .append(": ").append(e).endl();
- ProcessEnvironment.getGlobalFatalErrorReporter()
- .report("Exception while processing PeriodicUpdateGraph notification", e);
- }
- }
private class ConcurrentNotificationProcessor implements NotificationProcessor {
@@ -1511,100 +922,6 @@ int threadCount() {
}
}
- private static final class PoisonedNotificationProcessor implements NotificationProcessor {
-
- private static final NotificationProcessor INSTANCE = new PoisonedNotificationProcessor();
-
- private static RuntimeException notYetStarted() {
- return new IllegalStateException("PeriodicUpdateGraph has not been started yet");
- }
-
- private PoisonedNotificationProcessor() {}
-
- @Override
- public void submit(@NotNull Notification notification) {
- throw notYetStarted();
- }
-
- @Override
- public void submitAll(@NotNull IntrusiveDoublyLinkedQueue notifications) {
- throw notYetStarted();
- }
-
- @Override
- public int outstandingNotificationsCount() {
- throw notYetStarted();
- }
-
- @Override
- public void doWork() {
- throw notYetStarted();
- }
-
- @Override
- public void doAllWork() {
- throw notYetStarted();
- }
-
- @Override
- public void shutdown() {}
-
- @Override
- public void onNotificationAdded() {
- throw notYetStarted();
- }
-
- @Override
- public void beforeNotificationsDrained() {
- throw notYetStarted();
- }
- }
-
- private class QueueNotificationProcessor implements NotificationProcessor {
-
- final IntrusiveDoublyLinkedQueue satisfiedNotifications =
- new IntrusiveDoublyLinkedQueue<>(IntrusiveDoublyLinkedNode.Adapter.getInstance());
-
- @Override
- public void submit(@NotNull final Notification notification) {
- satisfiedNotifications.offer(notification);
- }
-
- @Override
- public void submitAll(@NotNull IntrusiveDoublyLinkedQueue notifications) {
- satisfiedNotifications.transferAfterTailFrom(notifications);
- }
-
- @Override
- public int outstandingNotificationsCount() {
- return satisfiedNotifications.size();
- }
-
- @Override
- public void doWork() {
- Notification satisfiedNotification;
- while ((satisfiedNotification = satisfiedNotifications.poll()) != null) {
- runNotification(satisfiedNotification);
- }
- }
-
- @Override
- public void doAllWork() {
- doWork();
- }
-
- @Override
- public void shutdown() {
- satisfiedNotifications.clear();
- }
-
- @Override
- public void onNotificationAdded() {}
-
- @Override
- public void beforeNotificationsDrained() {}
- }
-
@TestUseOnly
private class ControlledNotificationProcessor implements NotificationProcessor {
@@ -1667,44 +984,32 @@ private boolean blockUntilNotificationAdded(final long nanosToWait) {
}
}
- private static LogEntry appendAsMillisFromNanos(final LogEntry entry, final long nanos) {
- if (nanos > 0) {
- return entry.appendDouble(nanos / 1_000_000.0, 3);
- }
- return entry.append(0);
- }
/**
- * Iterate over all monitored tables and run them. This method also ensures that the loop runs no faster than
- * {@link #getTargetCycleDurationMillis() minimum cycle time}.
+ * Iterate over all monitored tables and run them.
+ *
+ *
+ * This method also ensures that the loop runs no faster than {@link #getTargetCycleDurationMillis() minimum cycle
+ * time}.
+ *
*/
- private void refreshTablesAndFlushNotifications() {
+ @Override
+ void refreshTablesAndFlushNotifications() {
final long startTimeNanos = System.nanoTime();
- jvmIntrospectionContext.startSample();
- if (sources.isEmpty()) {
- exclusiveLock().doLocked(this::flushTerminalNotifications);
- } else {
- currentCycleLockWaitTotalNanos = currentCycleYieldTotalNanos = currentCycleSleepTotalNanos = 0L;
-
- ScheduledFuture> watchdogFuture = null;
-
- final long localWatchdogMillis = watchDogMillis;
- final LongConsumer localWatchdogTimeoutProcedure = watchDogTimeoutProcedure;
- if ((localWatchdogMillis > 0) && (localWatchdogTimeoutProcedure != null)) {
- watchdogFuture = watchdogScheduler.schedule(
- () -> localWatchdogTimeoutProcedure.accept(localWatchdogMillis),
- localWatchdogMillis, MILLISECONDS);
- }
+ ScheduledFuture> watchdogFuture = null;
+ final long localWatchdogMillis = watchDogMillis;
+ final LongConsumer localWatchdogTimeoutProcedure = watchDogTimeoutProcedure;
+ if ((localWatchdogMillis > 0) && (localWatchdogTimeoutProcedure != null)) {
+ watchdogFuture = watchdogScheduler.schedule(
+ () -> localWatchdogTimeoutProcedure.accept(localWatchdogMillis),
+ localWatchdogMillis, MILLISECONDS);
+ }
- refreshAllTables();
+ super.refreshTablesAndFlushNotifications();
- if (watchdogFuture != null) {
- watchdogFuture.cancel(true);
- }
- jvmIntrospectionContext.endSample();
- final long cycleTimeNanos = System.nanoTime() - startTimeNanos;
- computeStatsAndLogCycle(cycleTimeNanos);
+ if (watchdogFuture != null) {
+ watchdogFuture.cancel(true);
}
if (interCycleYield) {
@@ -1714,72 +1019,6 @@ private void refreshTablesAndFlushNotifications() {
waitForNextCycle(startTimeNanos);
}
- private void computeStatsAndLogCycle(final long cycleTimeNanos) {
- final long safePointPauseTimeMillis = jvmIntrospectionContext.deltaSafePointPausesTimeMillis();
- accumulatedCycleStats.accumulate(
- getTargetCycleDurationMillis(),
- cycleTimeNanos,
- jvmIntrospectionContext.deltaSafePointPausesCount(),
- safePointPauseTimeMillis);
- if (cycleTimeNanos >= minimumCycleDurationToLogNanos) {
- if (suppressedCycles > 0) {
- logSuppressedCycles();
- }
- final double cycleTimeMillis = cycleTimeNanos / 1_000_000.0;
- LogEntry entry = log.info()
- .append("Update Graph Processor cycleTime=").appendDouble(cycleTimeMillis, 3);
- if (jvmIntrospectionContext.hasSafePointData()) {
- final long safePointSyncTimeMillis = jvmIntrospectionContext.deltaSafePointSyncTimeMillis();
- entry = entry
- .append("ms, safePointTime=")
- .append(safePointPauseTimeMillis)
- .append("ms, safePointTimePct=");
- if (safePointPauseTimeMillis > 0 && cycleTimeMillis > 0.0) {
- final double safePointTimePct = 100.0 * safePointPauseTimeMillis / cycleTimeMillis;
- entry = entry.appendDouble(safePointTimePct, 2);
- } else {
- entry = entry.append("0");
- }
- entry = entry.append("%, safePointSyncTime=").append(safePointSyncTimeMillis);
- }
- entry = entry.append("ms, lockWaitTime=");
- entry = appendAsMillisFromNanos(entry, currentCycleLockWaitTotalNanos);
- entry = entry.append("ms, yieldTime=");
- entry = appendAsMillisFromNanos(entry, currentCycleSleepTotalNanos);
- entry = entry.append("ms, sleepTime=");
- entry = appendAsMillisFromNanos(entry, currentCycleSleepTotalNanos);
- entry.append("ms").endl();
- return;
- }
- if (cycleTimeNanos > 0) {
- ++suppressedCycles;
- suppressedCyclesTotalNanos += cycleTimeNanos;
- suppressedCyclesTotalSafePointTimeMillis += safePointPauseTimeMillis;
- if (suppressedCyclesTotalNanos >= minimumCycleDurationToLogNanos) {
- logSuppressedCycles();
- }
- }
- }
-
- private void logSuppressedCycles() {
- LogEntry entry = log.info()
- .append("Minimal Update Graph Processor cycle times: ")
- .appendDouble((double) (suppressedCyclesTotalNanos) / 1_000_000.0, 3).append("ms / ")
- .append(suppressedCycles).append(" cycles = ")
- .appendDouble(
- (double) suppressedCyclesTotalNanos / (double) suppressedCycles / 1_000_000.0, 3)
- .append("ms/cycle average)");
- if (jvmIntrospectionContext.hasSafePointData()) {
- entry = entry
- .append(", safePointTime=")
- .append(suppressedCyclesTotalSafePointTimeMillis)
- .append("ms");
- }
- entry.endl();
- suppressedCycles = suppressedCyclesTotalNanos = 0;
- suppressedCyclesTotalSafePointTimeMillis = 0;
- }
-
/**
*
* Ensure that at least {@link #getTargetCycleDurationMillis() minCycleTime} has passed before returning.
@@ -1804,15 +1043,7 @@ private void waitForNextCycle(final long startTimeNanos) {
expectedEndTimeNanos =
Math.max(expectedEndTimeNanos, nowNanos + MILLISECONDS.toNanos(minimumInterCycleSleep));
}
- if (expectedEndTimeNanos >= nextUpdatePerformanceTrackerFlushTimeNanos) {
- nextUpdatePerformanceTrackerFlushTimeNanos =
- nowNanos + MILLISECONDS.toNanos(UpdatePerformanceTracker.REPORT_INTERVAL_MILLIS);
- try {
- updatePerformanceTracker.flush();
- } catch (Exception err) {
- log.error().append("Error flushing UpdatePerformanceTracker: ").append(err).endl();
- }
- }
+ maybeFlushUpdatePerformance(nowNanos, expectedEndTimeNanos);
waitForEndTime(expectedEndTimeNanos);
}
@@ -1848,98 +1079,10 @@ private void waitForEndTime(final long expectedEndTimeNanos) {
}
}
- /**
- * Refresh all the update sources within an {@link LogicalClock update cycle} after the UpdateGraph has been locked.
- * At the end of the updates all {@link Notification notifications} will be flushed.
- */
- private void refreshAllTables() {
+ @Override
+ void refreshAllTables() {
refreshRequested.set(false);
- doRefresh(() -> sources.forEach((final UpdateSourceRefreshNotification updateSourceNotification,
- final Runnable unused) -> notificationProcessor.submit(updateSourceNotification)));
- }
-
- /**
- * Perform a run cycle, using {@code refreshFunction} to ensure the desired update sources are refreshed at the
- * start.
- *
- * @param refreshFunction Function to submit one or more {@link UpdateSourceRefreshNotification update source
- * refresh notifications} to the {@link NotificationProcessor notification processor} or run them directly.
- */
- private void doRefresh(@NotNull final Runnable refreshFunction) {
- final long lockStartTimeNanos = System.nanoTime();
- exclusiveLock().doLocked(() -> {
- currentCycleLockWaitTotalNanos += System.nanoTime() - lockStartTimeNanos;
- synchronized (pendingNormalNotifications) {
- Assert.eqZero(pendingNormalNotifications.size(), "pendingNormalNotifications.size()");
- }
- Assert.eqNull(refreshScope, "refreshScope");
- refreshScope = new LivenessScope();
- final long updatingCycleValue = logicalClock.startUpdateCycle();
- logDependencies().append("Beginning PeriodicUpdateGraph cycle step=")
- .append(logicalClock.currentStep()).endl();
- try (final SafeCloseable ignored = LivenessScopeStack.open(refreshScope, true)) {
- refreshFunction.run();
- flushNotificationsAndCompleteCycle();
- } finally {
- logicalClock.ensureUpdateCycleCompleted(updatingCycleValue);
- refreshScope = null;
- }
- logDependencies().append("Completed PeriodicUpdateGraph cycle step=")
- .append(logicalClock.currentStep()).endl();
- });
- }
-
- /**
- * Re-usable class for adapting update sources to {@link Notification}s.
- */
- private static final class UpdateSourceRefreshNotification extends AbstractNotification
- implements SimpleReference {
-
- private final WeakReference updateSourceRef;
-
- private UpdateSourceRefreshNotification(@NotNull final Runnable updateSource) {
- super(false);
- updateSourceRef = new WeakReference<>(updateSource);
- }
-
- @Override
- public LogOutput append(@NotNull final LogOutput logOutput) {
- return logOutput.append("UpdateSourceRefreshNotification{").append(System.identityHashCode(this))
- .append(", for UpdateSource{").append(System.identityHashCode(get())).append("}}");
- }
-
- @Override
- public boolean canExecute(final long step) {
- return true;
- }
-
- @Override
- public void run() {
- final Runnable updateSource = updateSourceRef.get();
- if (updateSource == null) {
- return;
- }
- updateSource.run();
- }
-
- @Override
- public Runnable get() {
- // NB: Arguably we should make get() and clear() synchronized.
- return updateSourceRef.get();
- }
-
- @Override
- public void clear() {
- updateSourceRef.clear();
- }
- }
-
- public LogEntry logDependencies() {
- if (printDependencyInformation) {
- return log.info();
- } else {
- return LogEntry.NULL;
- }
+ super.refreshAllTables();
}
private class NotificationProcessorThreadFactory extends NamingThreadFactory {
@@ -1956,26 +1099,6 @@ public Thread newThread(@NotNull final Runnable r) {
}
}
- @TestUseOnly
- private void ensureUnlocked(@NotNull final String callerDescription, @Nullable final List errors) {
- if (exclusiveLock().isHeldByCurrentThread()) {
- if (errors != null) {
- errors.add(callerDescription + ": UpdateGraph exclusive lock is still held");
- }
- while (exclusiveLock().isHeldByCurrentThread()) {
- exclusiveLock().unlock();
- }
- }
- if (sharedLock().isHeldByCurrentThread()) {
- if (errors != null) {
- errors.add(callerDescription + ": UpdateGraph shared lock is still held");
- }
- while (sharedLock().isHeldByCurrentThread()) {
- sharedLock().unlock();
- }
- }
- }
-
private ExecutorService makeUnitTestRefreshExecutor() {
return Executors.newFixedThreadPool(1, new UnitTestThreadFactory());
}
@@ -2024,12 +1147,8 @@ private void configureUnitTestRefreshThread() {
ExecutionContext.newBuilder().setUpdateGraph(this).build().open();
}
- public void takeAccumulatedCycleStats(AccumulatedCycleStats updateGraphAccumCycleStats) {
- accumulatedCycleStats.take(updateGraphAccumCycleStats);
- }
-
public static PeriodicUpdateGraph getInstance(final String name) {
- return INSTANCES.get(name);
+ return BaseUpdateGraph.getInstance(name).cast();
}
public static final class Builder {
@@ -2037,8 +1156,7 @@ public static final class Builder {
Configuration.getInstance().getBooleanWithDefault(ALLOW_UNIT_TEST_MODE_PROP, false);
private long targetCycleDurationMillis =
Configuration.getInstance().getIntegerWithDefault(DEFAULT_TARGET_CYCLE_DURATION_MILLIS_PROP, 1000);
- private long minimumCycleDurationToLogNanos = MILLISECONDS.toNanos(
- Configuration.getInstance().getIntegerWithDefault(MINIMUM_CYCLE_DURATION_TO_LOG_MILLIS_PROP, 25));
+ private long minimumCycleDurationToLogNanos = DEFAULT_MINIMUM_CYCLE_DURATION_TO_LOG_NANOSECONDS;
private String name;
private int numUpdateThreads = -1;
@@ -2089,18 +1207,10 @@ public Builder numUpdateThreads(int numUpdateThreads) {
* name provided to this builder.
*
* @return the new PeriodicUpdateGraph
- * @throws IllegalStateException if a PeriodicUpdateGraph with the provided name already exists
+ * @throws IllegalStateException if an UpdateGraph with the provided name already exists
*/
public PeriodicUpdateGraph build() {
- synchronized (INSTANCES) {
- if (INSTANCES.containsKey(name)) {
- throw new IllegalStateException(
- String.format("PeriodicUpdateGraph with name %s already exists", name));
- }
- final PeriodicUpdateGraph newUpdateGraph = construct();
- INSTANCES.put(name, newUpdateGraph);
- return newUpdateGraph;
- }
+ return BaseUpdateGraph.buildOrThrow(name, this::construct);
}
/**
@@ -2108,9 +1218,10 @@ public PeriodicUpdateGraph build() {
* new PeriodicUpdateGraph.
*
* @return the PeriodicUpdateGraph
+ * @throws ClassCastException if the existing graph is not a PeriodicUpdateGraph
*/
public PeriodicUpdateGraph existingOrBuild() {
- return INSTANCES.putIfAbsent(name, n -> construct());
+ return BaseUpdateGraph.existingOrBuild(name, this::construct).cast();
}
private PeriodicUpdateGraph construct() {
diff --git a/engine/table/src/main/java/io/deephaven/engine/updategraph/impl/PoisonedNotificationProcessor.java b/engine/table/src/main/java/io/deephaven/engine/updategraph/impl/PoisonedNotificationProcessor.java
new file mode 100644
index 00000000000..e6590d9285b
--- /dev/null
+++ b/engine/table/src/main/java/io/deephaven/engine/updategraph/impl/PoisonedNotificationProcessor.java
@@ -0,0 +1,58 @@
+package io.deephaven.engine.updategraph.impl;
+
+import io.deephaven.engine.updategraph.NotificationQueue;
+import io.deephaven.util.datastructures.linked.IntrusiveDoublyLinkedQueue;
+import org.jetbrains.annotations.NotNull;
+
+/**
+ * The poisoned notification processor is used when an update graph has not yet been started, throwing an
+ * IllegalStateException on all operations.
+ */
+final class PoisonedNotificationProcessor implements BaseUpdateGraph.NotificationProcessor {
+
+ static final BaseUpdateGraph.NotificationProcessor INSTANCE = new PoisonedNotificationProcessor();
+
+ private static RuntimeException notYetStarted() {
+ return new IllegalStateException("UpdateGraph has not been started yet");
+ }
+
+ private PoisonedNotificationProcessor() {}
+
+ @Override
+ public void submit(@NotNull NotificationQueue.Notification notification) {
+ throw notYetStarted();
+ }
+
+ @Override
+ public void submitAll(@NotNull IntrusiveDoublyLinkedQueue notifications) {
+ throw notYetStarted();
+ }
+
+ @Override
+ public int outstandingNotificationsCount() {
+ throw notYetStarted();
+ }
+
+ @Override
+ public void doWork() {
+ throw notYetStarted();
+ }
+
+ @Override
+ public void doAllWork() {
+ throw notYetStarted();
+ }
+
+ @Override
+ public void shutdown() {}
+
+ @Override
+ public void onNotificationAdded() {
+ throw notYetStarted();
+ }
+
+ @Override
+ public void beforeNotificationsDrained() {
+ throw notYetStarted();
+ }
+}
diff --git a/engine/table/src/test/java/io/deephaven/engine/table/impl/CapturingUpdateGraph.java b/engine/table/src/test/java/io/deephaven/engine/table/impl/CapturingUpdateGraph.java
index c70555e5a06..63e98610be9 100644
--- a/engine/table/src/test/java/io/deephaven/engine/table/impl/CapturingUpdateGraph.java
+++ b/engine/table/src/test/java/io/deephaven/engine/table/impl/CapturingUpdateGraph.java
@@ -163,4 +163,9 @@ public void runWithinUnitTestCycle(
final boolean satisfied) throws T {
delegate.runWithinUnitTestCycle(runnable, satisfied);
}
+
+ @Override
+ public void stop() {
+ delegate.stop();
+ }
}
diff --git a/engine/table/src/test/java/io/deephaven/engine/table/impl/QueryTableTest.java b/engine/table/src/test/java/io/deephaven/engine/table/impl/QueryTableTest.java
index 5648009a55f..1a3fb62bf8b 100644
--- a/engine/table/src/test/java/io/deephaven/engine/table/impl/QueryTableTest.java
+++ b/engine/table/src/test/java/io/deephaven/engine/table/impl/QueryTableTest.java
@@ -3644,5 +3644,8 @@ public void removeSource(@NotNull Runnable updateSource) {}
public void requestRefresh() {
throw new UnsupportedOperationException();
}
+
+ @Override
+ public void stop() {}
}
}
diff --git a/engine/table/src/test/java/io/deephaven/engine/updategraph/impl/TestEventDrivenUpdateGraph.java b/engine/table/src/test/java/io/deephaven/engine/updategraph/impl/TestEventDrivenUpdateGraph.java
new file mode 100644
index 00000000000..7ad90534b13
--- /dev/null
+++ b/engine/table/src/test/java/io/deephaven/engine/updategraph/impl/TestEventDrivenUpdateGraph.java
@@ -0,0 +1,245 @@
+package io.deephaven.engine.updategraph.impl;
+
+import io.deephaven.api.agg.Aggregation;
+import io.deephaven.configuration.DataDir;
+import io.deephaven.engine.context.ExecutionContext;
+import io.deephaven.engine.context.QueryCompiler;
+import io.deephaven.engine.rowset.RowSet;
+import io.deephaven.engine.rowset.RowSetFactory;
+import io.deephaven.engine.rowset.TrackingRowSet;
+import io.deephaven.engine.table.ColumnSource;
+import io.deephaven.engine.table.Table;
+import io.deephaven.engine.table.impl.QueryTable;
+import io.deephaven.engine.table.impl.perf.UpdatePerformanceTracker;
+import io.deephaven.engine.table.impl.sources.LongSingleValueSource;
+import io.deephaven.engine.testutil.TstUtils;
+import io.deephaven.engine.updategraph.UpdateGraph;
+import io.deephaven.engine.util.TableTools;
+import io.deephaven.util.SafeCloseable;
+import io.deephaven.util.annotations.ReflexiveUse;
+import junit.framework.TestCase;
+import org.junit.*;
+
+import java.nio.file.Path;
+import java.util.Collections;
+
+import static io.deephaven.engine.util.TableTools.*;
+import static org.junit.Assert.assertEquals;
+
+public class TestEventDrivenUpdateGraph {
+ EventDrivenUpdateGraph defaultUpdateGraph;
+
+ @Before
+ public void before() {
+ // the default update is necessary for the update performance tracker
+ clearUpdateGraphInstances();
+ UpdatePerformanceTracker.resetForUnitTests();
+ defaultUpdateGraph = EventDrivenUpdateGraph.newBuilder(PeriodicUpdateGraph.DEFAULT_UPDATE_GRAPH_NAME).build();
+ }
+
+ @After
+ public void after() {
+ clearUpdateGraphInstances();
+ UpdatePerformanceTracker.resetForUnitTests();
+ }
+
+ private static void clearUpdateGraphInstances() {
+ BaseUpdateGraph.removeInstance(PeriodicUpdateGraph.DEFAULT_UPDATE_GRAPH_NAME);
+ BaseUpdateGraph.removeInstance("TestEDUG");
+ BaseUpdateGraph.removeInstance("TestEDUG1");
+ BaseUpdateGraph.removeInstance("TestEDUG2");
+ }
+
+ /**
+ * QueryTable that adds one row per cycle.
+ */
+ final static class SourceThatRefreshes extends QueryTable implements Runnable {
+ public SourceThatRefreshes(UpdateGraph updateGraph) {
+ super(RowSetFactory.empty().toTracking(), Collections.emptyMap());
+ setAttribute(Table.APPEND_ONLY_TABLE_ATTRIBUTE, Boolean.TRUE);
+ updateGraph.addSource(this);
+ }
+
+ @Override
+ public void run() {
+ final RowSet added;
+ if (getRowSet().isEmpty()) {
+ added = RowSetFactory.fromKeys(0);
+ } else {
+ added = RowSetFactory.fromKeys(getRowSet().lastRowKey() + 1);
+ }
+ getRowSet().writableCast().insert(added);
+ notifyListeners(added, RowSetFactory.empty(), RowSetFactory.empty());
+ }
+ }
+
+ /**
+ * QueryTable that modifies its single row on each cycle.
+ */
+ final static class SourceThatModifiesItself extends QueryTable implements Runnable {
+ final LongSingleValueSource svcs;
+
+ public SourceThatModifiesItself(UpdateGraph updateGraph) {
+ super(RowSetFactory.fromKeys(42).toTracking(), Collections.singletonMap("V", new LongSingleValueSource()));
+ svcs = (LongSingleValueSource) getColumnSource("V", long.class);
+ svcs.startTrackingPrevValues();
+ updateGraph.addSource(this);
+ svcs.set(0L);
+ }
+
+ @Override
+ public void run() {
+ svcs.set(svcs.getLong(0) + 1);
+ notifyListeners(RowSetFactory.empty(), RowSetFactory.empty(), getRowSet().copy());
+ }
+ }
+
+ private QueryCompiler compilerForUnitTests() {
+ final Path queryCompilerDir = DataDir.get()
+ .resolve("io.deephaven.engine.updategraph.impl.TestEventDrivenUpdateGraph.compilerForUnitTests");
+
+ return QueryCompiler.create(queryCompilerDir.toFile(), getClass().getClassLoader());
+ }
+
+ @Test
+ public void testSimpleAdd() {
+ final EventDrivenUpdateGraph eventDrivenUpdateGraph = EventDrivenUpdateGraph.newBuilder("TestEDUG").build();
+
+ final ExecutionContext context = ExecutionContext.newBuilder().setUpdateGraph(eventDrivenUpdateGraph)
+ .emptyQueryScope().newQueryLibrary().setQueryCompiler(compilerForUnitTests()).build();
+ try (final SafeCloseable ignored = context.open()) {
+ final SourceThatRefreshes sourceThatRefreshes = new SourceThatRefreshes(eventDrivenUpdateGraph);
+ final Table updated =
+ eventDrivenUpdateGraph.sharedLock().computeLocked(() -> sourceThatRefreshes.update("X=i"));
+
+ int steps = 0;
+ do {
+ TestCase.assertEquals(steps, updated.size());
+ eventDrivenUpdateGraph.requestRefresh();
+ } while (steps++ < 100);
+ TestCase.assertEquals(steps, updated.size());
+ }
+ }
+
+ @Test
+ public void testSimpleModify() {
+ final EventDrivenUpdateGraph eventDrivenUpdateGraph = new EventDrivenUpdateGraph.Builder("TestEDUG").build();
+
+ final ExecutionContext context = ExecutionContext.newBuilder().setUpdateGraph(eventDrivenUpdateGraph)
+ .emptyQueryScope().newQueryLibrary().setQueryCompiler(compilerForUnitTests()).build();
+ try (final SafeCloseable ignored = context.open()) {
+ final SourceThatModifiesItself modifySource = new SourceThatModifiesItself(eventDrivenUpdateGraph);
+ final Table updated =
+ eventDrivenUpdateGraph.sharedLock().computeLocked(() -> modifySource.update("X=2 * V"));
+
+ final ColumnSource xcs = updated.getColumnSource("X");
+
+ int steps = 0;
+ do {
+ TestCase.assertEquals(1, updated.size());
+ eventDrivenUpdateGraph.requestRefresh();
+
+ TableTools.showWithRowSet(modifySource);
+
+ final TrackingRowSet rowSet = updated.getRowSet();
+ System.out.println("Step = " + steps);
+ final long xv = xcs.getLong(rowSet.firstRowKey());
+ TestCase.assertEquals(2L * (steps + 1), xv);
+ } while (steps++ < 100);
+ TestCase.assertEquals(1, updated.size());
+ }
+ }
+
+ @Test
+ public void testUpdatePerformanceTracker() {
+ final Table upt = UpdatePerformanceTracker.getQueryTable();
+
+
+ final EventDrivenUpdateGraph eventDrivenUpdateGraph1 = EventDrivenUpdateGraph.newBuilder("TestEDUG1").build();
+ final EventDrivenUpdateGraph eventDrivenUpdateGraph2 = EventDrivenUpdateGraph.newBuilder("TestEDUG2").build();
+
+ // first empty flush
+ eventDrivenUpdateGraph1.requestRefresh();
+ eventDrivenUpdateGraph2.requestRefresh();
+
+ final long start = System.currentTimeMillis();
+
+ final int count1 = 10;
+ final int count2 = 20;
+ final int time1 = 10;
+ final int time2 = 5;
+
+ // the work we care about
+ final Object ref1 = doWork(eventDrivenUpdateGraph1, time1, count1 - 1);
+ final Object ref2 = doWork(eventDrivenUpdateGraph2, time2, count2 - 1);
+
+ // force a flush
+ eventDrivenUpdateGraph1.resetNextFlushTime();
+ eventDrivenUpdateGraph2.resetNextFlushTime();
+ eventDrivenUpdateGraph1.requestRefresh();
+ eventDrivenUpdateGraph2.requestRefresh();
+
+ defaultUpdateGraph.requestRefresh();
+
+ final Table inRange;
+ final ExecutionContext context = ExecutionContext.newBuilder().setUpdateGraph(defaultUpdateGraph)
+ .emptyQueryScope().newQueryLibrary().setQueryCompiler(compilerForUnitTests()).build();
+ try (final SafeCloseable ignored = context.open()) {
+ final Table uptAgged = upt.where("!isNull(EntryId)").aggBy(
+ Aggregation.AggSum("UsageNanos", "InvocationCount", "RowsModified"),
+ "UpdateGraph", "EntryId");
+ assertEquals(defaultUpdateGraph, uptAgged.getUpdateGraph());
+ inRange = defaultUpdateGraph.sharedLock().computeLocked(() -> uptAgged.update(
+ "EIUExpectedMillis = UpdateGraph==`TestEDUG1` ? " + time1 + " : " + time2,
+ "TotalExpectedTime=InvocationCount * EIUExpectedMillis * 1_000_000L",
+ "InRange=(UsageNanos > 0.9 * TotalExpectedTime) && (UsageNanos < 1.5 * TotalExpectedTime)"));
+ }
+ TableTools.show(inRange);
+
+ final Table compare =
+ inRange.dropColumns("EntryId", "UsageNanos", "EIUExpectedMillis", "TotalExpectedTime");
+ TableTools.show(compare);
+
+ final Table expect = TableTools.newTable(stringCol("UpdateGraph", "TestEDUG1", "TestEDUG2"),
+ longCol("InvocationCount", count1, count2),
+ longCol("RowsModified", count1, count2), booleanCol("InRange", true, true));
+ TstUtils.assertTableEquals(expect, compare);
+ }
+
+ @ReflexiveUse(referrers = "TestEventDrivenUpdateGraph")
+ static public T sleepValue(long duration, T retVal) {
+ final Object blech = new Object();
+ // noinspection SynchronizationOnLocalVariableOrMethodParameter
+ synchronized (blech) {
+ try {
+ final long milliSeconds = duration / 1_000_000L;
+ final int nanos = (int) (duration % 1_000_000L);
+ blech.wait(milliSeconds, nanos);
+ } catch (InterruptedException ignored) {
+ }
+ }
+ return retVal;
+ }
+
+ private Object doWork(final EventDrivenUpdateGraph eventDrivenUpdateGraph, final int durationMillis,
+ final int steps) {
+ final ExecutionContext context = ExecutionContext.newBuilder().setUpdateGraph(eventDrivenUpdateGraph)
+ .emptyQueryScope().newQueryLibrary().setQueryCompiler(compilerForUnitTests()).build();
+ try (final SafeCloseable ignored = context.open()) {
+ final SourceThatModifiesItself modifySource = new SourceThatModifiesItself(eventDrivenUpdateGraph);
+ final Table updated =
+ eventDrivenUpdateGraph.sharedLock().computeLocked(() -> modifySource.update("X="
+ + getClass().getName() + ".sleepValue(" + (1000L * 1000L * durationMillis) + ", 2 * V)"));
+
+ int step = 0;
+ do {
+ TestCase.assertEquals(1, updated.size());
+ eventDrivenUpdateGraph.requestRefresh();
+ } while (++step < steps);
+ TestCase.assertEquals(1, updated.size());
+
+ // so that we do not lose the reference
+ return updated;
+ }
+ }
+}
diff --git a/engine/updategraph/src/main/java/io/deephaven/engine/updategraph/UpdateGraph.java b/engine/updategraph/src/main/java/io/deephaven/engine/updategraph/UpdateGraph.java
index e0227c50a8c..67cd6bf3add 100644
--- a/engine/updategraph/src/main/java/io/deephaven/engine/updategraph/UpdateGraph.java
+++ b/engine/updategraph/src/main/java/io/deephaven/engine/updategraph/UpdateGraph.java
@@ -179,12 +179,18 @@ default void checkInitiateSerialTableOperation() {
return;
}
throw new IllegalStateException(String.format(
- "May not initiate serial table operations: exclusiveLockHeld=%s, sharedLockHeld=%s, currentThreadProcessesUpdates=%s",
+ "May not initiate serial table operations for update graph %s: exclusiveLockHeld=%s, sharedLockHeld=%s, currentThreadProcessesUpdates=%s",
+ getName(),
exclusiveLock().isHeldByCurrentThread(),
sharedLock().isHeldByCurrentThread(),
currentThreadProcessesUpdates()));
}
+ /**
+ * Attempt to stop this update graph, and cease processing further notifications.
+ */
+ void stop();
+
// endregion thread control
// region refresh control