diff --git a/hystrix-contrib/hystrix-metrics-event-stream/build.gradle b/hystrix-contrib/hystrix-metrics-event-stream/build.gradle index 50b69c0ec..609c7585d 100644 --- a/hystrix-contrib/hystrix-metrics-event-stream/build.gradle +++ b/hystrix-contrib/hystrix-metrics-event-stream/build.gradle @@ -1,6 +1,6 @@ dependencies { compile project(':hystrix-core') - compile 'com.fasterxml.jackson.core:jackson-core:2.5.2' + compile project(':hystrix-data-stream') provided 'javax.servlet:servlet-api:2.5' testCompile 'junit:junit-dep:4.10' testCompile 'org.mockito:mockito-all:1.9.5' diff --git a/hystrix-contrib/hystrix-metrics-event-stream/src/main/java/com/netflix/hystrix/contrib/metrics/eventstream/HystrixMetricsPoller.java b/hystrix-contrib/hystrix-metrics-event-stream/src/main/java/com/netflix/hystrix/contrib/metrics/eventstream/HystrixMetricsPoller.java index e920c109e..d24a97186 100644 --- a/hystrix-contrib/hystrix-metrics-event-stream/src/main/java/com/netflix/hystrix/contrib/metrics/eventstream/HystrixMetricsPoller.java +++ b/hystrix-contrib/hystrix-metrics-event-stream/src/main/java/com/netflix/hystrix/contrib/metrics/eventstream/HystrixMetricsPoller.java @@ -51,7 +51,10 @@ * the code below may reference a HystrixEventType that does not exist in hystrix-core. If this happens, * a j.l.NoSuchFieldError occurs. Since this data is not being generated by hystrix-core, it's safe to count it as 0 * and we should log an error to get users to update their dependency set. + * + * @deprecated Prefer {@link com.netflix.hystrix.metric.consumer.HystrixDashboardStream} */ +@Deprecated //since 1.5.4 public class HystrixMetricsPoller { static final Logger logger = LoggerFactory.getLogger(HystrixMetricsPoller.class); diff --git a/hystrix-contrib/hystrix-metrics-event-stream/src/main/java/com/netflix/hystrix/contrib/metrics/eventstream/HystrixMetricsStreamServlet.java b/hystrix-contrib/hystrix-metrics-event-stream/src/main/java/com/netflix/hystrix/contrib/metrics/eventstream/HystrixMetricsStreamServlet.java index 7a8a67355..fb53c66cf 100644 --- a/hystrix-contrib/hystrix-metrics-event-stream/src/main/java/com/netflix/hystrix/contrib/metrics/eventstream/HystrixMetricsStreamServlet.java +++ b/hystrix-contrib/hystrix-metrics-event-stream/src/main/java/com/netflix/hystrix/contrib/metrics/eventstream/HystrixMetricsStreamServlet.java @@ -17,17 +17,12 @@ import com.netflix.config.DynamicIntProperty; import com.netflix.config.DynamicPropertyFactory; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; +import com.netflix.hystrix.contrib.sample.stream.HystrixSampleSseServlet; +import com.netflix.hystrix.metric.consumer.HystrixDashboardStream; +import com.netflix.hystrix.serial.SerialHystrixDashboardData; +import rx.Observable; +import rx.functions.Func1; -import javax.servlet.ServletException; -import javax.servlet.http.HttpServlet; -import javax.servlet.http.HttpServletRequest; -import javax.servlet.http.HttpServletResponse; -import java.io.IOException; -import java.util.ArrayList; -import java.util.List; -import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.atomic.AtomicInteger; /** @@ -51,186 +46,45 @@ * * } */ -public class HystrixMetricsStreamServlet extends HttpServlet { +public class HystrixMetricsStreamServlet extends HystrixSampleSseServlet { private static final long serialVersionUID = -7548505095303313237L; - private static final Logger logger = LoggerFactory.getLogger(HystrixMetricsStreamServlet.class); - /* used to track number of connections and throttle */ private static AtomicInteger concurrentConnections = new AtomicInteger(0); - private static DynamicIntProperty maxConcurrentConnections = DynamicPropertyFactory.getInstance().getIntProperty("hystrix.stream.maxConcurrentConnections", 5); - private static DynamicIntProperty defaultMetricListenerQueueSize = DynamicPropertyFactory.getInstance().getIntProperty("hystrix.stream.defaultMetricListenerQueueSize", 1000); + private static DynamicIntProperty maxConcurrentConnections = + DynamicPropertyFactory.getInstance().getIntProperty("hystrix.config.stream.maxConcurrentConnections", 5); - private static volatile boolean isDestroyed = false; - - /** - * WebSphere won't shutdown a servlet until after a 60 second timeout if there is an instance of the servlet executing - * a request. Add this method to enable a hook to notify Hystrix to shutdown. You must invoke this method at - * shutdown, perhaps from some other serverlet's destroy() method. - */ - public static void shutdown() { - isDestroyed = true; + public HystrixMetricsStreamServlet() { + this(HystrixDashboardStream.getInstance().observe(), DEFAULT_PAUSE_POLLER_THREAD_DELAY_IN_MS); } - - @Override - public void init() throws ServletException { - isDestroyed = false; + + /* package-private */ HystrixMetricsStreamServlet(Observable sampleStream, int pausePollerThreadDelayInMs) { + super(sampleStream.concatMap(new Func1>() { + @Override + public Observable call(HystrixDashboardStream.DashboardData dashboardData) { + return Observable.from(SerialHystrixDashboardData.toMultipleJsonStrings(dashboardData)); + } + }), pausePollerThreadDelayInMs); } - - /** - * Handle incoming GETs - */ + @Override - protected void doGet(HttpServletRequest request, HttpServletResponse response) throws ServletException, IOException { - if (isDestroyed) { - response.sendError(503, "Service has been shut down."); - } else { - handleRequest(request, response); - } + protected int getMaxNumberConcurrentConnectionsAllowed() { + return maxConcurrentConnections.get(); } - - /** - * Handle servlet being undeployed by gracefully releasing connections so poller threads stop. - */ + @Override - public void destroy() { - /* set marker so the loops can break out */ - isDestroyed = true; - super.destroy(); + protected int getNumberCurrentConnections() { + return concurrentConnections.get(); } - /** - * - maintain an open connection with the client - * - on initial connection send latest data of each requested event type - * - subsequently send all changes for each requested event type - * - * @param request - * @param response - * @throws javax.servlet.ServletException - * @throws java.io.IOException - */ - private void handleRequest(HttpServletRequest request, HttpServletResponse response) throws ServletException, IOException { - /* ensure we aren't allowing more connections than we want */ - int numberConnections = concurrentConnections.incrementAndGet(); - HystrixMetricsPoller poller = null; - try { - if (numberConnections > maxConcurrentConnections.get()) { - response.sendError(503, "MaxConcurrentConnections reached: " + maxConcurrentConnections.get()); - } else { - - int delay = 500; - try { - String d = request.getParameter("delay"); - if (d != null) { - delay = Math.max(Integer.parseInt(d), 1); - } - } catch (Exception e) { - // ignore if it's not a number - } - - /* initialize response */ - response.setHeader("Content-Type", "text/event-stream;charset=UTF-8"); - response.setHeader("Cache-Control", "no-cache, no-store, max-age=0, must-revalidate"); - response.setHeader("Pragma", "no-cache"); - - int queueSize = defaultMetricListenerQueueSize.get(); - - MetricJsonListener jsonListener = new MetricJsonListener(queueSize); - poller = new HystrixMetricsPoller(jsonListener, delay); - // start polling and it will write directly to the output stream - poller.start(); - logger.debug("Starting poller"); - - // we will use a "single-writer" approach where the Servlet thread does all the writing - // by fetching JSON messages from the MetricJsonListener to write them to the output - try { - while (poller.isRunning() && !isDestroyed) { - List jsonMessages = jsonListener.getJsonMetrics(); - if (jsonMessages.isEmpty()) { - // https://github.com/Netflix/Hystrix/issues/85 hystrix.stream holds connection open if no metrics - // we send a ping to test the connection so that we'll get an IOException if the client has disconnected - response.getWriter().println("ping: \n"); - } else { - for (String json : jsonMessages) { - response.getWriter().println("data: " + json + "\n"); - } - } - - /* shortcut breaking out of loop if we have been destroyed */ - if(isDestroyed) { - break; - } - - // after outputting all the messages we will flush the stream - response.flushBuffer(); - - // explicitly check for client disconnect - PrintWriter does not throw exceptions - if (response.getWriter().checkError()) { - throw new IOException("io error"); - } - - // now wait the 'delay' time - Thread.sleep(delay); - } - } catch (InterruptedException e) { - poller.shutdown(); - logger.debug("InterruptedException. Will stop polling."); - Thread.currentThread().interrupt(); - } catch (IOException e) { - poller.shutdown(); - // debug instead of error as we expect to get these whenever a client disconnects or network issue occurs - logger.debug("IOException while trying to write (generally caused by client disconnecting). Will stop polling.", e); - } catch (Exception e) { - poller.shutdown(); - logger.error("Failed to write Hystrix metrics. Will stop polling.", e); - } - logger.debug("Stopping Turbine stream to connection"); - } - } catch (Exception e) { - logger.error("Error initializing servlet for metrics event stream.", e); - } finally { - concurrentConnections.decrementAndGet(); - if (poller != null) { - poller.shutdown(); - } - } + @Override + protected int incrementAndGetCurrentConcurrentConnections() { + return concurrentConnections.incrementAndGet(); } - /** - * This will be called from another thread so needs to be thread-safe. - * @ThreadSafe - */ - private static class MetricJsonListener implements HystrixMetricsPoller.MetricsAsJsonPollerListener { - - /** - * Setting limit to 1000. In a healthy system there isn't any reason to hit this limit so if we do it will throw an exception which causes the poller to stop. - *

- * This is a safety check against a runaway poller causing memory leaks. - */ - private LinkedBlockingQueue jsonMetrics; - - public MetricJsonListener(int queueSize) { - jsonMetrics = new LinkedBlockingQueue(queueSize); - } - - /** - * Store JSON messages in a queue. - */ - @Override - public void handleJsonMetric(String json) { - jsonMetrics.add(json); - } - - /** - * Get all JSON messages in the queue. - * - * @return - */ - public List getJsonMetrics() { - ArrayList metrics = new ArrayList(); - jsonMetrics.drainTo(metrics); - return metrics; - } + @Override + protected void decrementCurrentConcurrentConnections() { + concurrentConnections.decrementAndGet(); } } diff --git a/hystrix-contrib/hystrix-metrics-event-stream/src/main/java/com/netflix/hystrix/contrib/requests/stream/HystrixRequestEventsJsonStream.java b/hystrix-contrib/hystrix-metrics-event-stream/src/main/java/com/netflix/hystrix/contrib/requests/stream/HystrixRequestEventsJsonStream.java index ccff8e698..9598d1109 100644 --- a/hystrix-contrib/hystrix-metrics-event-stream/src/main/java/com/netflix/hystrix/contrib/requests/stream/HystrixRequestEventsJsonStream.java +++ b/hystrix-contrib/hystrix-metrics-event-stream/src/main/java/com/netflix/hystrix/contrib/requests/stream/HystrixRequestEventsJsonStream.java @@ -29,6 +29,13 @@ import java.util.List; import java.util.Map; +/** + * Stream that converts HystrixRequestEvents into JSON. This isn't needed anymore, as it more straightforward + * to consider serialization completely separately from the domain object stream + * + * @deprecated Instead, prefer mapping your preferred serialization on top of {@link HystrixRequestEventsStream#observe()}. + */ +@Deprecated //since 1.5.4 public class HystrixRequestEventsJsonStream { private static final JsonFactory jsonFactory = new JsonFactory(); diff --git a/hystrix-contrib/hystrix-metrics-event-stream/src/main/java/com/netflix/hystrix/contrib/requests/stream/HystrixRequestEventsSseServlet.java b/hystrix-contrib/hystrix-metrics-event-stream/src/main/java/com/netflix/hystrix/contrib/requests/stream/HystrixRequestEventsSseServlet.java index 5a7589f86..b11096561 100644 --- a/hystrix-contrib/hystrix-metrics-event-stream/src/main/java/com/netflix/hystrix/contrib/requests/stream/HystrixRequestEventsSseServlet.java +++ b/hystrix-contrib/hystrix-metrics-event-stream/src/main/java/com/netflix/hystrix/contrib/requests/stream/HystrixRequestEventsSseServlet.java @@ -17,198 +17,58 @@ import com.netflix.config.DynamicIntProperty; import com.netflix.config.DynamicPropertyFactory; +import com.netflix.hystrix.contrib.sample.stream.HystrixSampleSseServlet; import com.netflix.hystrix.metric.HystrixRequestEvents; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; -import rx.Subscriber; -import rx.Subscription; -import rx.schedulers.Schedulers; - -import javax.servlet.ServletException; -import javax.servlet.http.HttpServlet; -import javax.servlet.http.HttpServletRequest; -import javax.servlet.http.HttpServletResponse; -import java.io.IOException; -import java.io.PrintWriter; -import java.util.ArrayList; -import java.util.List; -import java.util.concurrent.LinkedBlockingQueue; -import java.util.concurrent.atomic.AtomicBoolean; +import com.netflix.hystrix.metric.HystrixRequestEventsStream; +import com.netflix.hystrix.serial.SerialHystrixRequestEvents; +import rx.Observable; +import rx.functions.Func1; import java.util.concurrent.atomic.AtomicInteger; /** + * Servlet that writes SSE JSON every time a request is made */ -public class HystrixRequestEventsSseServlet extends HttpServlet { - - private static final Logger logger = LoggerFactory.getLogger(HystrixRequestEventsSseServlet.class); - - private static volatile boolean isDestroyed = false; +public class HystrixRequestEventsSseServlet extends HystrixSampleSseServlet { - private static final String DELAY_REQ_PARAM_NAME = "delay"; - private static final int DEFAULT_DELAY_IN_MILLISECONDS = 10000; - private static final int DEFAULT_QUEUE_DEPTH = 1000; - private static final String PING = "\n: ping\n"; + private static final long serialVersionUID = 6389353893099737870L; /* used to track number of connections and throttle */ private static AtomicInteger concurrentConnections = new AtomicInteger(0); private static DynamicIntProperty maxConcurrentConnections = - DynamicPropertyFactory.getInstance().getIntProperty("hystrix.requests.stream.maxConcurrentConnections", 5); - - private final LinkedBlockingQueue requestQueue = new LinkedBlockingQueue(DEFAULT_QUEUE_DEPTH); - private final HystrixRequestEventsJsonStream requestEventsJsonStream; + DynamicPropertyFactory.getInstance().getIntProperty("hystrix.config.stream.maxConcurrentConnections", 5); public HystrixRequestEventsSseServlet() { - requestEventsJsonStream = new HystrixRequestEventsJsonStream(); + this(HystrixRequestEventsStream.getInstance().observe(), DEFAULT_PAUSE_POLLER_THREAD_DELAY_IN_MS); } - /** - * Handle incoming GETs - */ - @Override - protected void doGet(HttpServletRequest request, HttpServletResponse response) throws ServletException, IOException { - if (isDestroyed) { - response.sendError(503, "Service has been shut down."); - } else { - handleRequest(request, response); - } - } - - /* package-private */ - int getDelayFromHttpRequest(HttpServletRequest req) { - try { - String delay = req.getParameter(DELAY_REQ_PARAM_NAME); - if (delay != null) { - return Math.max(Integer.parseInt(delay), 1); + /* package-private */ HystrixRequestEventsSseServlet(Observable sampleStream, int pausePollerThreadDelayInMs) { + super(sampleStream.map(new Func1() { + @Override + public String call(HystrixRequestEvents requestEvents) { + return SerialHystrixRequestEvents.toJsonString(requestEvents); } - } catch (Throwable ex) { - //silently fail - } - return DEFAULT_DELAY_IN_MILLISECONDS; + }), pausePollerThreadDelayInMs); } - /** - * WebSphere won't shutdown a servlet until after a 60 second timeout if there is an instance of the servlet executing - * a request. Add this method to enable a hook to notify Hystrix to shutdown. You must invoke this method at - * shutdown, perhaps from some other servlet's destroy() method. - */ - public static void shutdown() { - isDestroyed = true; + @Override + protected int getMaxNumberConcurrentConnectionsAllowed() { + return maxConcurrentConnections.get(); } @Override - public void init() throws ServletException { - isDestroyed = false; + protected int getNumberCurrentConnections() { + return concurrentConnections.get(); } - /** - * Handle servlet being undeployed by gracefully releasing connections so poller threads stop. - */ @Override - public void destroy() { - /* set marker so the loops can break out */ - isDestroyed = true; - super.destroy(); + protected int incrementAndGetCurrentConcurrentConnections() { + return concurrentConnections.incrementAndGet(); } - - - /** - * - maintain an open connection with the client - * - on initial connection send latest data of each requested event type - * - subsequently send all changes for each requested event type - * - * @param request incoming HTTP Request - * @param response outgoing HTTP Response (as a streaming response) - * @throws javax.servlet.ServletException - * @throws java.io.IOException - */ - private void handleRequest(HttpServletRequest request, final HttpServletResponse response) throws ServletException, IOException { - final AtomicBoolean moreDataWillBeSent = new AtomicBoolean(true); - Subscription requestsSubscription = null; - - /* ensure we aren't allowing more connections than we want */ - int numberConnections = concurrentConnections.incrementAndGet(); - try { - int maxNumberConnectionsAllowed = maxConcurrentConnections.get(); - if (numberConnections > maxNumberConnectionsAllowed) { - response.sendError(503, "MaxConcurrentConnections reached: " + maxNumberConnectionsAllowed); - } else { - int delay = getDelayFromHttpRequest(request); - - /* initialize response */ - response.setHeader("Content-Type", "text/event-stream;charset=UTF-8"); - response.setHeader("Cache-Control", "no-cache, no-store, max-age=0, must-revalidate"); - response.setHeader("Pragma", "no-cache"); - - final PrintWriter writer = response.getWriter(); - - //since the sample stream is based on Observable.interval, events will get published on an RxComputation thread - //since writing to the servlet response is blocking, use the Rx IO thread for the write that occurs in the onNext - requestsSubscription = requestEventsJsonStream - .getStream() - .observeOn(Schedulers.io()) - .subscribe(new Subscriber() { - @Override - public void onCompleted() { - logger.error("HystrixRequestEventsSseServlet received unexpected OnCompleted from request stream"); - moreDataWillBeSent.set(false); - } - - @Override - public void onError(Throwable e) { - moreDataWillBeSent.set(false); - } - - @Override - public void onNext(HystrixRequestEvents requestEvents) { - if (requestEvents != null) { - requestQueue.offer(requestEvents); - } - } - }); - - while (moreDataWillBeSent.get() && !isDestroyed) { - try { - if (requestQueue.isEmpty()) { - try { - writer.print(PING); - writer.flush(); - } catch (Throwable t) { - throw new IOException("Exception while writing ping"); - } - - if (writer.checkError()) { - throw new IOException("io error"); - } - } else { - List l = new ArrayList(); - requestQueue.drainTo(l); - String requestEventsAsStr = HystrixRequestEventsJsonStream.convertRequestsToJson(l); - if (requestEventsAsStr != null) { - try { - writer.print("data: " + requestEventsAsStr + "\n\n"); - // explicitly check for client disconnect - PrintWriter does not throw exceptions - if (writer.checkError()) { - throw new IOException("io error"); - } - writer.flush(); - } catch (IOException ioe) { - moreDataWillBeSent.set(false); - } - } - } - Thread.sleep(delay); - } catch (InterruptedException e) { - moreDataWillBeSent.set(false); - } - } - } - } finally { - concurrentConnections.decrementAndGet(); - if (requestsSubscription != null && !requestsSubscription.isUnsubscribed()) { - requestsSubscription.unsubscribe(); - } - } + @Override + protected void decrementCurrentConcurrentConnections() { + concurrentConnections.decrementAndGet(); } } + diff --git a/hystrix-contrib/hystrix-metrics-event-stream/src/main/java/com/netflix/hystrix/contrib/sample/stream/HystrixConfigSseServlet.java b/hystrix-contrib/hystrix-metrics-event-stream/src/main/java/com/netflix/hystrix/contrib/sample/stream/HystrixConfigSseServlet.java index df146e987..d17685011 100644 --- a/hystrix-contrib/hystrix-metrics-event-stream/src/main/java/com/netflix/hystrix/contrib/sample/stream/HystrixConfigSseServlet.java +++ b/hystrix-contrib/hystrix-metrics-event-stream/src/main/java/com/netflix/hystrix/contrib/sample/stream/HystrixConfigSseServlet.java @@ -19,9 +19,10 @@ import com.netflix.config.DynamicPropertyFactory; import com.netflix.hystrix.config.HystrixConfiguration; import com.netflix.hystrix.config.HystrixConfigurationStream; +import com.netflix.hystrix.serial.SerialHystrixConfiguration; import rx.Observable; +import rx.functions.Func1; -import java.io.IOException; import java.util.concurrent.atomic.AtomicInteger; /** @@ -45,7 +46,7 @@ * * } */ -public class HystrixConfigSseServlet extends HystrixSampleSseServlet { +public class HystrixConfigSseServlet extends HystrixSampleSseServlet { private static final long serialVersionUID = -3599771169762858235L; @@ -54,20 +55,25 @@ public class HystrixConfigSseServlet extends HystrixSampleSseServlet sampleStream, int pausePollerThreadDelayInMs) { - super(sampleStream, pausePollerThreadDelayInMs); + super(sampleStream.map(new Func1() { + @Override + public String call(HystrixConfiguration hystrixConfiguration) { + return SerialHystrixConfiguration.toJsonString(hystrixConfiguration); + } + }), pausePollerThreadDelayInMs); } @Override - int getMaxNumberConcurrentConnectionsAllowed() { + protected int getMaxNumberConcurrentConnectionsAllowed() { return maxConcurrentConnections.get(); } @Override - int getNumberCurrentConnections() { + protected int getNumberCurrentConnections() { return concurrentConnections.get(); } @@ -80,10 +86,5 @@ protected int incrementAndGetCurrentConcurrentConnections() { protected void decrementCurrentConcurrentConnections() { concurrentConnections.decrementAndGet(); } - - @Override - protected String convertToString(HystrixConfiguration config) throws IOException { - return HystrixConfigurationJsonStream.convertToString(config); - } } diff --git a/hystrix-contrib/hystrix-metrics-event-stream/src/main/java/com/netflix/hystrix/contrib/sample/stream/HystrixConfigurationJsonStream.java b/hystrix-contrib/hystrix-metrics-event-stream/src/main/java/com/netflix/hystrix/contrib/sample/stream/HystrixConfigurationJsonStream.java index 12f5f3d42..1008fb970 100644 --- a/hystrix-contrib/hystrix-metrics-event-stream/src/main/java/com/netflix/hystrix/contrib/sample/stream/HystrixConfigurationJsonStream.java +++ b/hystrix-contrib/hystrix-metrics-event-stream/src/main/java/com/netflix/hystrix/contrib/sample/stream/HystrixConfigurationJsonStream.java @@ -25,6 +25,7 @@ import com.netflix.hystrix.config.HystrixConfiguration; import com.netflix.hystrix.config.HystrixConfigurationStream; import com.netflix.hystrix.config.HystrixThreadPoolConfiguration; +import com.netflix.hystrix.metric.HystrixRequestEventsStream; import rx.Observable; import rx.functions.Func1; @@ -40,7 +41,9 @@ *

  • Consumer of your choice that wants control over where to embed this stream * * + * @deprecated Instead, prefer mapping your preferred serialization on top of {@link HystrixConfigurationStream#observe()}. */ +@Deprecated //since 1.5.4 public class HystrixConfigurationJsonStream { private static final JsonFactory jsonFactory = new JsonFactory(); diff --git a/hystrix-contrib/hystrix-metrics-event-stream/src/main/java/com/netflix/hystrix/contrib/sample/stream/HystrixSampleSseServlet.java b/hystrix-contrib/hystrix-metrics-event-stream/src/main/java/com/netflix/hystrix/contrib/sample/stream/HystrixSampleSseServlet.java index 2882c0881..ca0839de9 100644 --- a/hystrix-contrib/hystrix-metrics-event-stream/src/main/java/com/netflix/hystrix/contrib/sample/stream/HystrixSampleSseServlet.java +++ b/hystrix-contrib/hystrix-metrics-event-stream/src/main/java/com/netflix/hystrix/contrib/sample/stream/HystrixSampleSseServlet.java @@ -32,41 +32,37 @@ /** */ -public abstract class HystrixSampleSseServlet extends HttpServlet { - protected final Observable sampleStream; +public abstract class HystrixSampleSseServlet extends HttpServlet { + protected final Observable sampleStream; private static final Logger logger = LoggerFactory.getLogger(HystrixSampleSseServlet.class); //wake up occasionally and check that poller is still alive. this value controls how often - private static final int DEFAULT_PAUSE_POLLER_THREAD_DELAY_IN_MS = 500; + protected static final int DEFAULT_PAUSE_POLLER_THREAD_DELAY_IN_MS = 500; private final int pausePollerThreadDelayInMs; /* Set to true upon shutdown, so it's OK to be shared among all SampleSseServlets */ private static volatile boolean isDestroyed = false; - protected HystrixSampleSseServlet(Observable sampleStream) { + protected HystrixSampleSseServlet(Observable sampleStream) { this.sampleStream = sampleStream; this.pausePollerThreadDelayInMs = DEFAULT_PAUSE_POLLER_THREAD_DELAY_IN_MS; } - protected HystrixSampleSseServlet(Observable sampleStream, int pausePollerThreadDelayInMs) { + protected HystrixSampleSseServlet(Observable sampleStream, int pausePollerThreadDelayInMs) { this.sampleStream = sampleStream; this.pausePollerThreadDelayInMs = pausePollerThreadDelayInMs; } - abstract int getMaxNumberConcurrentConnectionsAllowed(); + protected abstract int getMaxNumberConcurrentConnectionsAllowed(); - abstract int getNumberCurrentConnections(); + protected abstract int getNumberCurrentConnections(); protected abstract int incrementAndGetCurrentConcurrentConnections(); protected abstract void decrementCurrentConcurrentConnections(); - //protected abstract Observable getStream(); - - protected abstract String convertToString(SampleData sampleData) throws IOException; - /** * Handle incoming GETs */ @@ -131,13 +127,11 @@ private void handleRequest(HttpServletRequest request, final HttpServletResponse final PrintWriter writer = response.getWriter(); - //Observable sampledStream = getStream(); - //since the sample stream is based on Observable.interval, events will get published on an RxComputation thread //since writing to the servlet response is blocking, use the Rx IO thread for the write that occurs in the onNext sampleSubscription = sampleStream .observeOn(Schedulers.io()) - .subscribe(new Subscriber() { + .subscribe(new Subscriber() { @Override public void onCompleted() { logger.error("HystrixSampleSseServlet: ({}) received unexpected OnCompleted from sample stream", getClass().getSimpleName()); @@ -150,26 +144,17 @@ public void onError(Throwable e) { } @Override - public void onNext(SampleData sampleData) { - if (sampleData != null) { - String sampleDataAsStr = null; + public void onNext(String sampleDataAsString) { + if (sampleDataAsString != null) { try { - sampleDataAsStr = convertToString(sampleData); - } catch (IOException ioe) { - //exception while converting String to JSON - logger.error("Error converting configuration to JSON ", ioe); - } - if (sampleDataAsStr != null) { - try { - writer.print("data: " + sampleDataAsStr + "\n\n"); - // explicitly check for client disconnect - PrintWriter does not throw exceptions - if (writer.checkError()) { - throw new IOException("io error"); - } - writer.flush(); - } catch (IOException ioe) { - moreDataWillBeSent.set(false); + writer.print("data: " + sampleDataAsString + "\n\n"); + // explicitly check for client disconnect - PrintWriter does not throw exceptions + if (writer.checkError()) { + throw new IOException("io error"); } + writer.flush(); + } catch (IOException ioe) { + moreDataWillBeSent.set(false); } } } diff --git a/hystrix-contrib/hystrix-metrics-event-stream/src/main/java/com/netflix/hystrix/contrib/sample/stream/HystrixUtilizationJsonStream.java b/hystrix-contrib/hystrix-metrics-event-stream/src/main/java/com/netflix/hystrix/contrib/sample/stream/HystrixUtilizationJsonStream.java index c7c8dfafe..c3ffe726e 100644 --- a/hystrix-contrib/hystrix-metrics-event-stream/src/main/java/com/netflix/hystrix/contrib/sample/stream/HystrixUtilizationJsonStream.java +++ b/hystrix-contrib/hystrix-metrics-event-stream/src/main/java/com/netflix/hystrix/contrib/sample/stream/HystrixUtilizationJsonStream.java @@ -37,8 +37,9 @@ *
  • {@link HystrixUtilizationSseServlet} for mapping a specific URL to this data as an SSE stream *
  • Consumer of your choice that wants control over where to embed this stream * - * + * @deprecated Instead, prefer mapping your preferred serialization on top of {@link HystrixUtilizationStream#observe()}. */ +@Deprecated //since 1.5.4 public class HystrixUtilizationJsonStream { private final Func1> streamGenerator; diff --git a/hystrix-contrib/hystrix-metrics-event-stream/src/main/java/com/netflix/hystrix/contrib/sample/stream/HystrixUtilizationSseServlet.java b/hystrix-contrib/hystrix-metrics-event-stream/src/main/java/com/netflix/hystrix/contrib/sample/stream/HystrixUtilizationSseServlet.java index 19c6a85df..c8ef48315 100644 --- a/hystrix-contrib/hystrix-metrics-event-stream/src/main/java/com/netflix/hystrix/contrib/sample/stream/HystrixUtilizationSseServlet.java +++ b/hystrix-contrib/hystrix-metrics-event-stream/src/main/java/com/netflix/hystrix/contrib/sample/stream/HystrixUtilizationSseServlet.java @@ -19,9 +19,10 @@ import com.netflix.config.DynamicPropertyFactory; import com.netflix.hystrix.metric.sample.HystrixUtilization; import com.netflix.hystrix.metric.sample.HystrixUtilizationStream; +import com.netflix.hystrix.serial.SerialHystrixUtilization; import rx.Observable; +import rx.functions.Func1; -import java.io.IOException; import java.util.concurrent.atomic.AtomicInteger; /** @@ -45,7 +46,7 @@ * * } */ -public class HystrixUtilizationSseServlet extends HystrixSampleSseServlet { +public class HystrixUtilizationSseServlet extends HystrixSampleSseServlet { private static final long serialVersionUID = -7812908330777694972L; @@ -55,20 +56,25 @@ public class HystrixUtilizationSseServlet extends HystrixSampleSseServlet sampleStream, int pausePollerThreadDelayInMs) { - super(sampleStream, pausePollerThreadDelayInMs); + super(sampleStream.map(new Func1() { + @Override + public String call(HystrixUtilization hystrixUtilization) { + return SerialHystrixUtilization.toJsonString(hystrixUtilization); + } + }), pausePollerThreadDelayInMs); } @Override - int getMaxNumberConcurrentConnectionsAllowed() { + protected int getMaxNumberConcurrentConnectionsAllowed() { return maxConcurrentConnections.get(); } @Override - int getNumberCurrentConnections() { + protected int getNumberCurrentConnections() { return concurrentConnections.get(); } @@ -81,10 +87,5 @@ protected int incrementAndGetCurrentConcurrentConnections() { protected void decrementCurrentConcurrentConnections() { concurrentConnections.decrementAndGet(); } - - @Override - protected String convertToString(HystrixUtilization utilization) throws IOException { - return HystrixUtilizationJsonStream.convertToJson(utilization); - } } diff --git a/hystrix-contrib/hystrix-metrics-event-stream/src/test/java/com/netflix/hystrix/contrib/metrics/eventstream/HystrixMetricsStreamServletUnitTest.java b/hystrix-contrib/hystrix-metrics-event-stream/src/test/java/com/netflix/hystrix/contrib/metrics/eventstream/HystrixMetricsStreamServletUnitTest.java index 4c60136d9..14049cf1c 100644 --- a/hystrix-contrib/hystrix-metrics-event-stream/src/test/java/com/netflix/hystrix/contrib/metrics/eventstream/HystrixMetricsStreamServletUnitTest.java +++ b/hystrix-contrib/hystrix-metrics-event-stream/src/test/java/com/netflix/hystrix/contrib/metrics/eventstream/HystrixMetricsStreamServletUnitTest.java @@ -15,31 +15,70 @@ */ package com.netflix.hystrix.contrib.metrics.eventstream; +import com.netflix.hystrix.metric.consumer.HystrixDashboardStream; +import org.junit.After; +import org.junit.Before; import org.junit.Test; -import org.mockito.Mockito; +import org.mockito.Mock; +import org.mockito.MockitoAnnotations; +import rx.Observable; +import rx.functions.Func1; import javax.servlet.ServletException; import javax.servlet.http.HttpServletRequest; import javax.servlet.http.HttpServletResponse; import java.io.IOException; +import java.io.PrintWriter; +import java.util.concurrent.TimeUnit; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; public class HystrixMetricsStreamServletUnitTest { + @Mock HttpServletRequest mockReq; + @Mock HttpServletResponse mockResp; + @Mock HystrixDashboardStream.DashboardData mockDashboard; + @Mock PrintWriter mockPrintWriter; + + HystrixMetricsStreamServlet servlet; + + private final Observable streamOfOnNexts = + Observable.interval(100, TimeUnit.MILLISECONDS).map(new Func1() { + @Override + public HystrixDashboardStream.DashboardData call(Long timestamp) { + return mockDashboard; + } + }); + + + @Before + public void init() { + MockitoAnnotations.initMocks(this); + when(mockReq.getMethod()).thenReturn("GET"); + } + + @After + public void tearDown() { + servlet.destroy(); + servlet.shutdown(); + } + @Test public void shutdownServletShouldRejectRequests() throws ServletException, IOException { + servlet = new HystrixMetricsStreamServlet(streamOfOnNexts, 10); + try { + servlet.init(); + } catch (ServletException ex) { - final HystrixMetricsStreamServlet servlet = new HystrixMetricsStreamServlet(); - servlet.shutdown(); + } - final HttpServletResponse response = mock(HttpServletResponse.class); - servlet.doGet(mock(HttpServletRequest.class), response); + servlet.shutdown(); - verify(response).sendError(503, "Service has been shut down."); + servlet.service(mockReq, mockResp); + verify(mockResp).sendError(503, "Service has been shut down."); } - } \ No newline at end of file diff --git a/hystrix-contrib/hystrix-metrics-event-stream/src/test/java/com/netflix/hystrix/contrib/sample/stream/HystrixConfigSseServletTest.java b/hystrix-contrib/hystrix-metrics-event-stream/src/test/java/com/netflix/hystrix/contrib/sample/stream/HystrixConfigSseServletTest.java index dfc868b4c..27b9ad39a 100644 --- a/hystrix-contrib/hystrix-metrics-event-stream/src/test/java/com/netflix/hystrix/contrib/sample/stream/HystrixConfigSseServletTest.java +++ b/hystrix-contrib/hystrix-metrics-event-stream/src/test/java/com/netflix/hystrix/contrib/sample/stream/HystrixConfigSseServletTest.java @@ -88,7 +88,6 @@ public void call(Subscriber subscriber) { @Before public void init() { MockitoAnnotations.initMocks(this); - } @After @@ -111,7 +110,6 @@ public void shutdownServletShouldRejectRequests() throws ServletException, IOExc servlet.doGet(mockReq, mockResp); verify(mockResp).sendError(503, "Service has been shut down."); - } @Test diff --git a/hystrix-contrib/hystrix-reactivesocket-event-stream/build.gradle b/hystrix-contrib/hystrix-reactivesocket-event-stream/build.gradle index 134136cc1..802a8fbae 100644 --- a/hystrix-contrib/hystrix-reactivesocket-event-stream/build.gradle +++ b/hystrix-contrib/hystrix-reactivesocket-event-stream/build.gradle @@ -9,12 +9,7 @@ targetCompatibility = JavaVersion.VERSION_1_8 dependencies { compile project(':hystrix-core') - - compile 'com.fasterxml.jackson.core:jackson-core:latest.release' - compile 'com.fasterxml.jackson.core:jackson-databind:latest.release' - compile 'com.fasterxml.jackson.core:jackson-annotations:latest.release' - compile 'com.fasterxml.jackson.module:jackson-module-afterburner:latest.release' - compile 'com.fasterxml.jackson.dataformat:jackson-dataformat-cbor:latest.release' + compile project(':hystrix-data-stream') compile 'io.reactivesocket:reactivesocket:latest.release' compile 'io.reactivesocket:reactivesocket-netty:latest.release' compile 'io.reactivex:rxjava-reactive-streams:latest.release' diff --git a/hystrix-contrib/hystrix-reactivesocket-event-stream/src/main/java/com/netflix/hystrix/contrib/reactivesocket/EventStream.java b/hystrix-contrib/hystrix-reactivesocket-event-stream/src/main/java/com/netflix/hystrix/contrib/reactivesocket/EventStream.java index 627be3616..60b2c22c7 100644 --- a/hystrix-contrib/hystrix-reactivesocket-event-stream/src/main/java/com/netflix/hystrix/contrib/reactivesocket/EventStream.java +++ b/hystrix-contrib/hystrix-reactivesocket-event-stream/src/main/java/com/netflix/hystrix/contrib/reactivesocket/EventStream.java @@ -16,18 +16,18 @@ package com.netflix.hystrix.contrib.reactivesocket; import com.netflix.hystrix.config.HystrixConfigurationStream; -import com.netflix.hystrix.contrib.reactivesocket.serialize.SerialHystrixConfiguration; -import com.netflix.hystrix.contrib.reactivesocket.serialize.SerialHystrixDashboardData; -import com.netflix.hystrix.contrib.reactivesocket.serialize.SerialHystrixMetric; -import com.netflix.hystrix.contrib.reactivesocket.serialize.SerialHystrixRequestEvents; -import com.netflix.hystrix.contrib.reactivesocket.serialize.SerialHystrixUtilization; import com.netflix.hystrix.metric.HystrixRequestEventsStream; import com.netflix.hystrix.metric.consumer.HystrixDashboardStream; import com.netflix.hystrix.metric.sample.HystrixUtilizationStream; +import com.netflix.hystrix.serial.SerialHystrixConfiguration; +import com.netflix.hystrix.serial.SerialHystrixDashboardData; +import com.netflix.hystrix.serial.SerialHystrixRequestEvents; +import com.netflix.hystrix.serial.SerialHystrixUtilization; +import io.reactivesocket.Frame; import io.reactivesocket.Payload; import rx.Observable; -import java.util.Arrays; +import java.nio.ByteBuffer; import java.util.concurrent.atomic.AtomicBoolean; import java.util.function.Supplier; @@ -56,25 +56,25 @@ public static EventStream getInstance(EventStreamEnum eventStreamEnum) { source = HystrixConfigurationStream.getInstance() .observe() .map(SerialHystrixConfiguration::toBytes) - .map(SerialHystrixMetric::toPayload); + .map(EventStream::toPayload); break; case REQUEST_EVENT_STREAM: source = HystrixRequestEventsStream.getInstance() .observe() .map(SerialHystrixRequestEvents::toBytes) - .map(SerialHystrixMetric::toPayload); + .map(EventStream::toPayload); break; case UTILIZATION_STREAM: source = HystrixUtilizationStream.getInstance() .observe() .map(SerialHystrixUtilization::toBytes) - .map(SerialHystrixMetric::toPayload); + .map(EventStream::toPayload); break; case GENERAL_DASHBOARD_STREAM: source = HystrixDashboardStream.getInstance() .observe() .map(SerialHystrixDashboardData::toBytes) - .map(SerialHystrixMetric::toPayload); + .map(EventStream::toPayload); break; default: throw new IllegalArgumentException("Unknown EventStreamEnum : " + eventStreamEnum); @@ -86,4 +86,18 @@ public static EventStream getInstance(EventStreamEnum eventStreamEnum) { public boolean isSourceCurrentlySubscribed() { return isSourceCurrentlySubscribed.get(); } + + public static Payload toPayload(byte[] byteArray) { + return new Payload() { + @Override + public ByteBuffer getData() { + return ByteBuffer.wrap(byteArray); + } + + @Override + public ByteBuffer getMetadata() { + return Frame.NULL_BYTEBUFFER; + } + }; + } } diff --git a/hystrix-contrib/hystrix-reactivesocket-event-stream/src/main/java/com/netflix/hystrix/contrib/reactivesocket/serialize/SerialHystrixDashboardData.java b/hystrix-contrib/hystrix-reactivesocket-event-stream/src/main/java/com/netflix/hystrix/contrib/reactivesocket/serialize/SerialHystrixDashboardData.java deleted file mode 100644 index c64e3dcaa..000000000 --- a/hystrix-contrib/hystrix-reactivesocket-event-stream/src/main/java/com/netflix/hystrix/contrib/reactivesocket/serialize/SerialHystrixDashboardData.java +++ /dev/null @@ -1,271 +0,0 @@ -/** - * Copyright 2016 Netflix, Inc. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package com.netflix.hystrix.contrib.reactivesocket.serialize; - -import com.fasterxml.jackson.core.JsonGenerator; -import com.netflix.hystrix.HystrixCircuitBreaker; -import com.netflix.hystrix.HystrixCollapserKey; -import com.netflix.hystrix.HystrixCollapserMetrics; -import com.netflix.hystrix.HystrixCommandKey; -import com.netflix.hystrix.HystrixCommandMetrics; -import com.netflix.hystrix.HystrixCommandProperties; -import com.netflix.hystrix.HystrixEventType; -import com.netflix.hystrix.HystrixThreadPoolKey; -import com.netflix.hystrix.HystrixThreadPoolMetrics; -import com.netflix.hystrix.metric.consumer.HystrixDashboardStream; -import org.agrona.LangUtil; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.io.ByteArrayOutputStream; -import java.io.IOException; -import java.util.function.Supplier; - -public class SerialHystrixDashboardData extends SerialHystrixMetric { - - private static final Logger logger = LoggerFactory.getLogger(SerialHystrixDashboardData.class); - - public static byte[] toBytes(HystrixDashboardStream.DashboardData dashboardData) { - byte[] retVal = null; - - try { - ByteArrayOutputStream bos = new ByteArrayOutputStream(); - JsonGenerator json = cborFactory.createGenerator(bos); - - json.writeStartArray(); - - for (HystrixCommandMetrics commandMetrics: dashboardData.getCommandMetrics()) { - writeCommandMetrics(commandMetrics, json); - } - - for (HystrixThreadPoolMetrics threadPoolMetrics: dashboardData.getThreadPoolMetrics()) { - writeThreadPoolMetrics(threadPoolMetrics, json); - } - - for (HystrixCollapserMetrics collapserMetrics: dashboardData.getCollapserMetrics()) { - writeCollapserMetrics(collapserMetrics, json); - } - - json.writeEndArray(); - - json.close(); - retVal = bos.toByteArray(); - - } catch (Exception e) { - LangUtil.rethrowUnchecked(e); - } - - return retVal; - } - - private static void writeCommandMetrics(HystrixCommandMetrics commandMetrics, JsonGenerator json) throws IOException { - HystrixCommandKey key = commandMetrics.getCommandKey(); - HystrixCircuitBreaker circuitBreaker = HystrixCircuitBreaker.Factory.getInstance(key); - - json.writeStartObject(); - json.writeStringField("type", "HystrixCommand"); - json.writeStringField("name", key.name()); - json.writeStringField("group", commandMetrics.getCommandGroup().name()); - json.writeNumberField("currentTime", System.currentTimeMillis()); - - // circuit breaker - if (circuitBreaker == null) { - // circuit breaker is disabled and thus never open - json.writeBooleanField("isCircuitBreakerOpen", false); - } else { - json.writeBooleanField("isCircuitBreakerOpen", circuitBreaker.isOpen()); - } - HystrixCommandMetrics.HealthCounts healthCounts = commandMetrics.getHealthCounts(); - json.writeNumberField("errorPercentage", healthCounts.getErrorPercentage()); - json.writeNumberField("errorCount", healthCounts.getErrorCount()); - json.writeNumberField("requestCount", healthCounts.getTotalRequests()); - - // rolling counters - safelyWriteNumberField(json, "rollingCountBadRequests", () -> commandMetrics.getRollingCount(HystrixEventType.BAD_REQUEST)); - safelyWriteNumberField(json, "rollingCountCollapsedRequests", () -> commandMetrics.getRollingCount(HystrixEventType.COLLAPSED)); - safelyWriteNumberField(json, "rollingCountEmit", () -> commandMetrics.getRollingCount(HystrixEventType.EMIT)); - safelyWriteNumberField(json, "rollingCountExceptionsThrown", () -> commandMetrics.getRollingCount(HystrixEventType.EXCEPTION_THROWN)); - safelyWriteNumberField(json, "rollingCountFailure", () -> commandMetrics.getRollingCount(HystrixEventType.FAILURE)); - safelyWriteNumberField(json, "rollingCountFallbackEmit", () -> commandMetrics.getRollingCount(HystrixEventType.FALLBACK_EMIT)); - safelyWriteNumberField(json, "rollingCountFallbackFailure", () -> commandMetrics.getRollingCount(HystrixEventType.FALLBACK_FAILURE)); - safelyWriteNumberField(json, "rollingCountFallbackMissing", () -> commandMetrics.getRollingCount(HystrixEventType.FALLBACK_MISSING)); - safelyWriteNumberField(json, "rollingCountFallbackRejection", () -> commandMetrics.getRollingCount(HystrixEventType.FALLBACK_REJECTION)); - safelyWriteNumberField(json, "rollingCountFallbackSuccess", () -> commandMetrics.getRollingCount(HystrixEventType.FALLBACK_SUCCESS)); - safelyWriteNumberField(json, "rollingCountResponsesFromCache", () -> commandMetrics.getRollingCount(HystrixEventType.RESPONSE_FROM_CACHE)); - safelyWriteNumberField(json, "rollingCountSemaphoreRejected", () -> commandMetrics.getRollingCount(HystrixEventType.SEMAPHORE_REJECTED)); - safelyWriteNumberField(json, "rollingCountShortCircuited", () -> commandMetrics.getRollingCount(HystrixEventType.SHORT_CIRCUITED)); - safelyWriteNumberField(json, "rollingCountSuccess", () -> commandMetrics.getRollingCount(HystrixEventType.SUCCESS)); - safelyWriteNumberField(json, "rollingCountThreadPoolRejected", () -> commandMetrics.getRollingCount(HystrixEventType.THREAD_POOL_REJECTED)); - safelyWriteNumberField(json, "rollingCountTimeout", () -> commandMetrics.getRollingCount(HystrixEventType.TIMEOUT)); - - json.writeNumberField("currentConcurrentExecutionCount", commandMetrics.getCurrentConcurrentExecutionCount()); - json.writeNumberField("rollingMaxConcurrentExecutionCount", commandMetrics.getRollingMaxConcurrentExecutions()); - - // latency percentiles - json.writeNumberField("latencyExecute_mean", commandMetrics.getExecutionTimeMean()); - json.writeObjectFieldStart("latencyExecute"); - json.writeNumberField("0", commandMetrics.getExecutionTimePercentile(0)); - json.writeNumberField("25", commandMetrics.getExecutionTimePercentile(25)); - json.writeNumberField("50", commandMetrics.getExecutionTimePercentile(50)); - json.writeNumberField("75", commandMetrics.getExecutionTimePercentile(75)); - json.writeNumberField("90", commandMetrics.getExecutionTimePercentile(90)); - json.writeNumberField("95", commandMetrics.getExecutionTimePercentile(95)); - json.writeNumberField("99", commandMetrics.getExecutionTimePercentile(99)); - json.writeNumberField("99.5", commandMetrics.getExecutionTimePercentile(99.5)); - json.writeNumberField("100", commandMetrics.getExecutionTimePercentile(100)); - json.writeEndObject(); - // - json.writeNumberField("latencyTotal_mean", commandMetrics.getTotalTimeMean()); - json.writeObjectFieldStart("latencyTotal"); - json.writeNumberField("0", commandMetrics.getTotalTimePercentile(0)); - json.writeNumberField("25", commandMetrics.getTotalTimePercentile(25)); - json.writeNumberField("50", commandMetrics.getTotalTimePercentile(50)); - json.writeNumberField("75", commandMetrics.getTotalTimePercentile(75)); - json.writeNumberField("90", commandMetrics.getTotalTimePercentile(90)); - json.writeNumberField("95", commandMetrics.getTotalTimePercentile(95)); - json.writeNumberField("99", commandMetrics.getTotalTimePercentile(99)); - json.writeNumberField("99.5", commandMetrics.getTotalTimePercentile(99.5)); - json.writeNumberField("100", commandMetrics.getTotalTimePercentile(100)); - json.writeEndObject(); - - // property values for reporting what is actually seen by the command rather than what was set somewhere - HystrixCommandProperties commandProperties = commandMetrics.getProperties(); - - json.writeNumberField("propertyValue_circuitBreakerRequestVolumeThreshold", commandProperties.circuitBreakerRequestVolumeThreshold().get()); - json.writeNumberField("propertyValue_circuitBreakerSleepWindowInMilliseconds", commandProperties.circuitBreakerSleepWindowInMilliseconds().get()); - json.writeNumberField("propertyValue_circuitBreakerErrorThresholdPercentage", commandProperties.circuitBreakerErrorThresholdPercentage().get()); - json.writeBooleanField("propertyValue_circuitBreakerForceOpen", commandProperties.circuitBreakerForceOpen().get()); - json.writeBooleanField("propertyValue_circuitBreakerForceClosed", commandProperties.circuitBreakerForceClosed().get()); - json.writeBooleanField("propertyValue_circuitBreakerEnabled", commandProperties.circuitBreakerEnabled().get()); - - json.writeStringField("propertyValue_executionIsolationStrategy", commandProperties.executionIsolationStrategy().get().name()); - json.writeNumberField("propertyValue_executionIsolationThreadTimeoutInMilliseconds", commandProperties.executionTimeoutInMilliseconds().get()); - json.writeNumberField("propertyValue_executionTimeoutInMilliseconds", commandProperties.executionTimeoutInMilliseconds().get()); - json.writeBooleanField("propertyValue_executionIsolationThreadInterruptOnTimeout", commandProperties.executionIsolationThreadInterruptOnTimeout().get()); - json.writeStringField("propertyValue_executionIsolationThreadPoolKeyOverride", commandProperties.executionIsolationThreadPoolKeyOverride().get()); - json.writeNumberField("propertyValue_executionIsolationSemaphoreMaxConcurrentRequests", commandProperties.executionIsolationSemaphoreMaxConcurrentRequests().get()); - json.writeNumberField("propertyValue_fallbackIsolationSemaphoreMaxConcurrentRequests", commandProperties.fallbackIsolationSemaphoreMaxConcurrentRequests().get()); - - /* - * The following are commented out as these rarely change and are verbose for streaming for something people don't change. - * We could perhaps allow a property or request argument to include these. - */ - - // json.put("propertyValue_metricsRollingPercentileEnabled", commandProperties.metricsRollingPercentileEnabled().get()); - // json.put("propertyValue_metricsRollingPercentileBucketSize", commandProperties.metricsRollingPercentileBucketSize().get()); - // json.put("propertyValue_metricsRollingPercentileWindow", commandProperties.metricsRollingPercentileWindowInMilliseconds().get()); - // json.put("propertyValue_metricsRollingPercentileWindowBuckets", commandProperties.metricsRollingPercentileWindowBuckets().get()); - // json.put("propertyValue_metricsRollingStatisticalWindowBuckets", commandProperties.metricsRollingStatisticalWindowBuckets().get()); - json.writeNumberField("propertyValue_metricsRollingStatisticalWindowInMilliseconds", commandProperties.metricsRollingStatisticalWindowInMilliseconds().get()); - - json.writeBooleanField("propertyValue_requestCacheEnabled", commandProperties.requestCacheEnabled().get()); - json.writeBooleanField("propertyValue_requestLogEnabled", commandProperties.requestLogEnabled().get()); - - json.writeNumberField("reportingHosts", 1); // this will get summed across all instances in a cluster - json.writeStringField("threadPool", commandMetrics.getThreadPoolKey().name()); - - json.writeEndObject(); - } - - private static void writeThreadPoolMetrics(HystrixThreadPoolMetrics threadPoolMetrics, JsonGenerator json) throws IOException { - HystrixThreadPoolKey key = threadPoolMetrics.getThreadPoolKey(); - - json.writeStartObject(); - - json.writeStringField("type", "HystrixThreadPool"); - json.writeStringField("name", key.name()); - json.writeNumberField("currentTime", System.currentTimeMillis()); - - json.writeNumberField("currentActiveCount", threadPoolMetrics.getCurrentActiveCount().intValue()); - json.writeNumberField("currentCompletedTaskCount", threadPoolMetrics.getCurrentCompletedTaskCount().longValue()); - json.writeNumberField("currentCorePoolSize", threadPoolMetrics.getCurrentCorePoolSize().intValue()); - json.writeNumberField("currentLargestPoolSize", threadPoolMetrics.getCurrentLargestPoolSize().intValue()); - json.writeNumberField("currentMaximumPoolSize", threadPoolMetrics.getCurrentMaximumPoolSize().intValue()); - json.writeNumberField("currentPoolSize", threadPoolMetrics.getCurrentPoolSize().intValue()); - json.writeNumberField("currentQueueSize", threadPoolMetrics.getCurrentQueueSize().intValue()); - json.writeNumberField("currentTaskCount", threadPoolMetrics.getCurrentTaskCount().longValue()); - safelyWriteNumberField(json, "rollingCountThreadsExecuted", () -> threadPoolMetrics.getRollingCount(HystrixEventType.ThreadPool.EXECUTED)); - json.writeNumberField("rollingMaxActiveThreads", threadPoolMetrics.getRollingMaxActiveThreads()); - safelyWriteNumberField(json, "rollingCountCommandRejections", () -> threadPoolMetrics.getRollingCount(HystrixEventType.ThreadPool.REJECTED)); - - json.writeNumberField("propertyValue_queueSizeRejectionThreshold", threadPoolMetrics.getProperties().queueSizeRejectionThreshold().get()); - json.writeNumberField("propertyValue_metricsRollingStatisticalWindowInMilliseconds", threadPoolMetrics.getProperties().metricsRollingStatisticalWindowInMilliseconds().get()); - - json.writeNumberField("reportingHosts", 1); // this will get summed across all instances in a cluster - - json.writeEndObject(); - } - - private static void writeCollapserMetrics(HystrixCollapserMetrics collapserMetrics, JsonGenerator json) throws IOException { - HystrixCollapserKey key = collapserMetrics.getCollapserKey(); - - json.writeStartObject(); - - json.writeStringField("type", "HystrixCollapser"); - json.writeStringField("name", key.name()); - json.writeNumberField("currentTime", System.currentTimeMillis()); - - safelyWriteNumberField(json, "rollingCountRequestsBatched", () -> collapserMetrics.getRollingCount(HystrixEventType.Collapser.ADDED_TO_BATCH)); - safelyWriteNumberField(json, "rollingCountBatches", () -> collapserMetrics.getRollingCount(HystrixEventType.Collapser.BATCH_EXECUTED)); - safelyWriteNumberField(json, "rollingCountResponsesFromCache", () -> collapserMetrics.getRollingCount(HystrixEventType.Collapser.RESPONSE_FROM_CACHE)); - - // batch size percentiles - json.writeNumberField("batchSize_mean", collapserMetrics.getBatchSizeMean()); - json.writeObjectFieldStart("batchSize"); - json.writeNumberField("25", collapserMetrics.getBatchSizePercentile(25)); - json.writeNumberField("50", collapserMetrics.getBatchSizePercentile(50)); - json.writeNumberField("75", collapserMetrics.getBatchSizePercentile(75)); - json.writeNumberField("90", collapserMetrics.getBatchSizePercentile(90)); - json.writeNumberField("95", collapserMetrics.getBatchSizePercentile(95)); - json.writeNumberField("99", collapserMetrics.getBatchSizePercentile(99)); - json.writeNumberField("99.5", collapserMetrics.getBatchSizePercentile(99.5)); - json.writeNumberField("100", collapserMetrics.getBatchSizePercentile(100)); - json.writeEndObject(); - - // shard size percentiles (commented-out for now) - //json.writeNumberField("shardSize_mean", collapserMetrics.getShardSizeMean()); - //json.writeObjectFieldStart("shardSize"); - //json.writeNumberField("25", collapserMetrics.getShardSizePercentile(25)); - //json.writeNumberField("50", collapserMetrics.getShardSizePercentile(50)); - //json.writeNumberField("75", collapserMetrics.getShardSizePercentile(75)); - //json.writeNumberField("90", collapserMetrics.getShardSizePercentile(90)); - //json.writeNumberField("95", collapserMetrics.getShardSizePercentile(95)); - //json.writeNumberField("99", collapserMetrics.getShardSizePercentile(99)); - //json.writeNumberField("99.5", collapserMetrics.getShardSizePercentile(99.5)); - //json.writeNumberField("100", collapserMetrics.getShardSizePercentile(100)); - //json.writeEndObject(); - - //json.writeNumberField("propertyValue_metricsRollingStatisticalWindowInMilliseconds", collapserMetrics.getProperties().metricsRollingStatisticalWindowInMilliseconds().get()); - json.writeBooleanField("propertyValue_requestCacheEnabled", collapserMetrics.getProperties().requestCacheEnabled().get()); - json.writeNumberField("propertyValue_maxRequestsInBatch", collapserMetrics.getProperties().maxRequestsInBatch().get()); - json.writeNumberField("propertyValue_timerDelayInMilliseconds", collapserMetrics.getProperties().timerDelayInMilliseconds().get()); - - json.writeNumberField("reportingHosts", 1); // this will get summed across all instances in a cluster - - json.writeEndObject(); - } - - protected static void safelyWriteNumberField(JsonGenerator json, String name, Supplier metricGenerator) throws IOException { - try { - json.writeNumberField(name, metricGenerator.get()); - } catch (NoSuchFieldError error) { - logger.error("While publishing Hystrix metrics stream, error looking up eventType for : " + name + ". Please check that all Hystrix versions are the same!"); - json.writeNumberField(name, 0L); - } - } - - -} diff --git a/hystrix-contrib/hystrix-reactivesocket-event-stream/src/test/java/com/netflix/hystrix/contrib/reactivesocket/EventStreamTest.java b/hystrix-contrib/hystrix-reactivesocket-event-stream/src/test/java/com/netflix/hystrix/contrib/reactivesocket/EventStreamTest.java index 8057198d0..bd9f89b89 100644 --- a/hystrix-contrib/hystrix-reactivesocket-event-stream/src/test/java/com/netflix/hystrix/contrib/reactivesocket/EventStreamTest.java +++ b/hystrix-contrib/hystrix-reactivesocket-event-stream/src/test/java/com/netflix/hystrix/contrib/reactivesocket/EventStreamTest.java @@ -68,7 +68,7 @@ public ByteBuffer getMetadata() { public void testConfigStreamHasData() throws Exception { final AtomicBoolean hasBytes = new AtomicBoolean(false); CountDownLatch latch = new CountDownLatch(1); - final int NUM = 10; + final int NUM = 2; EventStream.getInstance(EventStreamEnum.CONFIG_STREAM).get() .take(NUM) diff --git a/hystrix-contrib/hystrix-rx-netty-metrics-stream/build.gradle b/hystrix-contrib/hystrix-rx-netty-metrics-stream/build.gradle index 539cbc9da..537b49965 100644 --- a/hystrix-contrib/hystrix-rx-netty-metrics-stream/build.gradle +++ b/hystrix-contrib/hystrix-rx-netty-metrics-stream/build.gradle @@ -1,8 +1,7 @@ dependencies { compile project(':hystrix-core') + compile project(':hystrix-data-stream') compile 'io.reactivex:rxnetty:0.4.12' - compile 'org.codehaus.jackson:jackson-core-asl:1.9.2' - compile 'org.codehaus.jackson:jackson-mapper-asl:1.9.2' testCompile 'junit:junit-dep:4.10' testCompile 'org.powermock:powermock-easymock-release-full:1.5.5' testCompile 'org.easymock:easymock:3.2' diff --git a/hystrix-contrib/hystrix-rx-netty-metrics-stream/src/main/java/com/netflix/hystrix/contrib/rxnetty/metricsstream/HystrixMetricsStreamHandler.java b/hystrix-contrib/hystrix-rx-netty-metrics-stream/src/main/java/com/netflix/hystrix/contrib/rxnetty/metricsstream/HystrixMetricsStreamHandler.java index 5d362eb1e..27e87217a 100644 --- a/hystrix-contrib/hystrix-rx-netty-metrics-stream/src/main/java/com/netflix/hystrix/contrib/rxnetty/metricsstream/HystrixMetricsStreamHandler.java +++ b/hystrix-contrib/hystrix-rx-netty-metrics-stream/src/main/java/com/netflix/hystrix/contrib/rxnetty/metricsstream/HystrixMetricsStreamHandler.java @@ -18,6 +18,7 @@ import com.netflix.hystrix.HystrixCollapserMetrics; import com.netflix.hystrix.HystrixCommandMetrics; import com.netflix.hystrix.HystrixThreadPoolMetrics; +import com.netflix.hystrix.serial.SerialHystrixDashboardData; import io.netty.buffer.ByteBuf; import io.netty.buffer.UnpooledByteBufAllocator; import io.reactivex.netty.protocol.http.server.HttpServerRequest; @@ -34,8 +35,6 @@ import java.nio.charset.Charset; import java.util.concurrent.TimeUnit; -import static com.netflix.hystrix.contrib.rxnetty.metricsstream.JsonMappers.*; - /** * Streams Hystrix metrics in Server Sent Event (SSE) format. RxNetty application handlers shall * be wrapped by this handler. It transparently intercepts HTTP requests at a configurable path @@ -96,13 +95,13 @@ public void call(Long tick) { } try { for (HystrixCommandMetrics commandMetrics : HystrixCommandMetrics.getInstances()) { - writeMetric(toJson(commandMetrics), response); + writeMetric(SerialHystrixDashboardData.toJsonString(commandMetrics), response); } for (HystrixThreadPoolMetrics threadPoolMetrics : HystrixThreadPoolMetrics.getInstances()) { - writeMetric(toJson(threadPoolMetrics), response); + writeMetric(SerialHystrixDashboardData.toJsonString(threadPoolMetrics), response); } for (HystrixCollapserMetrics collapserMetrics : HystrixCollapserMetrics.getInstances()) { - writeMetric(toJson(collapserMetrics), response); + writeMetric(SerialHystrixDashboardData.toJsonString(collapserMetrics), response); } } catch (Exception e) { subject.onError(e); diff --git a/hystrix-contrib/hystrix-rx-netty-metrics-stream/src/test/java/com/netflix/hystrix/contrib/rxnetty/metricsstream/HystrixMetricsStreamHandlerTest.java b/hystrix-contrib/hystrix-rx-netty-metrics-stream/src/test/java/com/netflix/hystrix/contrib/rxnetty/metricsstream/HystrixMetricsStreamHandlerTest.java index 608d37a26..9399c5f39 100644 --- a/hystrix-contrib/hystrix-rx-netty-metrics-stream/src/test/java/com/netflix/hystrix/contrib/rxnetty/metricsstream/HystrixMetricsStreamHandlerTest.java +++ b/hystrix-contrib/hystrix-rx-netty-metrics-stream/src/test/java/com/netflix/hystrix/contrib/rxnetty/metricsstream/HystrixMetricsStreamHandlerTest.java @@ -15,6 +15,8 @@ */ package com.netflix.hystrix.contrib.rxnetty.metricsstream; +import com.fasterxml.jackson.databind.JsonNode; +import com.fasterxml.jackson.databind.ObjectMapper; import com.netflix.hystrix.HystrixCommandMetrics; import com.netflix.hystrix.HystrixCommandMetricsSamples; import io.netty.buffer.ByteBuf; @@ -28,8 +30,6 @@ import io.reactivex.netty.protocol.http.server.HttpServerResponse; import io.reactivex.netty.protocol.http.server.RequestHandler; import io.reactivex.netty.protocol.text.sse.ServerSentEvent; -import org.codehaus.jackson.JsonNode; -import org.codehaus.jackson.map.ObjectMapper; import org.junit.After; import org.junit.Before; import org.junit.Test; diff --git a/hystrix-data-stream/build.gradle b/hystrix-data-stream/build.gradle new file mode 100644 index 000000000..f98c849ec --- /dev/null +++ b/hystrix-data-stream/build.gradle @@ -0,0 +1,23 @@ +repositories { + mavenCentral() + jcenter() +} + +sourceCompatibility = JavaVersion.VERSION_1_6 +targetCompatibility = JavaVersion.VERSION_1_6 + +dependencies { + compile project(':hystrix-core') + + //if we bump into the the 2.8.0 series, we are forced to use Java7 + compile 'com.fasterxml.jackson.core:jackson-core:2.7.5' + compile 'com.fasterxml.jackson.core:jackson-databind:2.7.5' + compile 'com.fasterxml.jackson.core:jackson-annotations:2.7.5' + compile 'com.fasterxml.jackson.module:jackson-module-afterburner:2.7.5' + compile 'com.fasterxml.jackson.dataformat:jackson-dataformat-cbor:2.7.5' + + testCompile 'junit:junit-dep:4.10' + testCompile 'org.mockito:mockito-all:1.9.5' + testCompile project(':hystrix-core').sourceSets.test.output + testCompile project(':hystrix-junit') +} diff --git a/hystrix-core/src/main/java/com/netflix/hystrix/config/HystrixCollapserConfiguration.java b/hystrix-data-stream/src/main/java/com/netflix/hystrix/config/HystrixCollapserConfiguration.java similarity index 100% rename from hystrix-core/src/main/java/com/netflix/hystrix/config/HystrixCollapserConfiguration.java rename to hystrix-data-stream/src/main/java/com/netflix/hystrix/config/HystrixCollapserConfiguration.java diff --git a/hystrix-core/src/main/java/com/netflix/hystrix/config/HystrixCommandConfiguration.java b/hystrix-data-stream/src/main/java/com/netflix/hystrix/config/HystrixCommandConfiguration.java similarity index 100% rename from hystrix-core/src/main/java/com/netflix/hystrix/config/HystrixCommandConfiguration.java rename to hystrix-data-stream/src/main/java/com/netflix/hystrix/config/HystrixCommandConfiguration.java diff --git a/hystrix-core/src/main/java/com/netflix/hystrix/config/HystrixConfiguration.java b/hystrix-data-stream/src/main/java/com/netflix/hystrix/config/HystrixConfiguration.java similarity index 100% rename from hystrix-core/src/main/java/com/netflix/hystrix/config/HystrixConfiguration.java rename to hystrix-data-stream/src/main/java/com/netflix/hystrix/config/HystrixConfiguration.java diff --git a/hystrix-core/src/main/java/com/netflix/hystrix/config/HystrixConfigurationStream.java b/hystrix-data-stream/src/main/java/com/netflix/hystrix/config/HystrixConfigurationStream.java similarity index 95% rename from hystrix-core/src/main/java/com/netflix/hystrix/config/HystrixConfigurationStream.java rename to hystrix-data-stream/src/main/java/com/netflix/hystrix/config/HystrixConfigurationStream.java index 18eb224d5..2d83e368f 100644 --- a/hystrix-core/src/main/java/com/netflix/hystrix/config/HystrixConfigurationStream.java +++ b/hystrix-data-stream/src/main/java/com/netflix/hystrix/config/HystrixConfigurationStream.java @@ -15,6 +15,8 @@ */ package com.netflix.hystrix.config; +import com.netflix.config.DynamicIntProperty; +import com.netflix.config.DynamicPropertyFactory; import com.netflix.hystrix.HystrixCollapserKey; import com.netflix.hystrix.HystrixCollapserMetrics; import com.netflix.hystrix.HystrixCollapserProperties; @@ -43,6 +45,10 @@ public class HystrixConfigurationStream { private final Observable allConfigurationStream; private final AtomicBoolean isSourceCurrentlySubscribed = new AtomicBoolean(false); + private static final DynamicIntProperty dataEmissionIntervalInMs = + DynamicPropertyFactory.getInstance().getIntProperty("hystrix.stream.config.intervalInMilliseconds", 5000); + + private static final Func1 getAllConfig = new Func1() { @Override @@ -80,7 +86,9 @@ public void call() { .onBackpressureDrop(); } - private static final HystrixConfigurationStream INSTANCE = new HystrixConfigurationStream(500); + //The data emission interval is looked up on startup only + private static final HystrixConfigurationStream INSTANCE = + new HystrixConfigurationStream(dataEmissionIntervalInMs.get()); public static HystrixConfigurationStream getInstance() { return INSTANCE; diff --git a/hystrix-core/src/main/java/com/netflix/hystrix/config/HystrixThreadPoolConfiguration.java b/hystrix-data-stream/src/main/java/com/netflix/hystrix/config/HystrixThreadPoolConfiguration.java similarity index 100% rename from hystrix-core/src/main/java/com/netflix/hystrix/config/HystrixThreadPoolConfiguration.java rename to hystrix-data-stream/src/main/java/com/netflix/hystrix/config/HystrixThreadPoolConfiguration.java diff --git a/hystrix-core/src/main/java/com/netflix/hystrix/metric/consumer/HystrixDashboardStream.java b/hystrix-data-stream/src/main/java/com/netflix/hystrix/metric/consumer/HystrixDashboardStream.java similarity index 88% rename from hystrix-core/src/main/java/com/netflix/hystrix/metric/consumer/HystrixDashboardStream.java rename to hystrix-data-stream/src/main/java/com/netflix/hystrix/metric/consumer/HystrixDashboardStream.java index eb47253ce..7913424b9 100644 --- a/hystrix-core/src/main/java/com/netflix/hystrix/metric/consumer/HystrixDashboardStream.java +++ b/hystrix-data-stream/src/main/java/com/netflix/hystrix/metric/consumer/HystrixDashboardStream.java @@ -15,6 +15,8 @@ */ package com.netflix.hystrix.metric.consumer; +import com.netflix.config.DynamicIntProperty; +import com.netflix.config.DynamicPropertyFactory; import com.netflix.hystrix.HystrixCollapserMetrics; import com.netflix.hystrix.HystrixCommandMetrics; import com.netflix.hystrix.HystrixThreadPoolMetrics; @@ -31,6 +33,9 @@ public class HystrixDashboardStream { final Observable singleSource; final AtomicBoolean isSourceCurrentlySubscribed = new AtomicBoolean(false); + private static final DynamicIntProperty dataEmissionIntervalInMs = + DynamicPropertyFactory.getInstance().getIntProperty("hystrix.stream.dashboard.intervalInMilliseconds", 500); + private HystrixDashboardStream(int delayInMs) { this.delayInMs = delayInMs; this.singleSource = Observable.interval(delayInMs, TimeUnit.MILLISECONDS) @@ -60,7 +65,9 @@ public void call() { .onBackpressureDrop(); } - private static final HystrixDashboardStream INSTANCE = new HystrixDashboardStream(500); + //The data emission interval is looked up on startup only + private static final HystrixDashboardStream INSTANCE = + new HystrixDashboardStream(dataEmissionIntervalInMs.get()); public static HystrixDashboardStream getInstance() { return INSTANCE; diff --git a/hystrix-core/src/main/java/com/netflix/hystrix/metric/sample/HystrixCommandUtilization.java b/hystrix-data-stream/src/main/java/com/netflix/hystrix/metric/sample/HystrixCommandUtilization.java similarity index 100% rename from hystrix-core/src/main/java/com/netflix/hystrix/metric/sample/HystrixCommandUtilization.java rename to hystrix-data-stream/src/main/java/com/netflix/hystrix/metric/sample/HystrixCommandUtilization.java diff --git a/hystrix-core/src/main/java/com/netflix/hystrix/metric/sample/HystrixThreadPoolUtilization.java b/hystrix-data-stream/src/main/java/com/netflix/hystrix/metric/sample/HystrixThreadPoolUtilization.java similarity index 100% rename from hystrix-core/src/main/java/com/netflix/hystrix/metric/sample/HystrixThreadPoolUtilization.java rename to hystrix-data-stream/src/main/java/com/netflix/hystrix/metric/sample/HystrixThreadPoolUtilization.java diff --git a/hystrix-core/src/main/java/com/netflix/hystrix/metric/sample/HystrixUtilization.java b/hystrix-data-stream/src/main/java/com/netflix/hystrix/metric/sample/HystrixUtilization.java similarity index 100% rename from hystrix-core/src/main/java/com/netflix/hystrix/metric/sample/HystrixUtilization.java rename to hystrix-data-stream/src/main/java/com/netflix/hystrix/metric/sample/HystrixUtilization.java diff --git a/hystrix-core/src/main/java/com/netflix/hystrix/metric/sample/HystrixUtilizationStream.java b/hystrix-data-stream/src/main/java/com/netflix/hystrix/metric/sample/HystrixUtilizationStream.java similarity index 93% rename from hystrix-core/src/main/java/com/netflix/hystrix/metric/sample/HystrixUtilizationStream.java rename to hystrix-data-stream/src/main/java/com/netflix/hystrix/metric/sample/HystrixUtilizationStream.java index aaff7dcdc..d509e4df3 100644 --- a/hystrix-core/src/main/java/com/netflix/hystrix/metric/sample/HystrixUtilizationStream.java +++ b/hystrix-data-stream/src/main/java/com/netflix/hystrix/metric/sample/HystrixUtilizationStream.java @@ -15,6 +15,8 @@ */ package com.netflix.hystrix.metric.sample; +import com.netflix.config.DynamicIntProperty; +import com.netflix.config.DynamicPropertyFactory; import com.netflix.hystrix.HystrixCommandKey; import com.netflix.hystrix.HystrixCommandMetrics; import com.netflix.hystrix.HystrixThreadPoolKey; @@ -36,6 +38,10 @@ public class HystrixUtilizationStream { private final Observable allUtilizationStream; private final AtomicBoolean isSourceCurrentlySubscribed = new AtomicBoolean(false); + private static final DynamicIntProperty dataEmissionIntervalInMs = + DynamicPropertyFactory.getInstance().getIntProperty("hystrix.stream.utilization.intervalInMilliseconds", 500); + + private static final Func1 getAllUtilization = new Func1() { @Override @@ -72,7 +78,9 @@ public void call() { .onBackpressureDrop(); } - private static final HystrixUtilizationStream INSTANCE = new HystrixUtilizationStream(500); + //The data emission interval is looked up on startup only + private static final HystrixUtilizationStream INSTANCE = + new HystrixUtilizationStream(dataEmissionIntervalInMs.get()); public static HystrixUtilizationStream getInstance() { return INSTANCE; diff --git a/hystrix-contrib/hystrix-reactivesocket-event-stream/src/main/java/com/netflix/hystrix/contrib/reactivesocket/serialize/SerialHystrixConfiguration.java b/hystrix-data-stream/src/main/java/com/netflix/hystrix/serial/SerialHystrixConfiguration.java similarity index 93% rename from hystrix-contrib/hystrix-reactivesocket-event-stream/src/main/java/com/netflix/hystrix/contrib/reactivesocket/serialize/SerialHystrixConfiguration.java rename to hystrix-data-stream/src/main/java/com/netflix/hystrix/serial/SerialHystrixConfiguration.java index 32045ea11..417157f14 100644 --- a/hystrix-contrib/hystrix-reactivesocket-event-stream/src/main/java/com/netflix/hystrix/contrib/reactivesocket/serialize/SerialHystrixConfiguration.java +++ b/hystrix-data-stream/src/main/java/com/netflix/hystrix/serial/SerialHystrixConfiguration.java @@ -13,7 +13,7 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package com.netflix.hystrix.contrib.reactivesocket.serialize; +package com.netflix.hystrix.serial; import com.fasterxml.jackson.core.JsonGenerator; import com.fasterxml.jackson.databind.JsonNode; @@ -27,12 +27,12 @@ import com.netflix.hystrix.config.HystrixCommandConfiguration; import com.netflix.hystrix.config.HystrixConfiguration; import com.netflix.hystrix.config.HystrixThreadPoolConfiguration; -import org.agrona.LangUtil; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.io.ByteArrayOutputStream; import java.io.IOException; +import java.io.StringWriter; import java.nio.ByteBuffer; import java.util.HashMap; import java.util.Iterator; @@ -47,8 +47,34 @@ public static byte[] toBytes(HystrixConfiguration config) { try { ByteArrayOutputStream bos = new ByteArrayOutputStream(); - JsonGenerator json = cborFactory.createGenerator(bos); + JsonGenerator cbor = cborFactory.createGenerator(bos); + serializeConfiguration(config, cbor); + + retVal = bos.toByteArray(); + } catch (Exception e) { + throw new RuntimeException(e); + } + + return retVal; + } + + public static String toJsonString(HystrixConfiguration config) { + StringWriter jsonString = new StringWriter(); + + try { + JsonGenerator json = jsonFactory.createGenerator(jsonString); + + serializeConfiguration(config, json); + } catch (Exception e) { + throw new RuntimeException(e); + } + + return jsonString.getBuffer().toString(); + } + + private static void serializeConfiguration(HystrixConfiguration config, JsonGenerator json) { + try { json.writeStartObject(); json.writeStringField("type", "HystrixConfig"); json.writeObjectFieldStart("commands"); @@ -77,23 +103,19 @@ public static byte[] toBytes(HystrixConfiguration config) { json.writeEndObject(); json.writeEndObject(); json.close(); - - - retVal = bos.toByteArray(); } catch (Exception e) { - LangUtil.rethrowUnchecked(e); + throw new RuntimeException(e); } - return retVal; } public static HystrixConfiguration fromByteBuffer(ByteBuffer bb) { byte[] byteArray = new byte[bb.remaining()]; bb.get(byteArray); - Map commandConfigMap = new HashMap<>(); - Map threadPoolConfigMap = new HashMap<>(); - Map collapserConfigMap = new HashMap<>(); + Map commandConfigMap = new HashMap(); + Map threadPoolConfigMap = new HashMap(); + Map collapserConfigMap = new HashMap(); try { CBORParser parser = cborFactory.createParser(byteArray); diff --git a/hystrix-contrib/hystrix-rx-netty-metrics-stream/src/main/java/com/netflix/hystrix/contrib/rxnetty/metricsstream/JsonMappers.java b/hystrix-data-stream/src/main/java/com/netflix/hystrix/serial/SerialHystrixDashboardData.java similarity index 69% rename from hystrix-contrib/hystrix-rx-netty-metrics-stream/src/main/java/com/netflix/hystrix/contrib/rxnetty/metricsstream/JsonMappers.java rename to hystrix-data-stream/src/main/java/com/netflix/hystrix/serial/SerialHystrixDashboardData.java index b67f84660..916f36055 100644 --- a/hystrix-contrib/hystrix-rx-netty-metrics-stream/src/main/java/com/netflix/hystrix/contrib/rxnetty/metricsstream/JsonMappers.java +++ b/hystrix-data-stream/src/main/java/com/netflix/hystrix/serial/SerialHystrixDashboardData.java @@ -1,11 +1,11 @@ /** - * Copyright 2015 Netflix, Inc. + * Copyright 2016 Netflix, Inc. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at * - * http://www.apache.org/licenses/LICENSE-2.0 + * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, @@ -13,61 +13,147 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package com.netflix.hystrix.contrib.rxnetty.metricsstream; +package com.netflix.hystrix.serial; +import com.fasterxml.jackson.core.JsonGenerator; import com.netflix.hystrix.HystrixCircuitBreaker; import com.netflix.hystrix.HystrixCollapserKey; import com.netflix.hystrix.HystrixCollapserMetrics; import com.netflix.hystrix.HystrixCommandKey; import com.netflix.hystrix.HystrixCommandMetrics; -import com.netflix.hystrix.HystrixCommandMetrics.HealthCounts; import com.netflix.hystrix.HystrixCommandProperties; +import com.netflix.hystrix.HystrixEventType; import com.netflix.hystrix.HystrixThreadPoolKey; import com.netflix.hystrix.HystrixThreadPoolMetrics; -import com.netflix.hystrix.util.HystrixRollingNumberEvent; -import org.codehaus.jackson.JsonFactory; -import org.codehaus.jackson.JsonGenerator; +import com.netflix.hystrix.metric.consumer.HystrixDashboardStream; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import rx.Observable; import rx.functions.Func0; +import java.io.ByteArrayOutputStream; import java.io.IOException; import java.io.StringWriter; +import java.util.ArrayList; +import java.util.List; -/** - * This code is taken from hystrix-metrics-event-stream module's HystrixMetricsPoller class. - * - * @author Tomasz Bak - */ -final class JsonMappers { +public class SerialHystrixDashboardData extends SerialHystrixMetric { + + private static final Logger logger = LoggerFactory.getLogger(SerialHystrixDashboardData.class); + + public static byte[] toBytes(HystrixDashboardStream.DashboardData dashboardData) { + byte[] retVal = null; - private static final JsonFactory jsonFactory = new JsonFactory(); - static final Logger logger = LoggerFactory.getLogger(JsonMappers.class); + try { + ByteArrayOutputStream bos = new ByteArrayOutputStream(); + JsonGenerator cbor = cborFactory.createGenerator(bos); + writeDashboardData(cbor, dashboardData); + retVal = bos.toByteArray(); + + } catch (Exception e) { + throw new RuntimeException(e); + } - private JsonMappers() { + return retVal; } - private static void safelyWriteNumberField(JsonGenerator json, String name, Func0 metricGenerator) throws IOException { + public static String toJsonString(HystrixDashboardStream.DashboardData dashboardData) { + StringWriter jsonString = new StringWriter(); + try { - json.writeNumberField(name, metricGenerator.call()); - } catch (NoSuchFieldError error) { - logger.error("While publishing Hystrix metrics stream, error looking up eventType for : {}. Please check that all Hystrix versions are the same!",name); - json.writeNumberField(name, 0L); + JsonGenerator json = jsonFactory.createGenerator(jsonString); + writeDashboardData(json, dashboardData); + } catch (Exception e) { + throw new RuntimeException(e); } + + return jsonString.getBuffer().toString(); } - /** - * An implementation note. If there's a version mismatch between hystrix-core and hystrix-rx-netty-metrics-stream, - * the code below may reference a HystrixRollingNumberEvent that does not exist in hystrix-core. If this happens, - * a j.l.NoSuchFieldError occurs. Since this data is not being generated by hystrix-core, it's safe to count it as 0 - * and we should log an error to get users to update their dependency set. - */ - static String toJson(final HystrixCommandMetrics commandMetrics) throws IOException { - HystrixCommandKey key = commandMetrics.getCommandKey(); - HystrixCircuitBreaker circuitBreaker = HystrixCircuitBreaker.Factory.getInstance(key); + public static List toMultipleJsonStrings(HystrixDashboardStream.DashboardData dashboardData) { + List jsonStrings = new ArrayList(); + + for (HystrixCommandMetrics commandMetrics : dashboardData.getCommandMetrics()) { + jsonStrings.add(toJsonString(commandMetrics)); + } + for (HystrixThreadPoolMetrics threadPoolMetrics : dashboardData.getThreadPoolMetrics()) { + jsonStrings.add(toJsonString(threadPoolMetrics)); + } + + for (HystrixCollapserMetrics collapserMetrics : dashboardData.getCollapserMetrics()) { + jsonStrings.add(toJsonString(collapserMetrics)); + } + + return jsonStrings; + } + + private static void writeDashboardData(JsonGenerator json, HystrixDashboardStream.DashboardData dashboardData) { + try { + json.writeStartArray(); + + for (HystrixCommandMetrics commandMetrics : dashboardData.getCommandMetrics()) { + writeCommandMetrics(commandMetrics, json); + } + + for (HystrixThreadPoolMetrics threadPoolMetrics : dashboardData.getThreadPoolMetrics()) { + writeThreadPoolMetrics(threadPoolMetrics, json); + } + + for (HystrixCollapserMetrics collapserMetrics : dashboardData.getCollapserMetrics()) { + writeCollapserMetrics(collapserMetrics, json); + } + + json.writeEndArray(); + + json.close(); + } catch (Exception e) { + throw new RuntimeException(e); + } + } + + public static String toJsonString(HystrixCommandMetrics commandMetrics) { StringWriter jsonString = new StringWriter(); - JsonGenerator json = jsonFactory.createJsonGenerator(jsonString); + + try { + JsonGenerator json = jsonFactory.createGenerator(jsonString); + writeCommandMetrics(commandMetrics, json); + json.close(); + return jsonString.getBuffer().toString(); + } catch (IOException ioe) { + throw new RuntimeException(ioe); + } + } + + public static String toJsonString(HystrixThreadPoolMetrics threadPoolMetrics) { + StringWriter jsonString = new StringWriter(); + + try { + JsonGenerator json = jsonFactory.createGenerator(jsonString); + writeThreadPoolMetrics(threadPoolMetrics, json); + json.close(); + return jsonString.getBuffer().toString(); + } catch (IOException ioe) { + throw new RuntimeException(ioe); + } + } + + public static String toJsonString(HystrixCollapserMetrics collapserMetrics) { + StringWriter jsonString = new StringWriter(); + + try { + JsonGenerator json = jsonFactory.createGenerator(jsonString); + writeCollapserMetrics(collapserMetrics, json); + json.close(); + return jsonString.getBuffer().toString(); + } catch (IOException ioe) { + throw new RuntimeException(ioe); + } + } + + private static void writeCommandMetrics(final HystrixCommandMetrics commandMetrics, JsonGenerator json) throws IOException { + HystrixCommandKey key = commandMetrics.getCommandKey(); + HystrixCircuitBreaker circuitBreaker = HystrixCircuitBreaker.Factory.getInstance(key); json.writeStartObject(); json.writeStringField("type", "HystrixCommand"); @@ -82,7 +168,7 @@ static String toJson(final HystrixCommandMetrics commandMetrics) throws IOExcept } else { json.writeBooleanField("isCircuitBreakerOpen", circuitBreaker.isOpen()); } - HealthCounts healthCounts = commandMetrics.getHealthCounts(); + HystrixCommandMetrics.HealthCounts healthCounts = commandMetrics.getHealthCounts(); json.writeNumberField("errorPercentage", healthCounts.getErrorPercentage()); json.writeNumberField("errorCount", healthCounts.getErrorCount()); json.writeNumberField("requestCount", healthCounts.getTotalRequests()); @@ -91,97 +177,97 @@ static String toJson(final HystrixCommandMetrics commandMetrics) throws IOExcept safelyWriteNumberField(json, "rollingCountBadRequests", new Func0() { @Override public Long call() { - return commandMetrics.getRollingCount(HystrixRollingNumberEvent.BAD_REQUEST); + return commandMetrics.getRollingCount(HystrixEventType.BAD_REQUEST); } }); safelyWriteNumberField(json, "rollingCountCollapsedRequests", new Func0() { @Override public Long call() { - return commandMetrics.getRollingCount(HystrixRollingNumberEvent.COLLAPSED); + return commandMetrics.getRollingCount(HystrixEventType.COLLAPSED); } }); safelyWriteNumberField(json, "rollingCountEmit", new Func0() { @Override public Long call() { - return commandMetrics.getRollingCount(HystrixRollingNumberEvent.EMIT); + return commandMetrics.getRollingCount(HystrixEventType.EMIT); } }); safelyWriteNumberField(json, "rollingCountExceptionsThrown", new Func0() { @Override public Long call() { - return commandMetrics.getRollingCount(HystrixRollingNumberEvent.EXCEPTION_THROWN); + return commandMetrics.getRollingCount(HystrixEventType.EXCEPTION_THROWN); } }); safelyWriteNumberField(json, "rollingCountFailure", new Func0() { @Override public Long call() { - return commandMetrics.getRollingCount(HystrixRollingNumberEvent.FAILURE); + return commandMetrics.getRollingCount(HystrixEventType.FAILURE); } }); safelyWriteNumberField(json, "rollingCountFallbackEmit", new Func0() { @Override public Long call() { - return commandMetrics.getRollingCount(HystrixRollingNumberEvent.FALLBACK_EMIT); + return commandMetrics.getRollingCount(HystrixEventType.FALLBACK_EMIT); } }); safelyWriteNumberField(json, "rollingCountFallbackFailure", new Func0() { @Override public Long call() { - return commandMetrics.getRollingCount(HystrixRollingNumberEvent.FALLBACK_FAILURE); + return commandMetrics.getRollingCount(HystrixEventType.FALLBACK_FAILURE); } }); safelyWriteNumberField(json, "rollingCountFallbackMissing", new Func0() { @Override public Long call() { - return commandMetrics.getRollingCount(HystrixRollingNumberEvent.FALLBACK_MISSING); + return commandMetrics.getRollingCount(HystrixEventType.FALLBACK_MISSING); } }); safelyWriteNumberField(json, "rollingCountFallbackRejection", new Func0() { @Override public Long call() { - return commandMetrics.getRollingCount(HystrixRollingNumberEvent.FALLBACK_REJECTION); + return commandMetrics.getRollingCount(HystrixEventType.FALLBACK_REJECTION); } }); safelyWriteNumberField(json, "rollingCountFallbackSuccess", new Func0() { @Override public Long call() { - return commandMetrics.getRollingCount(HystrixRollingNumberEvent.FALLBACK_SUCCESS); + return commandMetrics.getRollingCount(HystrixEventType.FALLBACK_SUCCESS); } }); safelyWriteNumberField(json, "rollingCountResponsesFromCache", new Func0() { @Override public Long call() { - return commandMetrics.getRollingCount(HystrixRollingNumberEvent.RESPONSE_FROM_CACHE); + return commandMetrics.getRollingCount(HystrixEventType.RESPONSE_FROM_CACHE); } }); safelyWriteNumberField(json, "rollingCountSemaphoreRejected", new Func0() { @Override public Long call() { - return commandMetrics.getRollingCount(HystrixRollingNumberEvent.SEMAPHORE_REJECTED); + return commandMetrics.getRollingCount(HystrixEventType.SEMAPHORE_REJECTED); } }); safelyWriteNumberField(json, "rollingCountShortCircuited", new Func0() { @Override public Long call() { - return commandMetrics.getRollingCount(HystrixRollingNumberEvent.SHORT_CIRCUITED); + return commandMetrics.getRollingCount(HystrixEventType.SHORT_CIRCUITED); } }); safelyWriteNumberField(json, "rollingCountSuccess", new Func0() { @Override public Long call() { - return commandMetrics.getRollingCount(HystrixRollingNumberEvent.SUCCESS); + return commandMetrics.getRollingCount(HystrixEventType.SUCCESS); } }); safelyWriteNumberField(json, "rollingCountThreadPoolRejected", new Func0() { @Override public Long call() { - return commandMetrics.getRollingCount(HystrixRollingNumberEvent.THREAD_POOL_REJECTED); + return commandMetrics.getRollingCount(HystrixEventType.THREAD_POOL_REJECTED); } }); safelyWriteNumberField(json, "rollingCountTimeout", new Func0() { @Override public Long call() { - return commandMetrics.getRollingCount(HystrixRollingNumberEvent.TIMEOUT); + return commandMetrics.getRollingCount(HystrixEventType.TIMEOUT); } }); @@ -249,18 +335,14 @@ public Long call() { json.writeBooleanField("propertyValue_requestLogEnabled", commandProperties.requestLogEnabled().get()); json.writeNumberField("reportingHosts", 1); // this will get summed across all instances in a cluster + json.writeStringField("threadPool", commandMetrics.getThreadPoolKey().name()); json.writeEndObject(); - json.close(); - - return jsonString.getBuffer().toString(); } - static String toJson(final HystrixThreadPoolMetrics threadPoolMetrics) throws IOException { + private static void writeThreadPoolMetrics(final HystrixThreadPoolMetrics threadPoolMetrics, JsonGenerator json) throws IOException { HystrixThreadPoolKey key = threadPoolMetrics.getThreadPoolKey(); - StringWriter jsonString = new StringWriter(); - JsonGenerator json = jsonFactory.createJsonGenerator(jsonString); json.writeStartObject(); json.writeStringField("type", "HystrixThreadPool"); @@ -275,29 +357,31 @@ static String toJson(final HystrixThreadPoolMetrics threadPoolMetrics) throws IO json.writeNumberField("currentPoolSize", threadPoolMetrics.getCurrentPoolSize().intValue()); json.writeNumberField("currentQueueSize", threadPoolMetrics.getCurrentQueueSize().intValue()); json.writeNumberField("currentTaskCount", threadPoolMetrics.getCurrentTaskCount().longValue()); - json.writeNumberField("rollingCountThreadsExecuted", threadPoolMetrics.getRollingCountThreadsExecuted()); + safelyWriteNumberField(json, "rollingCountThreadsExecuted", new Func0() { + @Override + public Long call() { + return threadPoolMetrics.getRollingCount(HystrixEventType.ThreadPool.EXECUTED); + } + }); json.writeNumberField("rollingMaxActiveThreads", threadPoolMetrics.getRollingMaxActiveThreads()); - safelyWriteNumberField(json, "rollingCountCommandsRejected", new Func0() { + safelyWriteNumberField(json, "rollingCountCommandRejections", new Func0() { @Override public Long call() { - return threadPoolMetrics.getRollingCount(HystrixRollingNumberEvent.THREAD_POOL_REJECTED); + return threadPoolMetrics.getRollingCount(HystrixEventType.ThreadPool.REJECTED); } }); + json.writeNumberField("propertyValue_queueSizeRejectionThreshold", threadPoolMetrics.getProperties().queueSizeRejectionThreshold().get()); json.writeNumberField("propertyValue_metricsRollingStatisticalWindowInMilliseconds", threadPoolMetrics.getProperties().metricsRollingStatisticalWindowInMilliseconds().get()); json.writeNumberField("reportingHosts", 1); // this will get summed across all instances in a cluster json.writeEndObject(); - json.close(); - - return jsonString.getBuffer().toString(); } - static String toJson(final HystrixCollapserMetrics collapserMetrics) throws IOException { + private static void writeCollapserMetrics(final HystrixCollapserMetrics collapserMetrics, JsonGenerator json) throws IOException { HystrixCollapserKey key = collapserMetrics.getCollapserKey(); - StringWriter jsonString = new StringWriter(); - JsonGenerator json = jsonFactory.createJsonGenerator(jsonString); + json.writeStartObject(); json.writeStringField("type", "HystrixCollapser"); @@ -307,19 +391,19 @@ static String toJson(final HystrixCollapserMetrics collapserMetrics) throws IOEx safelyWriteNumberField(json, "rollingCountRequestsBatched", new Func0() { @Override public Long call() { - return collapserMetrics.getRollingCount(HystrixRollingNumberEvent.COLLAPSER_REQUEST_BATCHED); + return collapserMetrics.getRollingCount(HystrixEventType.Collapser.ADDED_TO_BATCH); } }); safelyWriteNumberField(json, "rollingCountBatches", new Func0() { @Override public Long call() { - return collapserMetrics.getRollingCount(HystrixRollingNumberEvent.COLLAPSER_BATCH); + return collapserMetrics.getRollingCount(HystrixEventType.Collapser.BATCH_EXECUTED); } }); safelyWriteNumberField(json, "rollingCountResponsesFromCache", new Func0() { @Override public Long call() { - return collapserMetrics.getRollingCount(HystrixRollingNumberEvent.RESPONSE_FROM_CACHE); + return collapserMetrics.getRollingCount(HystrixEventType.Collapser.RESPONSE_FROM_CACHE); } }); @@ -357,8 +441,14 @@ public Long call() { json.writeNumberField("reportingHosts", 1); // this will get summed across all instances in a cluster json.writeEndObject(); - json.close(); + } - return jsonString.getBuffer().toString(); + protected static void safelyWriteNumberField(JsonGenerator json, String name, Func0 metricGenerator) throws IOException { + try { + json.writeNumberField(name, metricGenerator.call()); + } catch (NoSuchFieldError error) { + logger.error("While publishing Hystrix metrics stream, error looking up eventType for : " + name + ". Please check that all Hystrix versions are the same!"); + json.writeNumberField(name, 0L); + } } } diff --git a/hystrix-contrib/hystrix-reactivesocket-event-stream/src/main/java/com/netflix/hystrix/contrib/reactivesocket/serialize/SerialHystrixMetric.java b/hystrix-data-stream/src/main/java/com/netflix/hystrix/serial/SerialHystrixMetric.java similarity index 77% rename from hystrix-contrib/hystrix-reactivesocket-event-stream/src/main/java/com/netflix/hystrix/contrib/reactivesocket/serialize/SerialHystrixMetric.java rename to hystrix-data-stream/src/main/java/com/netflix/hystrix/serial/SerialHystrixMetric.java index 233341ecd..e2f4ef65a 100644 --- a/hystrix-contrib/hystrix-reactivesocket-event-stream/src/main/java/com/netflix/hystrix/contrib/reactivesocket/serialize/SerialHystrixMetric.java +++ b/hystrix-data-stream/src/main/java/com/netflix/hystrix/serial/SerialHystrixMetric.java @@ -13,14 +13,13 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package com.netflix.hystrix.contrib.reactivesocket.serialize; +package com.netflix.hystrix.serial; +import com.fasterxml.jackson.core.JsonFactory; import com.fasterxml.jackson.databind.JsonNode; import com.fasterxml.jackson.databind.ObjectMapper; import com.fasterxml.jackson.dataformat.cbor.CBORFactory; import com.fasterxml.jackson.dataformat.cbor.CBORParser; -import io.reactivesocket.Frame; -import io.reactivesocket.Payload; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -28,24 +27,11 @@ import java.nio.ByteBuffer; public class SerialHystrixMetric { + protected final static JsonFactory jsonFactory = new JsonFactory(); protected final static CBORFactory cborFactory = new CBORFactory(); protected final static ObjectMapper mapper = new ObjectMapper(); protected final static Logger logger = LoggerFactory.getLogger(SerialHystrixMetric.class); - public static Payload toPayload(byte[] byteArray) { - return new Payload() { - @Override - public ByteBuffer getData() { - return ByteBuffer.wrap(byteArray); - } - - @Override - public ByteBuffer getMetadata() { - return Frame.NULL_BYTEBUFFER; - } - }; - } - public static String fromByteBufferToString(ByteBuffer bb) { byte[] byteArray = new byte[bb.remaining()]; bb.get(byteArray); diff --git a/hystrix-contrib/hystrix-reactivesocket-event-stream/src/main/java/com/netflix/hystrix/contrib/reactivesocket/serialize/SerialHystrixRequestEvents.java b/hystrix-data-stream/src/main/java/com/netflix/hystrix/serial/SerialHystrixRequestEvents.java similarity index 80% rename from hystrix-contrib/hystrix-reactivesocket-event-stream/src/main/java/com/netflix/hystrix/contrib/reactivesocket/serialize/SerialHystrixRequestEvents.java rename to hystrix-data-stream/src/main/java/com/netflix/hystrix/serial/SerialHystrixRequestEvents.java index 3f177d776..2f442e15c 100644 --- a/hystrix-contrib/hystrix-reactivesocket-event-stream/src/main/java/com/netflix/hystrix/contrib/reactivesocket/serialize/SerialHystrixRequestEvents.java +++ b/hystrix-data-stream/src/main/java/com/netflix/hystrix/serial/SerialHystrixRequestEvents.java @@ -13,16 +13,16 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package com.netflix.hystrix.contrib.reactivesocket.serialize; +package com.netflix.hystrix.serial; import com.fasterxml.jackson.core.JsonGenerator; import com.netflix.hystrix.ExecutionResult; import com.netflix.hystrix.HystrixEventType; import com.netflix.hystrix.metric.HystrixRequestEvents; -import org.agrona.LangUtil; import java.io.ByteArrayOutputStream; import java.io.IOException; +import java.io.StringWriter; import java.util.List; import java.util.Map; @@ -33,8 +33,34 @@ public static byte[] toBytes(HystrixRequestEvents requestEvents) { try { ByteArrayOutputStream bos = new ByteArrayOutputStream(); - JsonGenerator json = cborFactory.createGenerator(bos); + JsonGenerator cbor = cborFactory.createGenerator(bos); + serializeRequestEvents(requestEvents, cbor); + + retVal = bos.toByteArray(); + } catch (Exception e) { + throw new RuntimeException(e); + } + + return retVal; + } + + public static String toJsonString(HystrixRequestEvents requestEvents) { + StringWriter jsonString = new StringWriter(); + + try { + JsonGenerator json = jsonFactory.createGenerator(jsonString); + + serializeRequestEvents(requestEvents, json); + } catch (Exception e) { + throw new RuntimeException(e); + } + + return jsonString.getBuffer().toString(); + } + + private static void serializeRequestEvents(HystrixRequestEvents requestEvents, JsonGenerator json) { + try { json.writeStartArray(); for (Map.Entry> entry: requestEvents.getExecutionsMappedToLatencies().entrySet()) { @@ -43,13 +69,9 @@ public static byte[] toBytes(HystrixRequestEvents requestEvents) { json.writeEndArray(); json.close(); - - retVal = bos.toByteArray(); } catch (Exception e) { - LangUtil.rethrowUnchecked(e); + throw new RuntimeException(e); } - - return retVal; } private static void convertExecutionToJson(JsonGenerator json, HystrixRequestEvents.ExecutionSignature executionSignature, List latencies) throws IOException { diff --git a/hystrix-contrib/hystrix-reactivesocket-event-stream/src/main/java/com/netflix/hystrix/contrib/reactivesocket/serialize/SerialHystrixUtilization.java b/hystrix-data-stream/src/main/java/com/netflix/hystrix/serial/SerialHystrixUtilization.java similarity index 84% rename from hystrix-contrib/hystrix-reactivesocket-event-stream/src/main/java/com/netflix/hystrix/contrib/reactivesocket/serialize/SerialHystrixUtilization.java rename to hystrix-data-stream/src/main/java/com/netflix/hystrix/serial/SerialHystrixUtilization.java index 1018c2fe0..43239faa4 100644 --- a/hystrix-contrib/hystrix-reactivesocket-event-stream/src/main/java/com/netflix/hystrix/contrib/reactivesocket/serialize/SerialHystrixUtilization.java +++ b/hystrix-data-stream/src/main/java/com/netflix/hystrix/serial/SerialHystrixUtilization.java @@ -13,22 +13,23 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package com.netflix.hystrix.contrib.reactivesocket.serialize; +package com.netflix.hystrix.serial; import com.fasterxml.jackson.core.JsonGenerator; import com.fasterxml.jackson.databind.JsonNode; import com.fasterxml.jackson.dataformat.cbor.CBORParser; import com.netflix.hystrix.HystrixCommandKey; import com.netflix.hystrix.HystrixThreadPoolKey; +import com.netflix.hystrix.config.HystrixConfiguration; import com.netflix.hystrix.metric.sample.HystrixCommandUtilization; import com.netflix.hystrix.metric.sample.HystrixThreadPoolUtilization; import com.netflix.hystrix.metric.sample.HystrixUtilization; -import org.agrona.LangUtil; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.io.ByteArrayOutputStream; import java.io.IOException; +import java.io.StringWriter; import java.nio.ByteBuffer; import java.util.HashMap; import java.util.Iterator; @@ -43,8 +44,34 @@ public static byte[] toBytes(HystrixUtilization utilization) { try { ByteArrayOutputStream bos = new ByteArrayOutputStream(); - JsonGenerator json = cborFactory.createGenerator(bos); + JsonGenerator cbor = cborFactory.createGenerator(bos); + serializeUtilization(utilization, cbor); + + retVal = bos.toByteArray(); + } catch (Exception e) { + throw new RuntimeException(e); + } + + return retVal; + } + + public static String toJsonString(HystrixUtilization utilization) { + StringWriter jsonString = new StringWriter(); + + try { + JsonGenerator json = jsonFactory.createGenerator(jsonString); + + serializeUtilization(utilization, json); + } catch (Exception e) { + throw new RuntimeException(e); + } + + return jsonString.getBuffer().toString(); + } + + private static void serializeUtilization(HystrixUtilization utilization, JsonGenerator json) { + try { json.writeStartObject(); json.writeStringField("type", "HystrixUtilization"); json.writeObjectFieldStart("commands"); @@ -65,21 +92,17 @@ public static byte[] toBytes(HystrixUtilization utilization) { json.writeEndObject(); json.writeEndObject(); json.close(); - - retVal = bos.toByteArray(); } catch (Exception e) { - LangUtil.rethrowUnchecked(e); + throw new RuntimeException(e); } - - return retVal; } public static HystrixUtilization fromByteBuffer(ByteBuffer bb) { byte[] byteArray = new byte[bb.remaining()]; bb.get(byteArray); - Map commandUtilizationMap = new HashMap<>(); - Map threadPoolUtilizationMap = new HashMap<>(); + Map commandUtilizationMap = new HashMap(); + Map threadPoolUtilizationMap = new HashMap(); try { CBORParser parser = cborFactory.createParser(byteArray); diff --git a/hystrix-core/src/test/java/com/netflix/hystrix/config/HystrixConfigurationStreamTest.java b/hystrix-data-stream/src/test/java/com/netflix/hystrix/config/HystrixConfigurationStreamTest.java similarity index 100% rename from hystrix-core/src/test/java/com/netflix/hystrix/config/HystrixConfigurationStreamTest.java rename to hystrix-data-stream/src/test/java/com/netflix/hystrix/config/HystrixConfigurationStreamTest.java diff --git a/hystrix-core/src/test/java/com/netflix/hystrix/metric/consumer/HystrixDashboardStreamTest.java b/hystrix-data-stream/src/test/java/com/netflix/hystrix/metric/consumer/HystrixDashboardStreamTest.java similarity index 100% rename from hystrix-core/src/test/java/com/netflix/hystrix/metric/consumer/HystrixDashboardStreamTest.java rename to hystrix-data-stream/src/test/java/com/netflix/hystrix/metric/consumer/HystrixDashboardStreamTest.java diff --git a/hystrix-core/src/test/java/com/netflix/hystrix/metric/sample/HystrixUtilizationStreamTest.java b/hystrix-data-stream/src/test/java/com/netflix/hystrix/metric/sample/HystrixUtilizationStreamTest.java similarity index 100% rename from hystrix-core/src/test/java/com/netflix/hystrix/metric/sample/HystrixUtilizationStreamTest.java rename to hystrix-data-stream/src/test/java/com/netflix/hystrix/metric/sample/HystrixUtilizationStreamTest.java diff --git a/hystrix-contrib/hystrix-metrics-event-stream/src/test/java/com/netflix/hystrix/contrib/requests/stream/HystrixRequestEventsJsonStreamTest.java b/hystrix-data-stream/src/test/java/com/netflix/hystrix/serial/SerialHystrixRequestEventsTest.java similarity index 91% rename from hystrix-contrib/hystrix-metrics-event-stream/src/test/java/com/netflix/hystrix/contrib/requests/stream/HystrixRequestEventsJsonStreamTest.java rename to hystrix-data-stream/src/test/java/com/netflix/hystrix/serial/SerialHystrixRequestEventsTest.java index 7d6774015..86231e35f 100644 --- a/hystrix-contrib/hystrix-metrics-event-stream/src/test/java/com/netflix/hystrix/contrib/requests/stream/HystrixRequestEventsJsonStreamTest.java +++ b/hystrix-data-stream/src/test/java/com/netflix/hystrix/serial/SerialHystrixRequestEventsTest.java @@ -13,7 +13,7 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package com.netflix.hystrix.contrib.requests.stream; +package com.netflix.hystrix.serial; import com.netflix.hystrix.ExecutionResult; import com.netflix.hystrix.HystrixCollapserKey; @@ -34,7 +34,7 @@ import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertTrue; -public class HystrixRequestEventsJsonStreamTest { +public class SerialHystrixRequestEventsTest { private static final HystrixCommandGroupKey groupKey = HystrixCommandGroupKey.Factory.asKey("GROUP"); private static final HystrixThreadPoolKey threadPoolKey = HystrixThreadPoolKey.Factory.asKey("ThreadPool"); @@ -45,7 +45,7 @@ public class HystrixRequestEventsJsonStreamTest { @Test public void testEmpty() throws IOException { HystrixRequestEvents request = new HystrixRequestEvents(new ArrayList>()); - String actual = HystrixRequestEventsJsonStream.convertRequestToJson(request); + String actual = SerialHystrixRequestEvents.toJsonString(request); assertEquals("[]", actual); } @@ -54,7 +54,7 @@ public void testSingleSuccess() throws IOException { List> executions = new ArrayList>(); executions.add(new SimpleExecution(fooKey, 100, HystrixEventType.SUCCESS)); HystrixRequestEvents request = new HystrixRequestEvents(executions); - String actual = HystrixRequestEventsJsonStream.convertRequestToJson(request); + String actual = SerialHystrixRequestEvents.toJsonString(request); assertEquals("[{\"name\":\"Foo\",\"events\":[\"SUCCESS\"],\"latencies\":[100]}]", actual); } @@ -63,7 +63,7 @@ public void testSingleFailureFallbackMissing() throws IOException { List> executions = new ArrayList>(); executions.add(new SimpleExecution(fooKey, 101, HystrixEventType.FAILURE, HystrixEventType.FALLBACK_MISSING)); HystrixRequestEvents request = new HystrixRequestEvents(executions); - String actual = HystrixRequestEventsJsonStream.convertRequestToJson(request); + String actual = SerialHystrixRequestEvents.toJsonString(request); assertEquals("[{\"name\":\"Foo\",\"events\":[\"FAILURE\",\"FALLBACK_MISSING\"],\"latencies\":[101]}]", actual); } @@ -72,7 +72,7 @@ public void testSingleFailureFallbackSuccess() throws IOException { List> executions = new ArrayList>(); executions.add(new SimpleExecution(fooKey, 102, HystrixEventType.FAILURE, HystrixEventType.FALLBACK_SUCCESS)); HystrixRequestEvents request = new HystrixRequestEvents(executions); - String actual = HystrixRequestEventsJsonStream.convertRequestToJson(request); + String actual = SerialHystrixRequestEvents.toJsonString(request); assertEquals("[{\"name\":\"Foo\",\"events\":[\"FAILURE\",\"FALLBACK_SUCCESS\"],\"latencies\":[102]}]", actual); } @@ -81,7 +81,7 @@ public void testSingleFailureFallbackRejected() throws IOException { List> executions = new ArrayList>(); executions.add(new SimpleExecution(fooKey, 103, HystrixEventType.FAILURE, HystrixEventType.FALLBACK_REJECTION)); HystrixRequestEvents request = new HystrixRequestEvents(executions); - String actual = HystrixRequestEventsJsonStream.convertRequestToJson(request); + String actual = SerialHystrixRequestEvents.toJsonString(request); assertEquals("[{\"name\":\"Foo\",\"events\":[\"FAILURE\",\"FALLBACK_REJECTION\"],\"latencies\":[103]}]", actual); } @@ -90,7 +90,7 @@ public void testSingleFailureFallbackFailure() throws IOException { List> executions = new ArrayList>(); executions.add(new SimpleExecution(fooKey, 104, HystrixEventType.FAILURE, HystrixEventType.FALLBACK_FAILURE)); HystrixRequestEvents request = new HystrixRequestEvents(executions); - String actual = HystrixRequestEventsJsonStream.convertRequestToJson(request); + String actual = SerialHystrixRequestEvents.toJsonString(request); assertEquals("[{\"name\":\"Foo\",\"events\":[\"FAILURE\",\"FALLBACK_FAILURE\"],\"latencies\":[104]}]", actual); } @@ -99,7 +99,7 @@ public void testSingleTimeoutFallbackSuccess() throws IOException { List> executions = new ArrayList>(); executions.add(new SimpleExecution(fooKey, 105, HystrixEventType.TIMEOUT, HystrixEventType.FALLBACK_SUCCESS)); HystrixRequestEvents request = new HystrixRequestEvents(executions); - String actual = HystrixRequestEventsJsonStream.convertRequestToJson(request); + String actual = SerialHystrixRequestEvents.toJsonString(request); assertEquals("[{\"name\":\"Foo\",\"events\":[\"TIMEOUT\",\"FALLBACK_SUCCESS\"],\"latencies\":[105]}]", actual); } @@ -108,7 +108,7 @@ public void testSingleSemaphoreRejectedFallbackSuccess() throws IOException { List> executions = new ArrayList>(); executions.add(new SimpleExecution(fooKey, 1, HystrixEventType.SEMAPHORE_REJECTED, HystrixEventType.FALLBACK_SUCCESS)); HystrixRequestEvents request = new HystrixRequestEvents(executions); - String actual = HystrixRequestEventsJsonStream.convertRequestToJson(request); + String actual = SerialHystrixRequestEvents.toJsonString(request); assertEquals("[{\"name\":\"Foo\",\"events\":[\"SEMAPHORE_REJECTED\",\"FALLBACK_SUCCESS\"],\"latencies\":[1]}]", actual); } @@ -117,7 +117,7 @@ public void testSingleThreadPoolRejectedFallbackSuccess() throws IOException { List> executions = new ArrayList>(); executions.add(new SimpleExecution(fooKey, 1, HystrixEventType.THREAD_POOL_REJECTED, HystrixEventType.FALLBACK_SUCCESS)); HystrixRequestEvents request = new HystrixRequestEvents(executions); - String actual = HystrixRequestEventsJsonStream.convertRequestToJson(request); + String actual = SerialHystrixRequestEvents.toJsonString(request); assertEquals("[{\"name\":\"Foo\",\"events\":[\"THREAD_POOL_REJECTED\",\"FALLBACK_SUCCESS\"],\"latencies\":[1]}]", actual); } @@ -126,7 +126,7 @@ public void testSingleShortCircuitedFallbackSuccess() throws IOException { List> executions = new ArrayList>(); executions.add(new SimpleExecution(fooKey, 1, HystrixEventType.SHORT_CIRCUITED, HystrixEventType.FALLBACK_SUCCESS)); HystrixRequestEvents request = new HystrixRequestEvents(executions); - String actual = HystrixRequestEventsJsonStream.convertRequestToJson(request); + String actual = SerialHystrixRequestEvents.toJsonString(request); assertEquals("[{\"name\":\"Foo\",\"events\":[\"SHORT_CIRCUITED\",\"FALLBACK_SUCCESS\"],\"latencies\":[1]}]", actual); } @@ -135,7 +135,7 @@ public void testSingleBadRequest() throws IOException { List> executions = new ArrayList>(); executions.add(new SimpleExecution(fooKey, 50, HystrixEventType.BAD_REQUEST)); HystrixRequestEvents request = new HystrixRequestEvents(executions); - String actual = HystrixRequestEventsJsonStream.convertRequestToJson(request); + String actual = SerialHystrixRequestEvents.toJsonString(request); assertEquals("[{\"name\":\"Foo\",\"events\":[\"BAD_REQUEST\"],\"latencies\":[50]}]", actual); } @@ -147,7 +147,7 @@ public void testTwoSuccessesSameKey() throws IOException { executions.add(foo1); executions.add(foo2); HystrixRequestEvents request = new HystrixRequestEvents(executions); - String actual = HystrixRequestEventsJsonStream.convertRequestToJson(request); + String actual = SerialHystrixRequestEvents.toJsonString(request); assertEquals("[{\"name\":\"Foo\",\"events\":[\"SUCCESS\"],\"latencies\":[23,34]}]", actual); } @@ -159,7 +159,7 @@ public void testTwoSuccessesDifferentKey() throws IOException { executions.add(foo1); executions.add(bar1); HystrixRequestEvents request = new HystrixRequestEvents(executions); - String actual = HystrixRequestEventsJsonStream.convertRequestToJson(request); + String actual = SerialHystrixRequestEvents.toJsonString(request); assertTrue(actual.equals("[{\"name\":\"Foo\",\"events\":[\"SUCCESS\"],\"latencies\":[23]},{\"name\":\"Bar\",\"events\":[\"SUCCESS\"],\"latencies\":[34]}]") || actual.equals("[{\"name\":\"Bar\",\"events\":[\"SUCCESS\"],\"latencies\":[34]},{\"name\":\"Foo\",\"events\":[\"SUCCESS\"],\"latencies\":[23]}]")); } @@ -172,7 +172,7 @@ public void testTwoFailuresSameKey() throws IOException { executions.add(foo1); executions.add(foo2); HystrixRequestEvents request = new HystrixRequestEvents(executions); - String actual = HystrixRequestEventsJsonStream.convertRequestToJson(request); + String actual = SerialHystrixRequestEvents.toJsonString(request); assertEquals("[{\"name\":\"Foo\",\"events\":[\"FAILURE\",\"FALLBACK_SUCCESS\"],\"latencies\":[56,67]}]", actual); } @@ -186,7 +186,7 @@ public void testTwoSuccessesOneFailureSameKey() throws IOException { executions.add(foo2); executions.add(foo3); HystrixRequestEvents request = new HystrixRequestEvents(executions); - String actual = HystrixRequestEventsJsonStream.convertRequestToJson(request); + String actual = SerialHystrixRequestEvents.toJsonString(request); assertTrue(actual.equals("[{\"name\":\"Foo\",\"events\":[\"SUCCESS\"],\"latencies\":[10,11]},{\"name\":\"Foo\",\"events\":[\"FAILURE\",\"FALLBACK_SUCCESS\"],\"latencies\":[67]}]") || actual.equals("[{\"name\":\"Foo\",\"events\":[\"FAILURE\",\"FALLBACK_SUCCESS\"],\"latencies\":[67]},{\"name\":\"Foo\",\"events\":[\"SUCCESS\"],\"latencies\":[10,11]}]")); } @@ -199,7 +199,7 @@ public void testSingleResponseFromCache() throws IOException { executions.add(foo1); executions.add(cachedFoo1); HystrixRequestEvents request = new HystrixRequestEvents(executions); - String actual = HystrixRequestEventsJsonStream.convertRequestToJson(request); + String actual = SerialHystrixRequestEvents.toJsonString(request); assertEquals("[{\"name\":\"Foo\",\"events\":[\"SUCCESS\"],\"latencies\":[23],\"cached\":1}]", actual); } @@ -213,7 +213,7 @@ public void testMultipleResponsesFromCache() throws IOException { executions.add(cachedFoo1); executions.add(anotherCachedFoo1); HystrixRequestEvents request = new HystrixRequestEvents(executions); - String actual = HystrixRequestEventsJsonStream.convertRequestToJson(request); + String actual = SerialHystrixRequestEvents.toJsonString(request); assertEquals("[{\"name\":\"Foo\",\"events\":[\"SUCCESS\"],\"latencies\":[23],\"cached\":2}]", actual); } @@ -229,7 +229,7 @@ public void testMultipleCacheKeys() throws IOException { executions.add(foo2); executions.add(cachedFoo2); HystrixRequestEvents request = new HystrixRequestEvents(executions); - String actual = HystrixRequestEventsJsonStream.convertRequestToJson(request); + String actual = SerialHystrixRequestEvents.toJsonString(request); assertTrue(actual.equals("[{\"name\":\"Foo\",\"events\":[\"SUCCESS\"],\"latencies\":[67],\"cached\":1},{\"name\":\"Foo\",\"events\":[\"SUCCESS\"],\"latencies\":[23],\"cached\":1}]") || actual.equals("[{\"name\":\"Foo\",\"events\":[\"SUCCESS\"],\"latencies\":[23],\"cached\":1},{\"name\":\"Foo\",\"events\":[\"SUCCESS\"],\"latencies\":[67],\"cached\":1}]")); } @@ -239,7 +239,7 @@ public void testSingleSuccessMultipleEmits() throws IOException { List> executions = new ArrayList>(); executions.add(new SimpleExecution(fooKey, 100, HystrixEventType.EMIT, HystrixEventType.EMIT, HystrixEventType.EMIT, HystrixEventType.SUCCESS)); HystrixRequestEvents request = new HystrixRequestEvents(executions); - String actual = HystrixRequestEventsJsonStream.convertRequestToJson(request); + String actual = SerialHystrixRequestEvents.toJsonString(request); assertEquals("[{\"name\":\"Foo\",\"events\":[{\"name\":\"EMIT\",\"count\":3},\"SUCCESS\"],\"latencies\":[100]}]", actual); } @@ -248,7 +248,7 @@ public void testSingleSuccessMultipleEmitsAndFallbackEmits() throws IOException List> executions = new ArrayList>(); executions.add(new SimpleExecution(fooKey, 100, HystrixEventType.EMIT, HystrixEventType.EMIT, HystrixEventType.EMIT, HystrixEventType.FAILURE, HystrixEventType.FALLBACK_EMIT, HystrixEventType.FALLBACK_EMIT, HystrixEventType.FALLBACK_SUCCESS)); HystrixRequestEvents request = new HystrixRequestEvents(executions); - String actual = HystrixRequestEventsJsonStream.convertRequestToJson(request); + String actual = SerialHystrixRequestEvents.toJsonString(request); assertEquals("[{\"name\":\"Foo\",\"events\":[{\"name\":\"EMIT\",\"count\":3},\"FAILURE\",{\"name\":\"FALLBACK_EMIT\",\"count\":2},\"FALLBACK_SUCCESS\"],\"latencies\":[100]}]", actual); } @@ -257,7 +257,7 @@ public void testCollapsedBatchOfOne() throws IOException { List> executions = new ArrayList>(); executions.add(new SimpleExecution(fooKey, 53, collapserKey, 1, HystrixEventType.SUCCESS)); HystrixRequestEvents request = new HystrixRequestEvents(executions); - String actual = HystrixRequestEventsJsonStream.convertRequestToJson(request); + String actual = SerialHystrixRequestEvents.toJsonString(request); assertEquals("[{\"name\":\"Foo\",\"events\":[\"SUCCESS\"],\"latencies\":[53],\"collapsed\":{\"name\":\"FooCollapser\",\"count\":1}}]", actual); } @@ -266,7 +266,7 @@ public void testCollapsedBatchOfSix() throws IOException { List> executions = new ArrayList>(); executions.add(new SimpleExecution(fooKey, 53, collapserKey, 6, HystrixEventType.SUCCESS)); HystrixRequestEvents request = new HystrixRequestEvents(executions); - String actual = HystrixRequestEventsJsonStream.convertRequestToJson(request); + String actual = SerialHystrixRequestEvents.toJsonString(request); assertEquals("[{\"name\":\"Foo\",\"events\":[\"SUCCESS\"],\"latencies\":[53],\"collapsed\":{\"name\":\"FooCollapser\",\"count\":6}}]", actual); } diff --git a/hystrix-examples-reactivesocket/client/src/main/java/com/netflix/hystrix/examples/reactivesocket/HystrixMetricsReactiveSocketClientRunner.java b/hystrix-examples-reactivesocket/client/src/main/java/com/netflix/hystrix/examples/reactivesocket/HystrixMetricsReactiveSocketClientRunner.java index 70996dcae..7b24a1bff 100644 --- a/hystrix-examples-reactivesocket/client/src/main/java/com/netflix/hystrix/examples/reactivesocket/HystrixMetricsReactiveSocketClientRunner.java +++ b/hystrix-examples-reactivesocket/client/src/main/java/com/netflix/hystrix/examples/reactivesocket/HystrixMetricsReactiveSocketClientRunner.java @@ -22,11 +22,11 @@ import com.netflix.hystrix.config.HystrixThreadPoolConfiguration; import com.netflix.hystrix.contrib.reactivesocket.EventStreamEnum; import com.netflix.hystrix.contrib.reactivesocket.client.HystrixMetricsReactiveSocketClient; -import com.netflix.hystrix.contrib.reactivesocket.serialize.SerialHystrixConfiguration; -import com.netflix.hystrix.contrib.reactivesocket.serialize.SerialHystrixMetric; -import com.netflix.hystrix.contrib.reactivesocket.serialize.SerialHystrixUtilization; import com.netflix.hystrix.metric.sample.HystrixCommandUtilization; import com.netflix.hystrix.metric.sample.HystrixUtilization; +import com.netflix.hystrix.serial.SerialHystrixConfiguration; +import com.netflix.hystrix.serial.SerialHystrixMetric; +import com.netflix.hystrix.serial.SerialHystrixUtilization; import io.netty.channel.nio.NioEventLoopGroup; import io.reactivesocket.Payload; import org.reactivestreams.Publisher; diff --git a/settings.gradle b/settings.gradle index a34cb522b..4e0a2758a 100644 --- a/settings.gradle +++ b/settings.gradle @@ -15,7 +15,8 @@ include 'hystrix-core', \ 'hystrix-contrib/hystrix-reactivesocket-event-stream', \ 'hystrix-contrib/hystrix-javanica', \ 'hystrix-contrib/hystrix-junit', \ -'hystrix-dashboard' +'hystrix-dashboard', \ +'hystrix-data-stream' project(':hystrix-examples-reactivesocket/client').name = 'hystrix-examples-reactivesocket-client' project(':hystrix-examples-reactivesocket/server').name = 'hystrix-examples-reactivesocket-server'