Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Introduce intermediate data streams module #1255

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion hystrix-contrib/hystrix-metrics-event-stream/build.gradle
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
dependencies {
compile project(':hystrix-core')
compile 'com.fasterxml.jackson.core:jackson-core:2.5.2'
compile project(':hystrix-data-stream')
provided 'javax.servlet:servlet-api:2.5'
testCompile 'junit:junit-dep:4.10'
testCompile 'org.mockito:mockito-all:1.9.5'
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,10 @@
* the code below may reference a HystrixEventType that does not exist in hystrix-core. If this happens,
* a j.l.NoSuchFieldError occurs. Since this data is not being generated by hystrix-core, it's safe to count it as 0
* and we should log an error to get users to update their dependency set.
*
* @deprecated Prefer {@link com.netflix.hystrix.metric.consumer.HystrixDashboardStream}
*/
@Deprecated //since 1.5.4
public class HystrixMetricsPoller {

static final Logger logger = LoggerFactory.getLogger(HystrixMetricsPoller.class);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,17 +17,12 @@

import com.netflix.config.DynamicIntProperty;
import com.netflix.config.DynamicPropertyFactory;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.netflix.hystrix.contrib.sample.stream.HystrixSampleSseServlet;
import com.netflix.hystrix.metric.consumer.HystrixDashboardStream;
import com.netflix.hystrix.serial.SerialHystrixDashboardData;
import rx.Observable;
import rx.functions.Func1;

import javax.servlet.ServletException;
import javax.servlet.http.HttpServlet;
import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.atomic.AtomicInteger;

/**
Expand All @@ -51,186 +46,45 @@
* </servlet-mapping>
* } </pre>
*/
public class HystrixMetricsStreamServlet extends HttpServlet {
public class HystrixMetricsStreamServlet extends HystrixSampleSseServlet {

private static final long serialVersionUID = -7548505095303313237L;

private static final Logger logger = LoggerFactory.getLogger(HystrixMetricsStreamServlet.class);

/* used to track number of connections and throttle */
private static AtomicInteger concurrentConnections = new AtomicInteger(0);
private static DynamicIntProperty maxConcurrentConnections = DynamicPropertyFactory.getInstance().getIntProperty("hystrix.stream.maxConcurrentConnections", 5);
private static DynamicIntProperty defaultMetricListenerQueueSize = DynamicPropertyFactory.getInstance().getIntProperty("hystrix.stream.defaultMetricListenerQueueSize", 1000);
private static DynamicIntProperty maxConcurrentConnections =
DynamicPropertyFactory.getInstance().getIntProperty("hystrix.config.stream.maxConcurrentConnections", 5);

private static volatile boolean isDestroyed = false;

/**
* WebSphere won't shutdown a servlet until after a 60 second timeout if there is an instance of the servlet executing
* a request. Add this method to enable a hook to notify Hystrix to shutdown. You must invoke this method at
* shutdown, perhaps from some other serverlet's destroy() method.
*/
public static void shutdown() {
isDestroyed = true;
public HystrixMetricsStreamServlet() {
this(HystrixDashboardStream.getInstance().observe(), DEFAULT_PAUSE_POLLER_THREAD_DELAY_IN_MS);
}

@Override
public void init() throws ServletException {
isDestroyed = false;

/* package-private */ HystrixMetricsStreamServlet(Observable<HystrixDashboardStream.DashboardData> sampleStream, int pausePollerThreadDelayInMs) {
super(sampleStream.concatMap(new Func1<HystrixDashboardStream.DashboardData, Observable<String>>() {
@Override
public Observable<String> call(HystrixDashboardStream.DashboardData dashboardData) {
return Observable.from(SerialHystrixDashboardData.toMultipleJsonStrings(dashboardData));
}
}), pausePollerThreadDelayInMs);
}

/**
* Handle incoming GETs
*/

@Override
protected void doGet(HttpServletRequest request, HttpServletResponse response) throws ServletException, IOException {
if (isDestroyed) {
response.sendError(503, "Service has been shut down.");
} else {
handleRequest(request, response);
}
protected int getMaxNumberConcurrentConnectionsAllowed() {
return maxConcurrentConnections.get();
}

/**
* Handle servlet being undeployed by gracefully releasing connections so poller threads stop.
*/

@Override
public void destroy() {
/* set marker so the loops can break out */
isDestroyed = true;
super.destroy();
protected int getNumberCurrentConnections() {
return concurrentConnections.get();
}

/**
* - maintain an open connection with the client
* - on initial connection send latest data of each requested event type
* - subsequently send all changes for each requested event type
*
* @param request
* @param response
* @throws javax.servlet.ServletException
* @throws java.io.IOException
*/
private void handleRequest(HttpServletRequest request, HttpServletResponse response) throws ServletException, IOException {
/* ensure we aren't allowing more connections than we want */
int numberConnections = concurrentConnections.incrementAndGet();
HystrixMetricsPoller poller = null;
try {
if (numberConnections > maxConcurrentConnections.get()) {
response.sendError(503, "MaxConcurrentConnections reached: " + maxConcurrentConnections.get());
} else {

int delay = 500;
try {
String d = request.getParameter("delay");
if (d != null) {
delay = Math.max(Integer.parseInt(d), 1);
}
} catch (Exception e) {
// ignore if it's not a number
}

/* initialize response */
response.setHeader("Content-Type", "text/event-stream;charset=UTF-8");
response.setHeader("Cache-Control", "no-cache, no-store, max-age=0, must-revalidate");
response.setHeader("Pragma", "no-cache");

int queueSize = defaultMetricListenerQueueSize.get();

MetricJsonListener jsonListener = new MetricJsonListener(queueSize);
poller = new HystrixMetricsPoller(jsonListener, delay);
// start polling and it will write directly to the output stream
poller.start();
logger.debug("Starting poller");

// we will use a "single-writer" approach where the Servlet thread does all the writing
// by fetching JSON messages from the MetricJsonListener to write them to the output
try {
while (poller.isRunning() && !isDestroyed) {
List<String> jsonMessages = jsonListener.getJsonMetrics();
if (jsonMessages.isEmpty()) {
// https://github.com/Netflix/Hystrix/issues/85 hystrix.stream holds connection open if no metrics
// we send a ping to test the connection so that we'll get an IOException if the client has disconnected
response.getWriter().println("ping: \n");
} else {
for (String json : jsonMessages) {
response.getWriter().println("data: " + json + "\n");
}
}

/* shortcut breaking out of loop if we have been destroyed */
if(isDestroyed) {
break;
}

// after outputting all the messages we will flush the stream
response.flushBuffer();

// explicitly check for client disconnect - PrintWriter does not throw exceptions
if (response.getWriter().checkError()) {
throw new IOException("io error");
}

// now wait the 'delay' time
Thread.sleep(delay);
}
} catch (InterruptedException e) {
poller.shutdown();
logger.debug("InterruptedException. Will stop polling.");
Thread.currentThread().interrupt();
} catch (IOException e) {
poller.shutdown();
// debug instead of error as we expect to get these whenever a client disconnects or network issue occurs
logger.debug("IOException while trying to write (generally caused by client disconnecting). Will stop polling.", e);
} catch (Exception e) {
poller.shutdown();
logger.error("Failed to write Hystrix metrics. Will stop polling.", e);
}
logger.debug("Stopping Turbine stream to connection");
}
} catch (Exception e) {
logger.error("Error initializing servlet for metrics event stream.", e);
} finally {
concurrentConnections.decrementAndGet();
if (poller != null) {
poller.shutdown();
}
}
@Override
protected int incrementAndGetCurrentConcurrentConnections() {
return concurrentConnections.incrementAndGet();
}

/**
* This will be called from another thread so needs to be thread-safe.
* @ThreadSafe
*/
private static class MetricJsonListener implements HystrixMetricsPoller.MetricsAsJsonPollerListener {

/**
* Setting limit to 1000. In a healthy system there isn't any reason to hit this limit so if we do it will throw an exception which causes the poller to stop.
* <p>
* This is a safety check against a runaway poller causing memory leaks.
*/
private LinkedBlockingQueue<String> jsonMetrics;

public MetricJsonListener(int queueSize) {
jsonMetrics = new LinkedBlockingQueue<String>(queueSize);
}

/**
* Store JSON messages in a queue.
*/
@Override
public void handleJsonMetric(String json) {
jsonMetrics.add(json);
}

/**
* Get all JSON messages in the queue.
*
* @return
*/
public List<String> getJsonMetrics() {
ArrayList<String> metrics = new ArrayList<String>();
jsonMetrics.drainTo(metrics);
return metrics;
}
@Override
protected void decrementCurrentConcurrentConnections() {
concurrentConnections.decrementAndGet();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,13 @@
import java.util.List;
import java.util.Map;

/**
* Stream that converts HystrixRequestEvents into JSON. This isn't needed anymore, as it more straightforward
* to consider serialization completely separately from the domain object stream
*
* @deprecated Instead, prefer mapping your preferred serialization on top of {@link HystrixRequestEventsStream#observe()}.
*/
@Deprecated //since 1.5.4
public class HystrixRequestEventsJsonStream {
private static final JsonFactory jsonFactory = new JsonFactory();

Expand Down
Loading