From 02eaeda56fbc654756b049ae7ef8e440dfe03bc2 Mon Sep 17 00:00:00 2001 From: Ben Christensen Date: Wed, 12 Dec 2012 16:06:43 -0800 Subject: [PATCH] Support pausing/resuming metrics poller --- .../eventstream/HystrixMetricsPoller.java | 149 ++++++++++++++++-- .../HystrixMetricsStreamServlet.java | 4 +- 2 files changed, 142 insertions(+), 11 deletions(-) diff --git a/hystrix-contrib/hystrix-metrics-event-stream/src/main/java/com/netflix/hystrix/contrib/metrics/eventstream/HystrixMetricsPoller.java b/hystrix-contrib/hystrix-metrics-event-stream/src/main/java/com/netflix/hystrix/contrib/metrics/eventstream/HystrixMetricsPoller.java index 8fa6ec007..d4f9d239f 100644 --- a/hystrix-contrib/hystrix-metrics-event-stream/src/main/java/com/netflix/hystrix/contrib/metrics/eventstream/HystrixMetricsPoller.java +++ b/hystrix-contrib/hystrix-metrics-event-stream/src/main/java/com/netflix/hystrix/contrib/metrics/eventstream/HystrixMetricsPoller.java @@ -15,20 +15,27 @@ */ package com.netflix.hystrix.contrib.metrics.eventstream; +import static org.junit.Assert.*; + import java.io.StringWriter; import java.util.concurrent.Executors; import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.ScheduledFuture; import java.util.concurrent.ScheduledThreadPoolExecutor; import java.util.concurrent.ThreadFactory; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicInteger; import org.codehaus.jackson.JsonFactory; import org.codehaus.jackson.JsonGenerator; +import org.junit.Test; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import com.netflix.hystrix.HystrixCircuitBreaker; +import com.netflix.hystrix.HystrixCommand; +import com.netflix.hystrix.HystrixCommandGroupKey; import com.netflix.hystrix.HystrixCommandKey; import com.netflix.hystrix.HystrixCommandMetrics; import com.netflix.hystrix.HystrixCommandMetrics.HealthCounts; @@ -39,37 +46,86 @@ /** * Polls Hystrix metrics and output JSON strings for each metric to a MetricsPollerListener. + *

+ * Polling can be stopped/started. Use shutdown() to permanently shutdown the poller. */ public class HystrixMetricsPoller { static final Logger logger = LoggerFactory.getLogger(HystrixMetricsPoller.class); private final ScheduledExecutorService executor; private final int delay; - private final AtomicBoolean running = new AtomicBoolean(true); - + private final AtomicBoolean running = new AtomicBoolean(false); + private volatile ScheduledFuture scheduledTask = null; + + /** + * Allocate resources to begin polling. + *

+ * Use start to begin polling. + *

+ * Use shutdown to cleanup resources and stop polling. + *

+ * Use pause to temporarily stop polling that can be restarted again with start. + * + * @param delay + */ public HystrixMetricsPoller(int delay) { executor = new ScheduledThreadPoolExecutor(1, new MetricsPollerThreadFactory()); this.delay = delay; } + /** + * Start polling. + */ public synchronized void start(MetricsAsJsonPollerListener listener) { - logger.info("Starting HystrixMetricsPoller"); - executor.scheduleWithFixedDelay(new MetricsPoller(listener), 0, delay, TimeUnit.MILLISECONDS); + // use compareAndSet to make sure it starts only once and when not running + if (running.compareAndSet(false, true)) { + logger.info("Starting HystrixMetricsPoller"); + scheduledTask = executor.scheduleWithFixedDelay(new MetricsPoller(listener), 0, delay, TimeUnit.MILLISECONDS); + } } - public synchronized void stop() { - // use compareAndSet not for concurrency reasons (this is synchronized) - // just as a double-check so only one execution of this method will result in it doing the shutdown + /** + * Pause (stop) polling. Polling can be started again with start as long as shutdown is not called. + */ + public synchronized void pause() { + // use compareAndSet to make sure it stops only once and when running if (running.compareAndSet(true, false)) { logger.info("Stopping the Servo Metrics Poller"); - executor.shutdownNow(); + scheduledTask.cancel(true); } } + /** + * Stops polling and shuts down the ExecutorService. + *

+ * This instance can no longer be used after calling shutdown. + */ + public synchronized void shutdown() { + pause(); + executor.shutdown(); + } + public boolean isRunning() { return running.get(); } + /** + * Used to protect against leaking ExecutorServices and threads if this class is abandoned for GC without shutting down. + */ + @SuppressWarnings("unused") + private final Object finalizerGuardian = new Object() { + protected void finalize() throws Throwable { + if (!executor.isShutdown()) { + logger.warn(HystrixMetricsPoller.class.getSimpleName() + " was not shutdown. Caught in Finalize Guardian and shutting down."); + try { + shutdown(); + } catch (Exception e) { + logger.error("Failed to shutdown " + HystrixMetricsPoller.class.getSimpleName(), e); + } + } + }; + }; + public static interface MetricsAsJsonPollerListener { public void handleJsonMetric(String json); } @@ -226,7 +282,7 @@ public void run() { } catch (Exception e) { logger.warn("Failed to output metrics as JSON", e); // shutdown - stop(); + pause(); return; } } @@ -244,4 +300,79 @@ public Thread newThread(Runnable r) { } } + public static class UnitTest { + + @Test + public void testStartStopStart() { + HystrixMetricsPoller poller = new HystrixMetricsPoller(100); + try { + final AtomicInteger metricsCount = new AtomicInteger(); + HystrixCommand test = new HystrixCommand(HystrixCommandGroupKey.Factory.asKey("HystrixMetricsPollerTest")) { + + @Override + protected Boolean run() { + return true; + } + + }; + test.execute(); + + poller.start(new MetricsAsJsonPollerListener() { + + @Override + public void handleJsonMetric(String json) { + System.out.println("Received: " + json); + metricsCount.incrementAndGet(); + } + }); + + try { + Thread.sleep(500); + } catch (InterruptedException e) { + e.printStackTrace(); + } + + int v1 = metricsCount.get(); + + assertTrue(v1 > 0); + + poller.pause(); + + try { + Thread.sleep(500); + } catch (InterruptedException e) { + e.printStackTrace(); + } + + int v2 = metricsCount.get(); + + // they should be the same since we were paused + assertTrue(v2 == v1); + + poller.start(new MetricsAsJsonPollerListener() { + + @Override + public void handleJsonMetric(String json) { + System.out.println("Received: " + json); + metricsCount.incrementAndGet(); + } + }); + + try { + Thread.sleep(500); + } catch (InterruptedException e) { + e.printStackTrace(); + } + + int v3 = metricsCount.get(); + + // we should have more metrics again + assertTrue(v3 > v1); + + } finally { + poller.shutdown(); + } + } + } + } diff --git a/hystrix-contrib/hystrix-metrics-event-stream/src/main/java/com/netflix/hystrix/contrib/metrics/eventstream/HystrixMetricsStreamServlet.java b/hystrix-contrib/hystrix-metrics-event-stream/src/main/java/com/netflix/hystrix/contrib/metrics/eventstream/HystrixMetricsStreamServlet.java index ae0d622ec..c2bb957b6 100644 --- a/hystrix-contrib/hystrix-metrics-event-stream/src/main/java/com/netflix/hystrix/contrib/metrics/eventstream/HystrixMetricsStreamServlet.java +++ b/hystrix-contrib/hystrix-metrics-event-stream/src/main/java/com/netflix/hystrix/contrib/metrics/eventstream/HystrixMetricsStreamServlet.java @@ -127,7 +127,7 @@ private void handleRequest(HttpServletRequest request, HttpServletResponse respo Thread.sleep(delay); } } catch (Exception e) { - poller.stop(); + poller.shutdown(); logger.error("Failed to write. Will stop polling.", e); } logger.debug("Stopping Turbine stream to connection"); @@ -137,7 +137,7 @@ private void handleRequest(HttpServletRequest request, HttpServletResponse respo } finally { concurrentConnections.decrementAndGet(); if (poller != null) { - poller.stop(); + poller.shutdown(); } } }