Skip to content

Commit

Permalink
Replace Scheduler usage in StatsDriver with ScheduledExecutorService
Browse files Browse the repository at this point in the history
  • Loading branch information
rcaudy committed Oct 31, 2023
1 parent 1d21a08 commit 2acec77
Showing 1 changed file with 20 additions and 18 deletions.
38 changes: 20 additions & 18 deletions Stats/src/main/java/io/deephaven/stats/StatsDriver.java
Original file line number Diff line number Diff line change
Expand Up @@ -5,24 +5,27 @@

import io.deephaven.base.clock.Clock;
import io.deephaven.engine.context.ExecutionContext;
import io.deephaven.net.CommBase;
import io.deephaven.util.SafeCloseable;
import io.deephaven.base.stats.*;
import io.deephaven.base.text.TimestampBuffer;
import io.deephaven.configuration.Configuration;
import io.deephaven.io.log.*;
import io.deephaven.io.sched.TimedJob;
import io.deephaven.io.log.impl.LogEntryPoolImpl;
import io.deephaven.io.log.impl.LogSinkImpl;
import io.deephaven.util.annotations.ReferentialIntegrity;
import io.deephaven.util.thread.NamingThreadFactory;

import java.util.Properties;
import java.util.TimeZone;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;

/**
* Drives the collection of statistics on a 1-second timer task.
*/
public class StatsDriver extends TimedJob {
public class StatsDriver {
public interface StatusAdapter {
void sendAlert(String alertText);

Expand Down Expand Up @@ -53,7 +56,6 @@ public boolean cmsAlertEnabled() {
public final static String header =
"Stat,IntervalName,NowSec,NowString,AppNowSec,AppNowString,TypeTag,Name,N,Sum,Last,Min,Max,Avg,Sum2,Stdev";

private long nextInvocation;
private long nextCpuUpdate;
private long nextMemUpdate;

Expand All @@ -72,6 +74,10 @@ public boolean cmsAlertEnabled() {
private final StatsIntradayLogger intraday;
private final Value clockValue;
private final ExecutionContext executionContext;
@ReferentialIntegrity
private final ScheduledExecutorService scheduler;
@ReferentialIntegrity
private final ScheduledFuture<?> updateJobFuture;

private final StatsMemoryCollector memStats;
private final StatsCPUCollector cpuStats;
Expand Down Expand Up @@ -155,9 +161,8 @@ public StatsDriver(Clock clock, StatsIntradayLogger intraday, boolean getFdStats

final long now = System.currentTimeMillis();
final long delay = STEP - (now % STEP);
nextInvocation = now + delay;
nextCpuUpdate = nextInvocation + CPU_INTERVAL;
nextMemUpdate = nextInvocation + MEM_INTERVAL;
nextCpuUpdate = now + delay + CPU_INTERVAL;
nextMemUpdate = now + delay + MEM_INTERVAL;

cpuStats = new StatsCPUCollector(CPU_INTERVAL, getFdStats);
memStats = new StatsMemoryCollector(MEM_INTERVAL, statusAdapter::sendAlert, statusAdapter::cmsAlertEnabled);
Expand All @@ -166,13 +171,18 @@ public StatsDriver(Clock clock, StatsIntradayLogger intraday, boolean getFdStats
}
executionContext = ExecutionContext.getContext();

// now that the StatsDriver is completely constructed, we can schedule the first iteration
// now that the StatsDriver is completely constructed, we can schedule the update job
if (Configuration.getInstance().getBoolean("statsdriver.enabled")) {
schedule();
scheduler = Executors.newSingleThreadScheduledExecutor(
new NamingThreadFactory(StatsDriver.class, "statsUpdater", true));
updateJobFuture = scheduler.scheduleAtFixedRate(this::update, delay, STEP, TimeUnit.MILLISECONDS);
} else {
scheduler = null;
updateJobFuture = null;
}
}

public void timedOut() {
private void update() {
long t0 = System.nanoTime();
long now = System.currentTimeMillis();
long appNow = clock == null ? now : clock.currentTimeMillis();
Expand Down Expand Up @@ -213,17 +223,9 @@ public void timedOut() {
}
}

schedule();

statsTiming.sample((System.nanoTime() - t0 + 500) / 1000);
}

private void schedule() {
CommBase.getScheduler().installJob(this, nextInvocation);
long steps = Math.max(1L, (((System.currentTimeMillis() - nextInvocation) / STEP) + 1));
nextInvocation += steps * STEP;
}

private final ItemUpdateListener LISTENER = new ItemUpdateListener() {
@Override
public void handleItemUpdated(Item<?> item, long now, long appNow, int intervalIndex, long intervalMillis,
Expand Down

0 comments on commit 2acec77

Please sign in to comment.