Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Support pausing/resuming metrics poller #47

Merged
merged 1 commit into from
Dec 13, 2012
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -15,20 +15,27 @@
*/
package com.netflix.hystrix.contrib.metrics.eventstream;

import static org.junit.Assert.*;

import java.io.StringWriter;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;

import org.codehaus.jackson.JsonFactory;
import org.codehaus.jackson.JsonGenerator;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.netflix.hystrix.HystrixCircuitBreaker;
import com.netflix.hystrix.HystrixCommand;
import com.netflix.hystrix.HystrixCommandGroupKey;
import com.netflix.hystrix.HystrixCommandKey;
import com.netflix.hystrix.HystrixCommandMetrics;
import com.netflix.hystrix.HystrixCommandMetrics.HealthCounts;
Expand All @@ -39,37 +46,86 @@

/**
* Polls Hystrix metrics and output JSON strings for each metric to a MetricsPollerListener.
* <p>
* Polling can be stopped/started. Use shutdown() to permanently shutdown the poller.
*/
public class HystrixMetricsPoller {

static final Logger logger = LoggerFactory.getLogger(HystrixMetricsPoller.class);
private final ScheduledExecutorService executor;
private final int delay;
private final AtomicBoolean running = new AtomicBoolean(true);

private final AtomicBoolean running = new AtomicBoolean(false);
private volatile ScheduledFuture<?> scheduledTask = null;

/**
* Allocate resources to begin polling.
* <p>
* Use <code>start</code> to begin polling.
* <p>
* Use <code>shutdown</code> to cleanup resources and stop polling.
* <p>
* Use <code>pause</code> to temporarily stop polling that can be restarted again with <code>start</code>.
*
* @param delay
*/
public HystrixMetricsPoller(int delay) {
executor = new ScheduledThreadPoolExecutor(1, new MetricsPollerThreadFactory());
this.delay = delay;
}

/**
* Start polling.
*/
public synchronized void start(MetricsAsJsonPollerListener listener) {
logger.info("Starting HystrixMetricsPoller");
executor.scheduleWithFixedDelay(new MetricsPoller(listener), 0, delay, TimeUnit.MILLISECONDS);
// use compareAndSet to make sure it starts only once and when not running
if (running.compareAndSet(false, true)) {
logger.info("Starting HystrixMetricsPoller");
scheduledTask = executor.scheduleWithFixedDelay(new MetricsPoller(listener), 0, delay, TimeUnit.MILLISECONDS);
}
}

public synchronized void stop() {
// use compareAndSet not for concurrency reasons (this is synchronized)
// just as a double-check so only one execution of this method will result in it doing the shutdown
/**
* Pause (stop) polling. Polling can be started again with <code>start</code> as long as <code>shutdown</code> is not called.
*/
public synchronized void pause() {
// use compareAndSet to make sure it stops only once and when running
if (running.compareAndSet(true, false)) {
logger.info("Stopping the Servo Metrics Poller");
executor.shutdownNow();
scheduledTask.cancel(true);
}
}

/**
* Stops polling and shuts down the ExecutorService.
* <p>
* This instance can no longer be used after calling shutdown.
*/
public synchronized void shutdown() {
pause();
executor.shutdown();
}

public boolean isRunning() {
return running.get();
}

/**
* Used to protect against leaking ExecutorServices and threads if this class is abandoned for GC without shutting down.
*/
@SuppressWarnings("unused")
private final Object finalizerGuardian = new Object() {
protected void finalize() throws Throwable {
if (!executor.isShutdown()) {
logger.warn(HystrixMetricsPoller.class.getSimpleName() + " was not shutdown. Caught in Finalize Guardian and shutting down.");
try {
shutdown();
} catch (Exception e) {
logger.error("Failed to shutdown " + HystrixMetricsPoller.class.getSimpleName(), e);
}
}
};
};

public static interface MetricsAsJsonPollerListener {
public void handleJsonMetric(String json);
}
Expand Down Expand Up @@ -226,7 +282,7 @@ public void run() {
} catch (Exception e) {
logger.warn("Failed to output metrics as JSON", e);
// shutdown
stop();
pause();
return;
}
}
Expand All @@ -244,4 +300,79 @@ public Thread newThread(Runnable r) {
}
}

public static class UnitTest {

@Test
public void testStartStopStart() {
HystrixMetricsPoller poller = new HystrixMetricsPoller(100);
try {
final AtomicInteger metricsCount = new AtomicInteger();
HystrixCommand<Boolean> test = new HystrixCommand<Boolean>(HystrixCommandGroupKey.Factory.asKey("HystrixMetricsPollerTest")) {

@Override
protected Boolean run() {
return true;
}

};
test.execute();

poller.start(new MetricsAsJsonPollerListener() {

@Override
public void handleJsonMetric(String json) {
System.out.println("Received: " + json);
metricsCount.incrementAndGet();
}
});

try {
Thread.sleep(500);
} catch (InterruptedException e) {
e.printStackTrace();
}

int v1 = metricsCount.get();

assertTrue(v1 > 0);

poller.pause();

try {
Thread.sleep(500);
} catch (InterruptedException e) {
e.printStackTrace();
}

int v2 = metricsCount.get();

// they should be the same since we were paused
assertTrue(v2 == v1);

poller.start(new MetricsAsJsonPollerListener() {

@Override
public void handleJsonMetric(String json) {
System.out.println("Received: " + json);
metricsCount.incrementAndGet();
}
});

try {
Thread.sleep(500);
} catch (InterruptedException e) {
e.printStackTrace();
}

int v3 = metricsCount.get();

// we should have more metrics again
assertTrue(v3 > v1);

} finally {
poller.shutdown();
}
}
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -127,7 +127,7 @@ private void handleRequest(HttpServletRequest request, HttpServletResponse respo
Thread.sleep(delay);
}
} catch (Exception e) {
poller.stop();
poller.shutdown();
logger.error("Failed to write. Will stop polling.", e);
}
logger.debug("Stopping Turbine stream to connection");
Expand All @@ -137,7 +137,7 @@ private void handleRequest(HttpServletRequest request, HttpServletResponse respo
} finally {
concurrentConnections.decrementAndGet();
if (poller != null) {
poller.stop();
poller.shutdown();
}
}
}
Expand Down