getSomeCommandInstance() {
- return null;
- }
-}
diff --git a/src/main/java/examples/hystrix/package-info.java b/src/main/java/examples/hystrix/package-info.java
deleted file mode 100644
index 21ac6ab..0000000
--- a/src/main/java/examples/hystrix/package-info.java
+++ /dev/null
@@ -1,20 +0,0 @@
-/*
- * Copyright (c) 2011-2016 The original author or authors
- *
- * All rights reserved. This program and the accompanying materials
- * are made available under the terms of the Eclipse Public License v1.0
- * and Apache License v2.0 which accompanies this distribution.
- *
- * The Eclipse Public License is available at
- * http://www.eclipse.org/legal/epl-v10.html
- *
- * The Apache License v2.0 is available at
- * http://www.opensource.org/licenses/apache2.0.php
- *
- * You may elect to redistribute this code under either of these licenses.
- */
-
-@Source(translate = false)
-package examples.hystrix;
-
-import io.vertx.docgen.Source;
\ No newline at end of file
diff --git a/src/main/java/io/vertx/circuitbreaker/CircuitBreaker.java b/src/main/java/io/vertx/circuitbreaker/CircuitBreaker.java
index 2c6a113..e3d1d5c 100644
--- a/src/main/java/io/vertx/circuitbreaker/CircuitBreaker.java
+++ b/src/main/java/io/vertx/circuitbreaker/CircuitBreaker.java
@@ -40,7 +40,7 @@ public interface CircuitBreaker {
*
* @param name the name
* @param vertx the Vert.x instance
- * @param options the configuration option
+ * @param options the configuration options
* @return the created instance
*/
static CircuitBreaker create(String name, Vertx vertx, CircuitBreakerOptions options) {
@@ -60,35 +60,36 @@ static CircuitBreaker create(String name, Vertx vertx) {
/**
* Closes the circuit breaker. It stops sending events on its state on the event bus.
- * This method is not related to the {@code close} state of the circuit breaker. To set the circuit breaker in the
- * {@code close} state, use {@link #reset()}.
+ *
+ * This method is not related to the {@code closed} state of the circuit breaker. To move the circuit breaker to the
+ * {@code closed} state, use {@link #reset()}.
*/
@Fluent
CircuitBreaker close();
/**
- * Sets a {@link Handler} invoked when the circuit breaker state switches to open.
+ * Sets a {@link Handler} to be invoked when the circuit breaker state switches to open.
*
* @param handler the handler, must not be {@code null}
- * @return the current {@link CircuitBreaker}
+ * @return this {@link CircuitBreaker}
*/
@Fluent
CircuitBreaker openHandler(Handler handler);
/**
- * Sets a {@link Handler} invoked when the circuit breaker state switches to half-open.
+ * Sets a {@link Handler} to be invoked when the circuit breaker state switches to half-open.
*
* @param handler the handler, must not be {@code null}
- * @return the current {@link CircuitBreaker}
+ * @return this {@link CircuitBreaker}
*/
@Fluent
CircuitBreaker halfOpenHandler(Handler handler);
/**
- * Sets a {@link Handler} invoked when the circuit breaker state switches to close.
+ * Sets a {@link Handler} to be invoked when the circuit breaker state switches to closed.
*
* @param handler the handler, must not be {@code null}
- * @return the current {@link CircuitBreaker}
+ * @return this {@link CircuitBreaker}
*/
@Fluent
CircuitBreaker closeHandler(Handler handler);
@@ -97,25 +98,26 @@ static CircuitBreaker create(String name, Vertx vertx) {
* Executes the given operation with the circuit breaker control. The operation is generally calling an
* external system. The operation receives a {@link Promise} object as parameter and must
* call {@link Promise#complete(Object)} when the operation has terminated successfully. The operation must also
- * call {@link Promise#fail(Throwable)} in case of failure.
+ * call {@link Promise#fail(Throwable)} in case of a failure.
*
- * The operation is not invoked if the circuit breaker is open, and the given fallback is called immediately. The
- * circuit breaker also monitor the completion of the operation before a configure timeout. The operation is
- * considered as failed if it does not terminate in time.
+ * The operation is not invoked if the circuit breaker is open, and the given fallback is called instead.
+ * The circuit breaker also monitors whether the operation completes in time. The operation is considered failed
+ * if it does not terminate before the configured timeout.
*
* This method returns a {@link Future} object to retrieve the status and result of the operation, with the status
* being a success or a failure. If the fallback is called, the returned future is successfully completed with the
* value returned from the fallback. If the fallback throws an exception, the returned future is marked as failed.
*
* @param command the operation
- * @param fallback the fallback function. It gets an exception as parameter and returns the fallback result
+ * @param fallback the fallback function; gets an exception as parameter and returns the fallback result
* @param the type of result
- * @return a future object completed when the operation or its fallback completes
+ * @return a future object completed when the operation or the fallback completes
*/
Future executeWithFallback(Handler> command, Function fallback);
/**
- * Same as {@link #executeWithFallback(Handler, Function)} but using the circuit breaker default fallback.
+ * Same as {@link #executeWithFallback(Handler, Function)} but using the circuit breaker
+ * {@linkplain #fallback(Function) default fallback}.
*
* @param command the operation
* @param the type of result
@@ -124,13 +126,13 @@ static CircuitBreaker create(String name, Vertx vertx) {
Future execute(Handler> command);
/**
- * Same as {@link #executeAndReportWithFallback(Promise, Handler, Function)} but using the circuit breaker default
- * fallback.
+ * Same as {@link #executeAndReportWithFallback(Promise, Handler, Function)} but using the circuit breaker
+ * {@linkplain #fallback(Function) default fallback}.
*
* @param resultPromise the promise on which the operation result is reported
* @param command the operation
* @param the type of result
- * @return the current {@link CircuitBreaker}
+ * @return this {@link CircuitBreaker}
*/
@Fluent
CircuitBreaker executeAndReport(Promise resultPromise, Handler> command);
@@ -139,67 +141,68 @@ static CircuitBreaker create(String name, Vertx vertx) {
* Executes the given operation with the circuit breaker control. The operation is generally calling an
* external system. The operation receives a {@link Promise} object as parameter and must
* call {@link Promise#complete(Object)} when the operation has terminated successfully. The operation must also
- * call {@link Promise#fail(Throwable)} in case of failure.
+ * call {@link Promise#fail(Throwable)} in case of a failure.
*
- * The operation is not invoked if the circuit breaker is open, and the given fallback is called immediately. The
- * circuit breaker also monitor the completion of the operation before a configure timeout. The operation is
- * considered as failed if it does not terminate in time.
+ * The operation is not invoked if the circuit breaker is open, and the given fallback is called instead.
+ * The circuit breaker also monitors whether the operation completes in time. The operation is considered failed
+ * if it does not terminate before the configured timeout.
*
- * Unlike {@link #executeWithFallback(Handler, Function)}, this method does return a {@link Future} object, but
- * let the caller pass a {@link Future} object on which the result is reported. If the fallback is called, the future
+ * Unlike {@link #executeWithFallback(Handler, Function)}, this method does not return a {@link Future} object, but
+ * lets the caller pass a {@link Promise} object on which the result is reported. If the fallback is called, the promise
* is successfully completed with the value returned by the fallback function. If the fallback throws an exception,
- * the future is marked as failed.
+ * the promise is marked as failed.
*
* @param resultPromise the promise on which the operation result is reported
* @param command the operation
- * @param fallback the fallback function. It gets an exception as parameter and returns the fallback result
+ * @param fallback the fallback function; gets an exception as parameter and returns the fallback result
* @param the type of result
- * @return the current {@link CircuitBreaker}
+ * @return this {@link CircuitBreaker}
*/
@Fluent
CircuitBreaker executeAndReportWithFallback(Promise resultPromise, Handler> command,
Function fallback);
/**
- * Sets a default {@link Function} invoked when the bridge is open to handle the "request", or on failure
- * if {@link CircuitBreakerOptions#isFallbackOnFailure()} is enabled.
+ * Sets a default fallback {@link Function} to be invoked when the circuit breaker is open or when failure
+ * occurs and {@link CircuitBreakerOptions#isFallbackOnFailure()} is enabled.
*
* The function gets the exception as parameter and returns the fallback result.
*
- * @param handler the handler
- * @return the current {@link CircuitBreaker}
+ * @param handler the fallback handler
+ * @return this {@link CircuitBreaker}
*/
@Fluent
CircuitBreaker fallback(Function handler);
/**
- * Resets the circuit breaker state (number of failure set to 0 and state set to closed).
+ * Resets the circuit breaker state. The number of recent failures is set to 0 and if the state is half-open,
+ * it is set to closed.
*
- * @return the current {@link CircuitBreaker}
+ * @return this {@link CircuitBreaker}
*/
@Fluent
CircuitBreaker reset();
/**
- * Explicitly opens the circuit.
+ * Explicitly opens the circuit breaker.
*
- * @return the current {@link CircuitBreaker}
+ * @return this {@link CircuitBreaker}
*/
@Fluent
CircuitBreaker open();
/**
- * @return the current state.
+ * @return the current state of this circuit breaker
*/
CircuitBreakerState state();
/**
- * @return the current number of failures.
+ * @return the current number of recorded failures
*/
long failureCount();
/**
- * @return the name of the circuit breaker.
+ * @return the name of this circuit breaker
*/
@CacheReturn
String name();
@@ -212,7 +215,7 @@ CircuitBreaker executeAndReportWithFallback(Promise resultPromise, Handle
CircuitBreaker retryPolicy(Function retryPolicy);
/**
- * Set a {@link RetryPolicy} which computes a delay before retry execution.
+ * Set a {@link RetryPolicy} which computes a delay before a retry attempt.
*/
@Fluent
CircuitBreaker retryPolicy(RetryPolicy retryPolicy);
diff --git a/src/main/java/io/vertx/circuitbreaker/CircuitBreakerOptions.java b/src/main/java/io/vertx/circuitbreaker/CircuitBreakerOptions.java
index 3756679..f5cb8f2 100644
--- a/src/main/java/io/vertx/circuitbreaker/CircuitBreakerOptions.java
+++ b/src/main/java/io/vertx/circuitbreaker/CircuitBreakerOptions.java
@@ -20,7 +20,7 @@
import io.vertx.core.json.JsonObject;
/**
- * Circuit breaker configuration options. All time are given in milliseconds.
+ * Circuit breaker configuration options. All time values are in milliseconds.
*
* @author Clement Escoffier
*/
@@ -30,25 +30,25 @@ public class CircuitBreakerOptions {
/**
* Default timeout in milliseconds.
*/
- public static final long DEFAULT_TIMEOUT = 10000L;
+ public static final long DEFAULT_TIMEOUT = 10_000L;
/**
- * Default number of failures.
+ * Default number of failures after which a closed circuit breaker moves to open.
*/
public static final int DEFAULT_MAX_FAILURES = 5;
/**
- * Default value of the fallback on failure property.
+ * Default value of the {@linkplain #isFallbackOnFailure() fallback on failure} property.
*/
public static final boolean DEFAULT_FALLBACK_ON_FAILURE = false;
/**
- * Default time before it attempts to re-close the circuit (half-open state) in milliseconds.
+ * Default time after which an open circuit breaker moves to half-open (in an attempt to re-close) in milliseconds.
*/
- public static final long DEFAULT_RESET_TIMEOUT = 30000;
+ public static final long DEFAULT_RESET_TIMEOUT = 30_000;
/**
- * Whether circuit breaker state should be delivered only to local consumers by default = {@code true}.
+ * Default value of whether circuit breaker state events should be delivered only to local consumers.
*/
public static final boolean DEFAULT_NOTIFICATION_LOCAL_ONLY = true;
@@ -60,15 +60,15 @@ public class CircuitBreakerOptions {
/**
* Default notification period in milliseconds.
*/
- public static final long DEFAULT_NOTIFICATION_PERIOD = 2000;
+ public static final long DEFAULT_NOTIFICATION_PERIOD = 2_000;
/**
- * Default rolling window for metrics in milliseconds.
+ * Default length of rolling window for metrics in milliseconds.
*/
- public static final long DEFAULT_METRICS_ROLLING_WINDOW = 10000;
+ public static final long DEFAULT_METRICS_ROLLING_WINDOW = 10_000;
/**
- * Default number of buckets used for the rolling window.
+ * Default number of buckets used for the metrics rolling window.
*/
public static final int DEFAULT_METRICS_ROLLING_BUCKETS = 10;
@@ -78,9 +78,9 @@ public class CircuitBreakerOptions {
private static final int DEFAULT_MAX_RETRIES = 0;
/**
- * The default rolling window span in milliseconds.
+ * Default length of rolling window for failures in milliseconds.
*/
- private static final int DEFAULT_FAILURES_ROLLING_WINDOW = 10000;
+ private static final int DEFAULT_FAILURES_ROLLING_WINDOW = 10_000;
/**
* The operation timeout.
@@ -93,7 +93,7 @@ public class CircuitBreakerOptions {
private int maxFailures = DEFAULT_MAX_FAILURES;
/**
- * Whether or not the fallback should be called upon failures.
+ * Whether the fallback should be called upon failures.
*/
private boolean fallbackOnFailure = DEFAULT_FALLBACK_ON_FAILURE;
@@ -164,9 +164,9 @@ public CircuitBreakerOptions(CircuitBreakerOptions other) {
}
/**
- * Creates a new instance of {@link CircuitBreakerOptions} from the given json object.
+ * Creates a new instance of {@link CircuitBreakerOptions} from the given JSON object.
*
- * @param json the json object
+ * @param json the JSON object
*/
public CircuitBreakerOptions(JsonObject json) {
this();
@@ -174,7 +174,7 @@ public CircuitBreakerOptions(JsonObject json) {
}
/**
- * @return a json object representing the current configuration.
+ * @return a JSON object representing this configuration
*/
public JsonObject toJson() {
JsonObject json = new JsonObject();
@@ -183,17 +183,17 @@ public JsonObject toJson() {
}
/**
- * @return the maximum number of failures before opening the circuit.
+ * @return the maximum number of failures before opening the circuit breaker
*/
public int getMaxFailures() {
return maxFailures;
}
/**
- * Sets the maximum number of failures before opening the circuit.
+ * Sets the maximum number of failures before opening the circuit breaker.
*
* @param maxFailures the number of failures.
- * @return the current {@link CircuitBreakerOptions} instance
+ * @return this {@link CircuitBreakerOptions}
*/
public CircuitBreakerOptions setMaxFailures(int maxFailures) {
this.maxFailures = maxFailures;
@@ -201,18 +201,18 @@ public CircuitBreakerOptions setMaxFailures(int maxFailures) {
}
/**
- * @return the configured timeout in milliseconds.
+ * @return the configured timeout in milliseconds
*/
public long getTimeout() {
return timeout;
}
/**
- * Sets the timeout in milliseconds. If an action is not completed before this timeout, the action is considered as
+ * Sets the timeout in milliseconds. If an action does not complete before this timeout, the action is considered as
* a failure.
*
* @param timeoutInMs the timeout, -1 to disable the timeout
- * @return the current {@link CircuitBreakerOptions} instance
+ * @return this {@link CircuitBreakerOptions}
*/
public CircuitBreakerOptions setTimeout(long timeoutInMs) {
this.timeout = timeoutInMs;
@@ -220,17 +220,17 @@ public CircuitBreakerOptions setTimeout(long timeoutInMs) {
}
/**
- * @return whether or not the fallback is executed on failures, even when the circuit is closed.
+ * @return whether the fallback is executed on failures, even when the circuit breaker is closed
*/
public boolean isFallbackOnFailure() {
return fallbackOnFailure;
}
/**
- * Sets whether or not the fallback is executed on failure, even when the circuit is closed.
+ * Sets whether the fallback is executed on failure, even when the circuit breaker is closed.
*
* @param fallbackOnFailure {@code true} to enable it.
- * @return the current {@link CircuitBreakerOptions} instance
+ * @return this {@link CircuitBreakerOptions}
*/
public CircuitBreakerOptions setFallbackOnFailure(boolean fallbackOnFailure) {
this.fallbackOnFailure = fallbackOnFailure;
@@ -238,18 +238,18 @@ public CircuitBreakerOptions setFallbackOnFailure(boolean fallbackOnFailure) {
}
/**
- * @return the time in milliseconds before it attempts to re-close the circuit (by going to the half-open state).
+ * @return the time in milliseconds before an open circuit breaker moves to half-open (in an attempt to re-close)
*/
public long getResetTimeout() {
return resetTimeout;
}
/**
- * Sets the time in ms before it attempts to re-close the circuit (by going to the half-open state). If the circuit
- * is closed when the timeout is reached, nothing happens. {@code -1} disables this feature.
+ * Sets the time in milliseconds before an open circuit breaker moves to half-open (in an attempt to re-close).
+ * If the circuit breaker is closed when the timeout is reached, nothing happens. {@code -1} disables this feature.
*
* @param resetTimeout the time in ms
- * @return the current {@link CircuitBreakerOptions} instance
+ * @return this {@link CircuitBreakerOptions}
*/
public CircuitBreakerOptions setResetTimeout(long resetTimeout) {
this.resetTimeout = resetTimeout;
@@ -257,17 +257,18 @@ public CircuitBreakerOptions setResetTimeout(long resetTimeout) {
}
/**
- * @return {@code true} if circuit breaker state should be delivered only to local consumers, otherwise {@code false}
+ * @return {@code true} if circuit breaker state events should be delivered only to local consumers,
+ * {@code false} otherwise
*/
public boolean isNotificationLocalOnly() {
return notificationLocalOnly;
}
/**
- * Whether circuit breaker state should be delivered only to local consumers.
+ * Sets whether circuit breaker state events should be delivered only to local consumers.
*
- * @param notificationLocalOnly {@code true} if circuit breaker state should be delivered only to local consumers, otherwise {@code false}
- * @return the current {@link CircuitBreakerOptions} instance
+ * @param notificationLocalOnly {@code true} if circuit breaker state events should be delivered only to local consumers, {@code false} otherwise
+ * @return this {@link CircuitBreakerOptions}
*/
public CircuitBreakerOptions setNotificationLocalOnly(boolean notificationLocalOnly) {
this.notificationLocalOnly = notificationLocalOnly;
@@ -275,18 +276,18 @@ public CircuitBreakerOptions setNotificationLocalOnly(boolean notificationLocalO
}
/**
- * @return the eventbus address on which the circuit breaker events are published. {@code null} if this feature has
- * been disabled.
+ * @return the eventbus address on which the circuit breaker events are published, or {@code null} if this feature has
+ * been disabled
*/
public String getNotificationAddress() {
return notificationAddress;
}
/**
- * Sets the event bus address on which the circuit breaker publish its state change.
+ * Sets the event bus address on which the circuit breaker publishes its state changes.
*
- * @param notificationAddress the address, {@code null} to disable this feature.
- * @return the current {@link CircuitBreakerOptions} instance
+ * @param notificationAddress the address, {@code null} to disable this feature
+ * @return this {@link CircuitBreakerOptions}
*/
public CircuitBreakerOptions setNotificationAddress(String notificationAddress) {
this.notificationAddress = notificationAddress;
@@ -294,18 +295,18 @@ public CircuitBreakerOptions setNotificationAddress(String notificationAddress)
}
/**
- * @return the the period in milliseconds where the circuit breaker send a notification about its state.
+ * @return the period in milliseconds in which the circuit breaker sends notifications about its state
*/
public long getNotificationPeriod() {
return notificationPeriod;
}
/**
- * Configures the period in milliseconds where the circuit breaker send a notification on the event bus with its
+ * Sets the period in milliseconds in which the circuit breaker sends notifications on the event bus with its
* current state.
*
* @param notificationPeriod the period, 0 to disable this feature.
- * @return the current {@link CircuitBreakerOptions} instance
+ * @return this {@link CircuitBreakerOptions}
*/
public CircuitBreakerOptions setNotificationPeriod(long notificationPeriod) {
this.notificationPeriod = notificationPeriod;
@@ -313,17 +314,17 @@ public CircuitBreakerOptions setNotificationPeriod(long notificationPeriod) {
}
/**
- * @return the configured rolling window for metrics.
+ * @return the configured length of rolling window for metrics
*/
public long getMetricsRollingWindow() {
return metricsRollingWindow;
}
/**
- * Sets the rolling window used for metrics.
+ * Sets the rolling window length used for metrics.
*
- * @param metricsRollingWindow the period in milliseconds.
- * @return the current {@link CircuitBreakerOptions} instance
+ * @param metricsRollingWindow the period in milliseconds
+ * @return this {@link CircuitBreakerOptions}
*/
public CircuitBreakerOptions setMetricsRollingWindow(long metricsRollingWindow) {
this.metricsRollingWindow = metricsRollingWindow;
@@ -331,17 +332,17 @@ public CircuitBreakerOptions setMetricsRollingWindow(long metricsRollingWindow)
}
/**
- * @return the configured rolling window for failures.
+ * @return the configured length of rolling window for failures
*/
public long getFailuresRollingWindow() {
return failuresRollingWindow;
}
/**
- * Sets the rolling window used for metrics.
+ * Sets the rolling window length used for failures.
*
- * @param metricsRollingWindow the period in milliseconds.
- * @return the current {@link CircuitBreakerOptions} instance
+ * @param failureRollingWindow the period in milliseconds
+ * @return this {@link CircuitBreakerOptions}
*/
public CircuitBreakerOptions setFailuresRollingWindow(long failureRollingWindow) {
this.failuresRollingWindow = failureRollingWindow;
@@ -349,21 +350,21 @@ public CircuitBreakerOptions setFailuresRollingWindow(long failureRollingWindow)
}
/**
- * @return the configured number of buckets the rolling window is divided into.
+ * @return the configured number of buckets the metrics rolling window is divided into
*/
public int getMetricsRollingBuckets() {
return metricsRollingBuckets;
}
/**
- * Sets the configured number of buckets the rolling window is divided into.
- *
- * The following must be true - metrics.rollingStats.timeInMilliseconds % metrics.rollingStats.numBuckets == 0 - otherwise it will throw an exception.
- *
- * In other words, 10000/10 is okay, so is 10000/20 but 10000/7 is not.
+ * Sets the number of buckets the metrics rolling window is divided into.
+ *
+ * The following must be true: {@code metricsRollingWindow % metricsRollingBuckets == 0},
+ * otherwise an exception will be thrown.
+ * For example, 10000/10 is okay, so is 10000/20, but 10000/7 is not.
*
- * @param metricsRollingBuckets the number of rolling buckets.
- * @return the current {@link CircuitBreakerOptions} instance
+ * @param metricsRollingBuckets the number of buckets
+ * @return this {@link CircuitBreakerOptions}
*/
public CircuitBreakerOptions setMetricsRollingBuckets(int metricsRollingBuckets) {
this.metricsRollingBuckets = metricsRollingBuckets;
@@ -371,17 +372,17 @@ public CircuitBreakerOptions setMetricsRollingBuckets(int metricsRollingBuckets)
}
/**
- * @return the number of times the circuit breaker tries to redo the operation before failing
+ * @return the number of times the circuit breaker retries an operation before failing
*/
public int getMaxRetries() {
return maxRetries;
}
/**
- * Configures the number of times the circuit breaker tries to redo the operation before failing.
+ * Sets the number of times the circuit breaker retries an operation before failing.
*
- * @param maxRetries the number of retries, 0 to disable this feature.
- * @return the current {@link CircuitBreakerOptions} instance
+ * @param maxRetries the number of retries, 0 to disable retrying
+ * @return this {@link CircuitBreakerOptions}
*/
public CircuitBreakerOptions setMaxRetries(int maxRetries) {
this.maxRetries = maxRetries;
diff --git a/src/main/java/io/vertx/circuitbreaker/CircuitBreakerState.java b/src/main/java/io/vertx/circuitbreaker/CircuitBreakerState.java
index e4de4df..15c48cc 100644
--- a/src/main/java/io/vertx/circuitbreaker/CircuitBreakerState.java
+++ b/src/main/java/io/vertx/circuitbreaker/CircuitBreakerState.java
@@ -31,13 +31,13 @@ public enum CircuitBreakerState {
*/
OPEN,
/**
- * The {@code CLOSED} state. The circuit breaker lets invocations pass and collects the failures. IF the number of
- * failures reach the specified threshold, the cricuit breaker switches to the {@link #OPEN} state.
+ * The {@code CLOSED} state. The circuit breaker lets invocations pass and collects the failures. If the number of
+ * failures reach the specified threshold, the circuit breaker switches to the {@link #OPEN} state.
*/
CLOSED,
/**
* The {@code HALF_OPEN} state. The circuit breaker has been opened, and is now checking the current situation. It
- * lets pass the next invocation and determines from the result (failure or success) if the circuit breaker can
+ * lets the next invocation pass and determines from the result (failure or success) if the circuit breaker can
* be switched to the {@link #CLOSED} state again.
*/
HALF_OPEN
diff --git a/src/main/java/io/vertx/circuitbreaker/HystrixMetricHandler.java b/src/main/java/io/vertx/circuitbreaker/HystrixMetricHandler.java
deleted file mode 100644
index 89a3560..0000000
--- a/src/main/java/io/vertx/circuitbreaker/HystrixMetricHandler.java
+++ /dev/null
@@ -1,50 +0,0 @@
-package io.vertx.circuitbreaker;
-
-import io.vertx.circuitbreaker.impl.HystrixMetricEventStream;
-import io.vertx.codegen.annotations.VertxGen;
-import io.vertx.core.Handler;
-import io.vertx.core.Vertx;
-import io.vertx.ext.web.RoutingContext;
-
-/**
- * A Vert.x web handler to expose the circuit breaker to the Hystrix dasbboard. The handler listens to the circuit
- * breaker notifications sent on the event bus.
- *
- * @author Clement Escoffier
- */
-@VertxGen
-public interface HystrixMetricHandler extends Handler {
-
- /**
- * Creates the handler, using the default notification address and listening to local messages only.
- *
- * @param vertx the Vert.x instance
- * @return the handler
- */
- static HystrixMetricHandler create(Vertx vertx) {
- return create(vertx, CircuitBreakerOptions.DEFAULT_NOTIFICATION_ADDRESS);
- }
-
- /**
- * Creates the handler, listening only to local messages.
- *
- * @param vertx the Vert.x instance
- * @param address the address to listen on the event bus
- * @return the handler
- */
- static HystrixMetricHandler create(Vertx vertx, String address) {
- return create(vertx, address, CircuitBreakerOptions.DEFAULT_NOTIFICATION_LOCAL_ONLY);
- }
-
- /**
- * Creates the handler.
- *
- * @param vertx the Vert.x instance
- * @param address the address to listen on the event bus
- * @param localOnly whether the consumer should only receive messages sent from this Vert.x instance
- * @return the handler
- */
- static HystrixMetricHandler create(Vertx vertx, String address, boolean localOnly) {
- return new HystrixMetricEventStream(vertx, address, localOnly);
- }
-}
diff --git a/src/main/java/io/vertx/circuitbreaker/RetryPolicy.java b/src/main/java/io/vertx/circuitbreaker/RetryPolicy.java
index 4c87d8d..805e884 100644
--- a/src/main/java/io/vertx/circuitbreaker/RetryPolicy.java
+++ b/src/main/java/io/vertx/circuitbreaker/RetryPolicy.java
@@ -50,7 +50,8 @@ static RetryPolicy linearDelay(long initialDelay, long maxDelay) {
/**
* Create an exponential delay with jitter retry policy.
*
- * Based on Full Jitter in Exponential Backoff And Jitter.
+ * Based on the Full Jitter approach described in
+ * Exponential Backoff And Jitter.
*
* @param initialDelay the initial delay in milliseconds
* @param maxDelay maximum delay in milliseconds
@@ -68,7 +69,7 @@ static RetryPolicy exponentialDelayWithJitter(long initialDelay, long maxDelay)
/**
* Compute a delay in milliseconds before retry is executed.
*
- * @param failure the failure passed to the operation {@link io.vertx.core.Promise}
+ * @param failure the failure of the previous execution attempt
* @param retryCount the number of times operation has been retried already
* @return a delay in milliseconds before retry is executed
*/
diff --git a/src/main/java/io/vertx/circuitbreaker/TimeoutException.java b/src/main/java/io/vertx/circuitbreaker/TimeoutException.java
index 1ad8c2a..b294d3b 100644
--- a/src/main/java/io/vertx/circuitbreaker/TimeoutException.java
+++ b/src/main/java/io/vertx/circuitbreaker/TimeoutException.java
@@ -4,7 +4,7 @@
* Exception reported when the monitored operation timed out.
*
* For performance reason, this exception does not carry a stack trace. You are not allowed to set a stack trace or a
- * cause to this exception. This immutability allows using a singleton instance.
+ * cause to this exception. This immutability allows using a singleton instance.
*
* @author Clement Escoffier
*/
diff --git a/src/main/java/io/vertx/circuitbreaker/impl/CircuitBreakerImpl.java b/src/main/java/io/vertx/circuitbreaker/impl/CircuitBreakerImpl.java
index a541e59..fe29689 100644
--- a/src/main/java/io/vertx/circuitbreaker/impl/CircuitBreakerImpl.java
+++ b/src/main/java/io/vertx/circuitbreaker/impl/CircuitBreakerImpl.java
@@ -22,15 +22,14 @@
import io.vertx.circuitbreaker.OpenCircuitException;
import io.vertx.circuitbreaker.RetryPolicy;
import io.vertx.circuitbreaker.TimeoutException;
-import io.vertx.core.Context;
import io.vertx.core.Future;
import io.vertx.core.Handler;
import io.vertx.core.Promise;
import io.vertx.core.Vertx;
import io.vertx.core.eventbus.DeliveryOptions;
+import io.vertx.core.impl.ContextInternal;
import io.vertx.core.json.JsonObject;
-import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Objects;
@@ -133,11 +132,12 @@ public CircuitBreaker fallback(Function handler) {
}
/**
- * A version of reset that can force the the state to `close` even if the circuit breaker is open. This is an
- * internal API.
+ * A version of {@link #reset()} that can forcefully change the state to closed even if the circuit breaker is open.
+ *
+ * This is an internal API.
*
- * @param force whether or not we force the state and allow an illegal transition
- * @return the current circuit breaker.
+ * @param force whether we force the state change and allow an illegal transition
+ * @return this circuit breaker
*/
public synchronized CircuitBreaker reset(boolean force) {
rollingFailures.reset();
@@ -207,105 +207,97 @@ private synchronized CircuitBreaker attemptReset() {
}
@Override
- public CircuitBreaker executeAndReportWithFallback(
- Promise userFuture,
- Handler> command,
+ public CircuitBreaker executeAndReportWithFallback(Promise resultPromise, Handler> command,
Function fallback) {
- Context context = vertx.getOrCreateContext();
+ ContextInternal context = (ContextInternal) vertx.getOrCreateContext();
CircuitBreakerState currentState;
synchronized (this) {
currentState = state;
}
- CircuitBreakerMetrics.Operation call = metrics != null ? metrics.enqueue() : null;
+ CircuitBreakerMetrics.Operation operationMetrics = metrics != null ? metrics.enqueue() : null;
// this future object tracks the completion of the operation
// This future is marked as failed on operation failures and timeout.
- Promise operationResult = Promise.promise();
+ Promise operationResult = context.promise();
if (currentState == CircuitBreakerState.CLOSED) {
operationResult.future().onComplete(event -> {
- context.runOnContext(v -> {
- if (event.failed()) {
- incrementFailures();
- if (call != null) {
- call.failed();
- }
- if (options.isFallbackOnFailure()) {
- invokeFallback(event.cause(), userFuture, fallback, call);
- } else {
- userFuture.fail(event.cause());
- }
+ if (event.failed()) {
+ incrementFailures();
+ if (operationMetrics != null) {
+ operationMetrics.failed();
+ }
+ if (options.isFallbackOnFailure()) {
+ invokeFallback(event.cause(), resultPromise, fallback, operationMetrics);
} else {
- if (call != null) {
- call.complete();
- }
- reset();
- userFuture.complete(event.result());
+ resultPromise.fail(event.cause());
}
- // Else the operation has been canceled because of a time out.
- });
+ } else {
+ if (operationMetrics != null) {
+ operationMetrics.complete();
+ }
+ reset();
+ resultPromise.complete(event.result());
+ }
+ // Else the operation has been canceled because of a timeout.
});
if (options.getMaxRetries() > 0) {
- executeOperation(context, command, retryFuture(context, 0, command, operationResult, call), call);
+ executeOperation(context, command, retryPromise(context, 0, command, operationResult, operationMetrics), operationMetrics);
} else {
- executeOperation(context, command, operationResult, call);
+ executeOperation(context, command, operationResult, operationMetrics);
}
} else if (currentState == CircuitBreakerState.OPEN) {
// Fallback immediately
- if (call != null) {
- call.shortCircuited();
+ if (operationMetrics != null) {
+ operationMetrics.shortCircuited();
}
- invokeFallback(OpenCircuitException.INSTANCE, userFuture, fallback, call);
+ invokeFallback(OpenCircuitException.INSTANCE, resultPromise, fallback, operationMetrics);
} else if (currentState == CircuitBreakerState.HALF_OPEN) {
if (passed.incrementAndGet() == 1) {
operationResult.future().onComplete(event -> {
- context.runOnContext(v -> {
- if (event.failed()) {
- open();
- if (call != null) {
- call.failed();
- }
- if (options.isFallbackOnFailure()) {
- invokeFallback(event.cause(), userFuture, fallback, call);
- } else {
- userFuture.fail(event.cause());
- }
+ if (event.failed()) {
+ open();
+ if (operationMetrics != null) {
+ operationMetrics.failed();
+ }
+ if (options.isFallbackOnFailure()) {
+ invokeFallback(event.cause(), resultPromise, fallback, operationMetrics);
} else {
- if (call != null) {
- call.complete();
- }
- reset();
- userFuture.complete(event.result());
+ resultPromise.fail(event.cause());
}
- });
+ } else {
+ if (operationMetrics != null) {
+ operationMetrics.complete();
+ }
+ reset();
+ resultPromise.complete(event.result());
+ }
});
// Execute the operation
- executeOperation(context, command, operationResult, call);
+ executeOperation(context, command, operationResult, operationMetrics);
} else {
// Not selected, fallback.
- if (call != null) {
- call.shortCircuited();
+ if (operationMetrics != null) {
+ operationMetrics.shortCircuited();
}
- invokeFallback(OpenCircuitException.INSTANCE, userFuture, fallback, call);
+ invokeFallback(OpenCircuitException.INSTANCE, resultPromise, fallback, operationMetrics);
}
}
return this;
}
- private Promise retryFuture(Context context, int retryCount, Handler> command, Promise
- operationResult, CircuitBreakerMetrics.Operation call) {
- Promise retry = Promise.promise();
+ private Promise retryPromise(ContextInternal context, int retryCount, Handler> command,
+ Promise operationResult, CircuitBreakerMetrics.Operation operationMetrics) {
- retry.future().onComplete(event -> {
+ Promise promise = context.promise();
+ promise.future().onComplete(event -> {
if (event.succeeded()) {
reset();
- context.runOnContext(v -> {
- operationResult.complete(event.result());
- });
+ operationResult.complete(event.result());
return;
}
@@ -316,32 +308,28 @@ private Promise retryFuture(Context context, int retryCount, Handler {
- context.runOnContext(v -> {
- // Don't report timeout or error in the retry attempt, only the last one.
- executeOperation(context, command, retryFuture(context, retryCount + 1, command, operationResult, null),
- call);
- });
+ executeRetryWithDelay(event.cause(), retryCount, l -> {
+ // Don't report timeout or error in the retry attempt, only the last one.
+ executeOperation(context, command, retryPromise(context, retryCount + 1, command, operationResult, null),
+ operationMetrics);
});
} else {
- executeRetryWithTimeout(event.cause(), retryCount, (l) -> {
- context.runOnContext(v -> {
- executeOperation(context, command, operationResult, call);
- });
+ executeRetryWithDelay(event.cause(), retryCount, l -> {
+ executeOperation(context, command, operationResult, operationMetrics);
});
}
} else {
- context.runOnContext(v -> operationResult.fail(OpenCircuitException.INSTANCE));
+ operationResult.fail(OpenCircuitException.INSTANCE);
}
});
- return retry;
+ return promise;
}
- private void executeRetryWithTimeout(Throwable failure, int retryCount, Handler action) {
- long retryTimeout = retryPolicy.delay(failure, retryCount + 1);
+ private void executeRetryWithDelay(Throwable failure, int retryCount, Handler action) {
+ long retryDelay = retryPolicy.delay(failure, retryCount + 1);
- if (retryTimeout > 0) {
- vertx.setTimer(retryTimeout, (l) -> {
+ if (retryDelay > 0) {
+ vertx.setTimer(retryDelay, l -> {
action.handle(null);
});
} else {
@@ -349,82 +337,78 @@ private void executeRetryWithTimeout(Throwable failure, int retryCount, Handler<
}
}
- private void invokeFallback(Throwable reason, Promise userFuture,
- Function fallback, CircuitBreakerMetrics.Operation operation) {
+ private void invokeFallback(Throwable reason, Promise resultPromise,
+ Function fallback, CircuitBreakerMetrics.Operation operationMetrics) {
if (fallback == null) {
// No fallback, mark the user future as failed.
- userFuture.fail(reason);
+ resultPromise.fail(reason);
return;
}
try {
T apply = fallback.apply(reason);
- if (operation != null) {
- operation.fallbackSucceed();
+ if (operationMetrics != null) {
+ operationMetrics.fallbackSucceed();
}
- userFuture.complete(apply);
+ resultPromise.complete(apply);
} catch (Exception e) {
- userFuture.fail(e);
- if (operation != null) {
- operation.fallbackFailed();
+ resultPromise.fail(e);
+ if (operationMetrics != null) {
+ operationMetrics.fallbackFailed();
}
}
}
- private void executeOperation(Context context, Handler> operation, Promise operationResult,
- CircuitBreakerMetrics.Operation call) {
+ private void executeOperation(ContextInternal context, Handler> operation, Promise operationResult,
+ CircuitBreakerMetrics.Operation operationMetrics) {
// We use an intermediate future to avoid the passed future to complete or fail after a timeout.
- Promise passedFuture = Promise.promise();
+ Promise passedFuture = context.promise();
// Execute the operation
if (options.getTimeout() != -1) {
long timerId = vertx.setTimer(options.getTimeout(), (l) -> {
- context.runOnContext(v -> {
- // Check if the operation has not already been completed
- if (!operationResult.future().isComplete()) {
- if (call != null) {
- call.timeout();
- }
- operationResult.fail(TimeoutException.INSTANCE);
+ // Check if the operation has not already been completed
+ if (!operationResult.future().isComplete()) {
+ if (operationMetrics != null) {
+ operationMetrics.timeout();
}
- // Else Operation has completed
- });
+ operationResult.fail(TimeoutException.INSTANCE);
+ }
+ // Else Operation has completed
});
passedFuture.future().onComplete(v -> vertx.cancelTimer(timerId));
}
try {
passedFuture.future().onComplete(ar -> {
- context.runOnContext(v -> {
- if (ar.failed()) {
- if (!operationResult.future().isComplete()) {
- operationResult.fail(ar.cause());
- }
- } else {
- if (!operationResult.future().isComplete()) {
- operationResult.complete(ar.result());
- }
+ if (ar.failed()) {
+ if (!operationResult.future().isComplete()) {
+ operationResult.fail(ar.cause());
}
- });
+ } else {
+ if (!operationResult.future().isComplete()) {
+ operationResult.complete(ar.result());
+ }
+ }
});
operation.handle(passedFuture);
} catch (Throwable e) {
- context.runOnContext(v -> {
- if (!operationResult.future().isComplete()) {
- if (call != null) {
- call.error();
- }
- operationResult.fail(e);
+ if (!operationResult.future().isComplete()) {
+ if (operationMetrics != null) {
+ operationMetrics.error();
}
- });
+ operationResult.fail(e);
+ }
}
}
@Override
public Future executeWithFallback(Handler> operation, Function fallback) {
- Promise future = Promise.promise();
- executeAndReportWithFallback(future, operation, fallback);
- return future.future();
+ // be careful to not create a new context, to preserve existing (sometimes synchronous) behavior
+ ContextInternal context = ContextInternal.current();
+ Promise promise = context != null ? context.promise() : Promise.promise();
+ executeAndReportWithFallback(promise, operation, fallback);
+ return promise.future();
}
public Future execute(Handler> operation) {
@@ -432,8 +416,8 @@ public Future execute(Handler> operation) {
}
@Override
- public CircuitBreaker executeAndReport(Promise resultFuture, Handler> operation) {
- return executeAndReportWithFallback(resultFuture, operation, fallback);
+ public CircuitBreaker executeAndReport(Promise resultPromise, Handler> operation) {
+ return executeAndReportWithFallback(resultPromise, operation, fallback);
}
@Override
@@ -447,8 +431,7 @@ private synchronized void incrementFailures() {
if (state != CircuitBreakerState.OPEN) {
open();
} else {
- // No need to do it in the previous case, open() do it.
- // If open has been called, no need to send update, it will be done by the `open` method.
+ // `open()` calls `sendUpdateOnEventBus()`, so no need to repeat it in the previous case
sendUpdateOnEventBus();
}
} else {
@@ -482,14 +465,22 @@ public CircuitBreaker retryPolicy(RetryPolicy retryPolicy) {
return this;
}
- public static class RollingCounter {
+ static class RollingCounter {
+ // all `RollingCounter` methods are called in a `synchronized (CircuitBreakerImpl.this)` block,
+ // which therefore guards access to these fields
+
private Map window;
private long timeUnitsInWindow;
private TimeUnit windowTimeUnit;
public RollingCounter(long timeUnitsInWindow, TimeUnit windowTimeUnit) {
this.windowTimeUnit = windowTimeUnit;
- this.window = new LinkedHashMap<>((int) timeUnitsInWindow + 1);
+ this.window = new LinkedHashMap((int) timeUnitsInWindow + 1) {
+ @Override
+ protected boolean removeEldestEntry(Map.Entry eldest) {
+ return size() > timeUnitsInWindow;
+ }
+ };
this.timeUnitsInWindow = timeUnitsInWindow;
}
@@ -497,18 +488,18 @@ public void increment() {
long timeSlot = windowTimeUnit.convert(System.currentTimeMillis(), TimeUnit.MILLISECONDS);
Long current = window.getOrDefault(timeSlot, 0L);
window.put(timeSlot, ++current);
-
- if (window.size() > timeUnitsInWindow) {
- Iterator iterator = window.keySet().iterator();
- if (iterator.hasNext()) {
- window.remove(iterator.next());
- }
- }
}
public long count() {
long windowStartTime = windowTimeUnit.convert(System.currentTimeMillis() - windowTimeUnit.toMillis(timeUnitsInWindow), TimeUnit.MILLISECONDS);
- return window.entrySet().stream().filter(entry -> entry.getKey() >= windowStartTime).mapToLong(entry -> entry.getValue()).sum();
+
+ long result = 0;
+ for (Map.Entry entry : window.entrySet()) {
+ if (entry.getKey() >= windowStartTime) {
+ result += entry.getValue();
+ }
+ }
+ return result;
}
public void reset() {
diff --git a/src/main/java/io/vertx/circuitbreaker/impl/CircuitBreakerMetrics.java b/src/main/java/io/vertx/circuitbreaker/impl/CircuitBreakerMetrics.java
index 691f9cf..ce6fb0e 100644
--- a/src/main/java/io/vertx/circuitbreaker/impl/CircuitBreakerMetrics.java
+++ b/src/main/java/io/vertx/circuitbreaker/impl/CircuitBreakerMetrics.java
@@ -318,9 +318,9 @@ public void add(Summary other) {
failures += other.failures;
exceptions += other.exceptions;
timeouts += other.timeouts;
- fallbackSuccess += other.fallbackSuccess ;
- fallbackFailure += other.fallbackFailure ;
- shortCircuited += other.shortCircuited ;
+ fallbackSuccess += other.fallbackSuccess;
+ fallbackFailure += other.fallbackFailure;
+ shortCircuited += other.shortCircuited;
}
public void add(Operation operation) {
diff --git a/src/main/java/io/vertx/circuitbreaker/impl/HystrixMetricEventStream.java b/src/main/java/io/vertx/circuitbreaker/impl/HystrixMetricEventStream.java
deleted file mode 100644
index ccda096..0000000
--- a/src/main/java/io/vertx/circuitbreaker/impl/HystrixMetricEventStream.java
+++ /dev/null
@@ -1,139 +0,0 @@
-package io.vertx.circuitbreaker.impl;
-
-import io.vertx.circuitbreaker.CircuitBreakerState;
-import io.vertx.circuitbreaker.HystrixMetricHandler;
-import io.vertx.core.Vertx;
-import io.vertx.core.eventbus.EventBus;
-import io.vertx.core.eventbus.MessageConsumer;
-import io.vertx.core.http.HttpHeaders;
-import io.vertx.core.http.HttpServerResponse;
-import io.vertx.core.json.JsonObject;
-import io.vertx.ext.web.RoutingContext;
-
-import java.util.Collections;
-import java.util.LinkedList;
-import java.util.List;
-import java.util.Objects;
-import java.util.concurrent.atomic.AtomicInteger;
-
-
-/**
- * Implements a handler to serve the Vert.x circuit breaker metrics as a Hystrix circuit
- * breaker.
- *
- * @author Clement Escoffier
- */
-public class HystrixMetricEventStream implements HystrixMetricHandler {
-
- private final List connections = Collections.synchronizedList(new LinkedList<>());
- private AtomicInteger counter = new AtomicInteger();
-
- public HystrixMetricEventStream(Vertx vertx, String address, boolean localOnly) {
- Objects.requireNonNull(vertx);
- Objects.requireNonNull(address);
-
- EventBus eventBus = vertx.eventBus();
- MessageConsumer consumer = localOnly ? eventBus.localConsumer(address) : eventBus.consumer(address);
- consumer
- .handler(message -> {
- JsonObject json = build(message.body());
- int id = counter.incrementAndGet();
- String chunk = json.encode() + "\n\n";
- connections.forEach(resp -> {
- try {
- resp.write("id" + ": " + id + "\n");
- resp.write("data:" + chunk);
- } catch (IllegalStateException e) {
- // Connection close.
- }
- });
- });
- }
-
- private JsonObject build(JsonObject body) {
- String state = body.getString("state");
- JsonObject json = new JsonObject();
- json.put("type", "HystrixCommand");
- json.put("name", body.getString("name"));
- json.put("group", body.getString("node"));
- json.put("currentTime", System.currentTimeMillis());
- json.put("isCircuitBreakerOpen", state.equalsIgnoreCase(CircuitBreakerState.OPEN.toString()));
- json.put("errorPercentage", body.getInteger("rollingErrorPercentage", 0));
- json.put("errorCount", body.getInteger("rollingErrorCount", 0));
- json.put("requestCount", body.getInteger("rollingOperationCount", 0));
- json.put("rollingCountCollapsedRequests", 0);
- json.put("rollingCountExceptionsThrown", body.getInteger("rollingExceptionCount", 0));
- json.put("rollingCountFailure", body.getInteger("rollingFailureCount", 0));
- json.put("rollingCountTimeout", body.getInteger("rollingTimeoutCount", 0));
- json.put("rollingCountFallbackFailure", body.getInteger("rollingFallbackFailureCount", 0));
- json.put("rollingCountFallbackRejection", body.getInteger("fallbackRejection", 0));
- json.put("rollingCountFallbackSuccess", body.getInteger("rollingFallbackSuccessCount", 0));
- json.put("rollingCountResponsesFromCache", 0);
- json.put("rollingCountSemaphoreRejected", 0);
- json.put("rollingCountShortCircuited", body.getInteger("rollingShortCircuitedCount", 0));
- json.put("rollingCountSuccess", body.getInteger("rollingSuccessCount", 0));
- json.put("rollingCountThreadPoolRejected", 0);
- json.put("rollingCountTimeout", body.getInteger("rollingTimeoutCount", 0));
- json.put("rollingCountBadRequests", 0);
- json.put("rollingCountEmit", 0);
- json.put("rollingCountFallbackEmit", 0);
- json.put("rollingCountFallbackMissing", 0);
- json.put("rollingMaxConcurrentExecutionCount", 0);
- json.put("currentConcurrentExecutionCount", 0);
- json.put("latencyExecute_mean", body.getInteger("rollingLatencyMean", 0));
- json.put("latencyExecute", body.getJsonObject("rollingLatency", new JsonObject()));
- json.put("latencyTotal_mean", body.getInteger("totalLatencyMean", 0));
- json.put("latencyTotal", body.getJsonObject("totalLatency", new JsonObject()));
-
- json.put("propertyValue_circuitBreakerRequestVolumeThreshold", 0);
- json.put("propertyValue_circuitBreakerSleepWindowInMilliseconds", body.getLong("resetTimeout", 0L));
- json.put("propertyValue_circuitBreakerErrorThresholdPercentage", 0);
- json.put("propertyValue_circuitBreakerForceOpen", false);
- json.put("propertyValue_circuitBreakerForceClosed", false);
- json.put("propertyValue_circuitBreakerEnabled", true);
- json.put("propertyValue_executionIsolationStrategy", "THREAD");
- json.put("propertyValue_executionIsolationThreadTimeoutInMilliseconds", body.getLong("timeout", 0L));
- json.put("propertyValue_executionIsolationThreadInterruptOnTimeout", true);
- json.put("propertyValue_executionIsolationThreadPoolKeyOverride", "");
- json.put("propertyValue_executionIsolationSemaphoreMaxConcurrentRequests", 0);
- json.put("propertyValue_fallbackIsolationSemaphoreMaxConcurrentRequests", 0);
- json.put("propertyValue_metricsRollingStatisticalWindowInMilliseconds", body.getLong("metricRollingWindow", 0L));
- json.put("propertyValue_requestCacheEnabled", false);
- json.put("propertyValue_requestLogEnabled", false);
- json.put("reportingHosts", 1);
- return json;
- }
-
- @Override
- public void handle(RoutingContext rc) {
- HttpServerResponse response = rc.response();
- response
- .setChunked(true)
- .putHeader(HttpHeaders.CONTENT_TYPE, "text/event-stream")
- .putHeader(HttpHeaders.CACHE_CONTROL, "no-cache")
- .putHeader(HttpHeaders.CONNECTION, HttpHeaders.KEEP_ALIVE);
-
- rc.request().connection()
- .closeHandler(v -> {
- connections.remove(response);
- endQuietly(response);
- })
- .exceptionHandler(t -> {
- connections.remove(response);
- rc.fail(t);
- });
-
- connections.add(response);
- }
-
- private static void endQuietly(HttpServerResponse response) {
- if (response.ended()) {
- return;
- }
- try {
- response.end();
- } catch (IllegalStateException e) {
- // Ignore it.
- }
- }
-}
diff --git a/src/test/java/io/vertx/circuitbreaker/impl/APITest.java b/src/test/java/io/vertx/circuitbreaker/impl/APITest.java
index 26d6eff..7f03b3e 100644
--- a/src/test/java/io/vertx/circuitbreaker/impl/APITest.java
+++ b/src/test/java/io/vertx/circuitbreaker/impl/APITest.java
@@ -28,7 +28,7 @@
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
-import static com.jayway.awaitility.Awaitility.await;
+import static org.awaitility.Awaitility.await;
import static org.assertj.core.api.Assertions.assertThat;
import static org.hamcrest.core.Is.is;
diff --git a/src/test/java/io/vertx/circuitbreaker/impl/CircuitBreakerImplTest.java b/src/test/java/io/vertx/circuitbreaker/impl/CircuitBreakerImplTest.java
index 44de07c..11c509d 100644
--- a/src/test/java/io/vertx/circuitbreaker/impl/CircuitBreakerImplTest.java
+++ b/src/test/java/io/vertx/circuitbreaker/impl/CircuitBreakerImplTest.java
@@ -37,7 +37,7 @@
import java.util.concurrent.atomic.AtomicReference;
import java.util.stream.IntStream;
-import static com.jayway.awaitility.Awaitility.await;
+import static org.awaitility.Awaitility.await;
import static org.assertj.core.api.Assertions.assertThat;
import static org.hamcrest.core.Is.is;
diff --git a/src/test/java/io/vertx/circuitbreaker/impl/CircuitBreakerMetricsTest.java b/src/test/java/io/vertx/circuitbreaker/impl/CircuitBreakerMetricsTest.java
index 9d28421..663563b 100644
--- a/src/test/java/io/vertx/circuitbreaker/impl/CircuitBreakerMetricsTest.java
+++ b/src/test/java/io/vertx/circuitbreaker/impl/CircuitBreakerMetricsTest.java
@@ -21,7 +21,7 @@
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.stream.IntStream;
-import static com.jayway.awaitility.Awaitility.await;
+import static org.awaitility.Awaitility.await;
import static io.vertx.circuitbreaker.asserts.Assertions.assertThat;
import static java.util.stream.Collectors.collectingAndThen;
import static java.util.stream.Collectors.toList;
diff --git a/src/test/java/io/vertx/circuitbreaker/impl/CircuitBreakerWithHTTPTest.java b/src/test/java/io/vertx/circuitbreaker/impl/CircuitBreakerWithHTTPTest.java
index 89b6678..b0e9fce 100644
--- a/src/test/java/io/vertx/circuitbreaker/impl/CircuitBreakerWithHTTPTest.java
+++ b/src/test/java/io/vertx/circuitbreaker/impl/CircuitBreakerWithHTTPTest.java
@@ -42,7 +42,7 @@
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
-import static com.jayway.awaitility.Awaitility.*;
+import static org.awaitility.Awaitility.*;
import static io.vertx.core.http.HttpHeaders.*;
import static java.util.concurrent.TimeUnit.*;
import static org.assertj.core.api.Assertions.assertThat;
diff --git a/src/test/java/io/vertx/circuitbreaker/impl/DeprecatedRetryPolicyTest.java b/src/test/java/io/vertx/circuitbreaker/impl/DeprecatedRetryPolicyTest.java
index afb78a7..5bf1841 100644
--- a/src/test/java/io/vertx/circuitbreaker/impl/DeprecatedRetryPolicyTest.java
+++ b/src/test/java/io/vertx/circuitbreaker/impl/DeprecatedRetryPolicyTest.java
@@ -10,7 +10,7 @@
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Function;
-import static com.jayway.awaitility.Awaitility.*;
+import static org.awaitility.Awaitility.*;
import static org.hamcrest.Matchers.*;
/**
diff --git a/src/test/java/io/vertx/circuitbreaker/impl/HttpClientCommand.java b/src/test/java/io/vertx/circuitbreaker/impl/HttpClientCommand.java
deleted file mode 100644
index 92215ed..0000000
--- a/src/test/java/io/vertx/circuitbreaker/impl/HttpClientCommand.java
+++ /dev/null
@@ -1,90 +0,0 @@
-/*
- * Copyright (c) 2011-2016 The original author or authors
- *
- * All rights reserved. This program and the accompanying materials
- * are made available under the terms of the Eclipse Public License v1.0
- * and Apache License v2.0 which accompanies this distribution.
- *
- * The Eclipse Public License is available at
- * http://www.eclipse.org/legal/epl-v10.html
- *
- * The Apache License v2.0 is available at
- * http://www.opensource.org/licenses/apache2.0.php
- *
- * You may elect to redistribute this code under either of these licenses.
- */
-
-package io.vertx.circuitbreaker.impl;
-
-import com.netflix.hystrix.HystrixCommand;
-import com.netflix.hystrix.HystrixCommandGroupKey;
-import io.vertx.core.Handler;
-import io.vertx.core.http.HttpClient;
-import io.vertx.core.http.HttpClientRequest;
-import io.vertx.core.http.HttpClientResponse;
-import io.vertx.core.http.HttpMethod;
-
-import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.atomic.AtomicReference;
-
-/**
- * @author Clement Escoffier
- */
-public class HttpClientCommand extends HystrixCommand {
-
- private final HttpClient client;
- private final String path;
-
- public HttpClientCommand(HttpClient client, String path) {
- super(HystrixCommandGroupKey.Factory.asKey("test"));
- this.client = client;
- this.path = path;
- }
-
- @Override
- protected String run() throws Exception {
- AtomicReference result = new AtomicReference<>();
- CountDownLatch latch = new CountDownLatch(1);
-
- Handler errorHandler = t -> {
- latch.countDown();
- };
-
- client.request(HttpMethod.GET, path).onComplete(ar1 -> {
- if (ar1.succeeded()) {
- HttpClientRequest req = ar1.result();
- req.send().onComplete(ar2 -> {
- if (ar2.succeeded()) {
- HttpClientResponse response = ar2.result();
- response.exceptionHandler(errorHandler);
- if (response.statusCode() != 200) {
- latch.countDown();
- return;
- }
- response.bodyHandler(content -> {
- result.set(content.toString());
- latch.countDown();
- });
- } else {
- errorHandler.handle(ar2.cause());
- }
- });
- } else {
- errorHandler.handle(ar1.cause());
- }
- });
-
- latch.await();
-
- if (result.get() == null) {
- throw new RuntimeException("Failed to retrieve the HTTP response");
- } else {
- return result.get();
- }
- }
-
- @Override
- protected String getFallback() {
- return "fallback";
- }
-}
diff --git a/src/test/java/io/vertx/circuitbreaker/impl/HystrixMetricEventStreamTest.java b/src/test/java/io/vertx/circuitbreaker/impl/HystrixMetricEventStreamTest.java
deleted file mode 100644
index 6c62a58..0000000
--- a/src/test/java/io/vertx/circuitbreaker/impl/HystrixMetricEventStreamTest.java
+++ /dev/null
@@ -1,209 +0,0 @@
-package io.vertx.circuitbreaker.impl;
-
-import io.vertx.circuitbreaker.CircuitBreaker;
-import io.vertx.circuitbreaker.CircuitBreakerOptions;
-import io.vertx.circuitbreaker.HystrixMetricHandler;
-import io.vertx.core.Handler;
-import io.vertx.core.Promise;
-import io.vertx.core.Vertx;
-import io.vertx.core.buffer.Buffer;
-import io.vertx.core.http.HttpClient;
-import io.vertx.core.http.HttpClientRequest;
-import io.vertx.core.http.HttpClientResponse;
-import io.vertx.core.http.HttpMethod;
-import io.vertx.core.json.JsonObject;
-import io.vertx.core.parsetools.JsonParser;
-import io.vertx.core.parsetools.RecordParser;
-import io.vertx.ext.unit.TestContext;
-import io.vertx.ext.unit.junit.Repeat;
-import io.vertx.ext.unit.junit.RepeatRule;
-import io.vertx.ext.unit.junit.VertxUnitRunner;
-import io.vertx.ext.web.Router;
-import org.junit.After;
-import org.junit.Before;
-import org.junit.Rule;
-import org.junit.Test;
-import org.junit.runner.RunWith;
-
-import java.util.List;
-import java.util.Random;
-import java.util.concurrent.CopyOnWriteArrayList;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicBoolean;
-
-import static com.jayway.awaitility.Awaitility.await;
-import static io.vertx.circuitbreaker.CircuitBreakerOptions.DEFAULT_NOTIFICATION_ADDRESS;
-import static org.assertj.core.api.Assertions.assertThat;
-import static org.hamcrest.core.Is.is;
-
-/**
- * @author Clement Escoffier
- */
-@RunWith(VertxUnitRunner.class)
-public class HystrixMetricEventStreamTest {
-
- @Rule
- public RepeatRule rule = new RepeatRule();
-
-
- private CircuitBreaker breakerA;
- private CircuitBreaker breakerB;
- private CircuitBreaker breakerC;
-
-
- private Vertx vertx;
-
- @Before
- public void setUp(TestContext tc) {
- vertx = Vertx.vertx();
- vertx.exceptionHandler(tc.exceptionHandler());
- }
-
- @After
- public void tearDown() {
- vertx.exceptionHandler(null);
-
- if (breakerA != null) {
- breakerA.close();
- }
- if (breakerB != null) {
- breakerB.close();
- }
- if (breakerC != null) {
- breakerC.close();
- }
-
- AtomicBoolean completed = new AtomicBoolean();
- vertx.close().onComplete(ar -> completed.set(ar.succeeded()));
- await().untilAtomic(completed, is(true));
- }
-
-
- @Test
- @Repeat(10)
- public void test() {
- CircuitBreakerOptions options = new CircuitBreakerOptions()
- .setNotificationAddress(DEFAULT_NOTIFICATION_ADDRESS)
- .setTimeout(1000);
- breakerA = CircuitBreaker.create("A", vertx, new CircuitBreakerOptions(options));
- breakerB = CircuitBreaker.create("B", vertx, new CircuitBreakerOptions(options));
- breakerC = CircuitBreaker.create("C", vertx, new CircuitBreakerOptions(options));
-
- Router router = Router.router(vertx);
- router.get("/metrics").handler(HystrixMetricHandler.create(vertx));
-
- AtomicBoolean ready = new AtomicBoolean();
- vertx.createHttpServer()
- .requestHandler(router)
- .listen(8080).onComplete(ar -> ready.set(ar.succeeded()));
-
- await().untilAtomic(ready, is(true));
-
- List responses = new CopyOnWriteArrayList<>();
- HttpClient client = vertx.createHttpClient();
-
- JsonParser jp = JsonParser.newParser().objectValueMode().handler(
- jsonEvent -> responses.add(jsonEvent.objectValue())
- );
- RecordParser parser = RecordParser.newDelimited("\n\n", buffer -> {
- String record = buffer.toString();
- String[] lines = record.split("\n");
- for (String line : lines) {
- String l = line.trim();
- if (l.startsWith("data:")) {
- String json = l.substring("data:".length());
- jp.handle(Buffer.buffer(json));
- }
- }
- });
-
- client.request(HttpMethod.GET, 8080, "localhost", "/metrics").onComplete(ar1 -> {
- if (ar1.succeeded()) {
- HttpClientRequest req = ar1.result();
- req.send().onComplete(ar2 -> {
- if (ar2.succeeded()) {
- HttpClientResponse resp = ar2.result();
- resp.handler(parser);
- }
- });
- }
- });
-
- for (int i = 0; i < 1000; i++) {
- breakerA.execute(choose());
- breakerB.execute(choose());
- breakerC.execute(choose());
- }
-
- await().atMost(1, TimeUnit.MINUTES).until(() -> responses.size() > 50);
-
- // Check that we got metrics for A, B and C
- JsonObject a = null;
- JsonObject b = null;
- JsonObject c = null;
- for (JsonObject json : responses) {
- switch (json.getString("name")) {
- case "A":
- a = json;
- break;
- case "B":
- b = json;
- break;
- case "C":
- c = json;
- break;
- }
- }
-
- client.close();
-
- assertThat(a).isNotNull();
- assertThat(b).isNotNull();
- assertThat(c).isNotNull();
- }
-
-
- private Random random = new Random();
-
- private Handler> choose() {
- int choice = random.nextInt(5);
- switch (choice) {
- case 0:
- return commandThatWorks();
- case 1:
- return commandThatFails();
- case 2:
- return commandThatCrashes();
- case 3:
- return commandThatTimeout(1000);
- case 4:
- return commandThatTimeoutAndFail(1000);
- }
- return commandThatWorks();
- }
-
-
- private Handler> commandThatWorks() {
- return (future -> vertx.setTimer(5, l -> future.complete(null)));
- }
-
- private Handler> commandThatFails() {
- return (future -> vertx.setTimer(5, l -> future.fail("expected failure")));
- }
-
- private Handler> commandThatCrashes() {
- return (future -> {
- throw new RuntimeException("Expected error");
- });
- }
-
- private Handler> commandThatTimeout(int timeout) {
- return (future -> vertx.setTimer(timeout + 500, l -> future.complete(null)));
- }
-
- private Handler> commandThatTimeoutAndFail(int timeout) {
- return (future -> vertx.setTimer(timeout + 500, l -> future.fail("late failure")));
- }
-
-
-}
diff --git a/src/test/java/io/vertx/circuitbreaker/impl/HystrixTest.java b/src/test/java/io/vertx/circuitbreaker/impl/HystrixTest.java
deleted file mode 100644
index be77619..0000000
--- a/src/test/java/io/vertx/circuitbreaker/impl/HystrixTest.java
+++ /dev/null
@@ -1,185 +0,0 @@
-/*
- * Copyright (c) 2011-2016 The original author or authors
- *
- * All rights reserved. This program and the accompanying materials
- * are made available under the terms of the Eclipse Public License v1.0
- * and Apache License v2.0 which accompanies this distribution.
- *
- * The Eclipse Public License is available at
- * http://www.eclipse.org/legal/epl-v10.html
- *
- * The Apache License v2.0 is available at
- * http://www.opensource.org/licenses/apache2.0.php
- *
- * You may elect to redistribute this code under either of these licenses.
- */
-
-package io.vertx.circuitbreaker.impl;
-
-import io.vertx.core.Context;
-import io.vertx.core.Vertx;
-import io.vertx.core.http.HttpClient;
-import io.vertx.core.http.HttpClientOptions;
-import io.vertx.core.http.HttpMethod;
-import io.vertx.core.http.HttpServer;
-import io.vertx.ext.web.Router;
-import org.junit.After;
-import org.junit.Before;
-import org.junit.Test;
-
-import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.concurrent.atomic.AtomicReference;
-
-import static com.jayway.awaitility.Awaitility.await;
-import static org.assertj.core.api.Assertions.assertThat;
-import static org.hamcrest.core.Is.is;
-
-/**
- * Some test to demonstrate how Hystrix can be used.
- *
- * @author Clement Escoffier
- */
-public class HystrixTest {
-
- private Vertx vertx;
- private HttpServer http;
- private HttpClient client;
-
- @Before
- public void setUp() {
- vertx = Vertx.vertx();
- Router router = Router.router(vertx);
- router.route(HttpMethod.GET, "/").handler(ctxt -> {
- ctxt.response().setStatusCode(200).end("hello");
- });
- router.route(HttpMethod.GET, "/error").handler(ctxt -> {
- ctxt.response().setStatusCode(500).end("failed !");
- });
- router.route(HttpMethod.GET, "/long").handler(ctxt -> {
- try {
- Thread.sleep(2000);
- } catch (Exception e) {
- // Ignored.
- }
- ctxt.response().setStatusCode(200).end("hello");
- });
-
- AtomicBoolean done = new AtomicBoolean();
- vertx.createHttpServer().requestHandler(router).listen(8080).onComplete(ar -> {
- done.set(ar.succeeded());
- http = ar.result();
- });
-
- await().untilAtomic(done, is(true));
-
- client = vertx.createHttpClient(new HttpClientOptions().setDefaultPort(8080).setDefaultHost("localhost"));
- }
-
- @After
- public void tearDown() {
- AtomicBoolean completed = new AtomicBoolean();
- http.close().onComplete(ar -> {
- completed.set(true);
- });
- await().untilAtomic(completed, is(true));
-
- completed.set(false);
- vertx.close().onComplete(v -> completed.set(true));
- await().untilAtomic(completed, is(true));
-
- client.close();
- }
-
- @Test
- public void testOk() throws Exception {
- AtomicReference result = new AtomicReference<>();
-
- vertx.runOnContext(v -> {
-
- // Blocking call..., need to run in an executeBlocking
- vertx.executeBlocking(
- () -> {
- HttpClientCommand command = new HttpClientCommand(client, "/");
- return command.execute();
- }).onComplete(ar -> result.set(ar.result())
- );
- });
-
- await().until(() -> result.get() != null);
- assertThat(result.get()).isEqualToIgnoringCase("hello");
-
- result.set(null);
- vertx.runOnContext(v -> {
-
- // Blocking call..., need to run in an executeBlocking
- vertx.executeBlocking(
- () -> {
- HttpClientCommand command = new HttpClientCommand(client, "/");
- return command.queue().get();
- }).onComplete(ar -> result.set(ar.result())
- );
- });
-
- await().until(() -> result.get() != null);
- assertThat(result.get()).isEqualToIgnoringCase("hello");
- }
-
- @Test
- public void testFailure() throws Exception {
- AtomicReference result = new AtomicReference<>();
-
- vertx.runOnContext(v -> {
-
- // Blocking call..., need to run in an executeBlocking
-
- vertx.executeBlocking(
- () -> {
- HttpClientCommand command = new HttpClientCommand(client, "/error");
- return command.execute();
- }).onComplete(ar -> result.set(ar.result())
- );
- });
-
- await().until(() -> result.get() != null);
- assertThat(result.get()).isEqualToIgnoringCase("fallback");
-
- result.set(null);
- vertx.runOnContext(v -> {
-
- // Blocking call..., need to run in an executeBlocking
- vertx.executeBlocking(
- () -> {
- HttpClientCommand command = new HttpClientCommand(client, "/error");
- return command.queue().get();
- }).onComplete(ar -> result.set(ar.result())
- );
- });
-
- await().until(() -> result.get() != null);
- assertThat(result.get()).isEqualToIgnoringCase("fallback");
- }
-
- @Test
- public void testObservable() throws Exception {
- AtomicReference result = new AtomicReference<>();
-
- vertx.runOnContext(v -> {
- Context context = vertx.getOrCreateContext();
- HttpClientCommand command = new HttpClientCommand(client, "/");
- command.observe().subscribe(s -> {
- context.runOnContext(v2 -> checkSetter(result, s));
- });
- });
-
- await().until(() -> result.get() != null);
- assertThat(result.get()).isEqualToIgnoringCase("hello");
- }
-
- private void checkSetter(AtomicReference ref, String value) {
- if (Context.isOnEventLoopThread()) {
- ref.set(value);
- } else {
- ref.set("Not on the event loop");
- }
- }
-}
diff --git a/src/test/java/io/vertx/circuitbreaker/impl/NumberOfRetryTest.java b/src/test/java/io/vertx/circuitbreaker/impl/NumberOfRetryTest.java
index 7970ec8..f87c78c 100644
--- a/src/test/java/io/vertx/circuitbreaker/impl/NumberOfRetryTest.java
+++ b/src/test/java/io/vertx/circuitbreaker/impl/NumberOfRetryTest.java
@@ -9,7 +9,7 @@
import java.util.concurrent.atomic.AtomicInteger;
-import static com.jayway.awaitility.Awaitility.await;
+import static org.awaitility.Awaitility.await;
import static org.hamcrest.Matchers.is;
/**
diff --git a/src/test/java/io/vertx/circuitbreaker/impl/RetryPolicyTest.java b/src/test/java/io/vertx/circuitbreaker/impl/RetryPolicyTest.java
index 87b750d..8912b33 100644
--- a/src/test/java/io/vertx/circuitbreaker/impl/RetryPolicyTest.java
+++ b/src/test/java/io/vertx/circuitbreaker/impl/RetryPolicyTest.java
@@ -10,7 +10,7 @@
import java.util.concurrent.atomic.AtomicInteger;
-import static com.jayway.awaitility.Awaitility.*;
+import static org.awaitility.Awaitility.*;
import static org.hamcrest.Matchers.*;
/**
diff --git a/src/test/java/io/vertx/circuitbreaker/impl/UsageTest.java b/src/test/java/io/vertx/circuitbreaker/impl/UsageTest.java
index 3a44b03..91b921a 100644
--- a/src/test/java/io/vertx/circuitbreaker/impl/UsageTest.java
+++ b/src/test/java/io/vertx/circuitbreaker/impl/UsageTest.java
@@ -31,7 +31,7 @@
import java.util.concurrent.atomic.AtomicReference;
import static com.github.tomakehurst.wiremock.client.WireMock.*;
-import static com.jayway.awaitility.Awaitility.await;
+import static org.awaitility.Awaitility.await;
import static org.assertj.core.api.Assertions.assertThat;
import static org.hamcrest.CoreMatchers.is;
import static org.hamcrest.CoreMatchers.notNullValue;
diff --git a/src/test/java/io/vertx/circuitbreaker/metrics/DashboardExample.java b/src/test/java/io/vertx/circuitbreaker/metrics/DashboardExample.java
deleted file mode 100644
index f046a98..0000000
--- a/src/test/java/io/vertx/circuitbreaker/metrics/DashboardExample.java
+++ /dev/null
@@ -1,128 +0,0 @@
-package io.vertx.circuitbreaker.metrics;
-
-import io.vertx.circuitbreaker.CircuitBreaker;
-import io.vertx.circuitbreaker.CircuitBreakerOptions;
-import io.vertx.circuitbreaker.HystrixMetricHandler;
-import io.vertx.core.Handler;
-import io.vertx.core.Promise;
-import io.vertx.core.Vertx;
-import io.vertx.ext.web.Router;
-import io.vertx.ext.web.RoutingContext;
-
-import java.util.Random;
-
-import static io.vertx.circuitbreaker.CircuitBreakerOptions.DEFAULT_NOTIFICATION_ADDRESS;
-
-/**
- * @author Clement Escoffier
- */
-public class DashboardExample {
-
- private static Random random = new Random();
-
- public static void main(String[] args) {
- Vertx vertx = Vertx.vertx();
- CircuitBreakerOptions options = new CircuitBreakerOptions()
- .setNotificationAddress(DEFAULT_NOTIFICATION_ADDRESS)
- .setFallbackOnFailure(true)
- .setMaxFailures(10)
- .setResetTimeout(5000)
- .setTimeout(1000)
- .setMetricsRollingWindow(10000);
-
- CircuitBreaker cba = CircuitBreaker.create("A", vertx, new CircuitBreakerOptions(options));
- CircuitBreaker cbb = CircuitBreaker.create("B", vertx, new CircuitBreakerOptions(options));
- CircuitBreaker cbc = CircuitBreaker.create("C", vertx, new CircuitBreakerOptions(options));
-
- Router router = Router.router(vertx);
- router.get("/metrics").handler(HystrixMetricHandler.create(vertx));
-
-
- router.get("/A").handler(rc -> a(rc, cba));
- router.get("/B").handler(rc -> b(rc, cbb));
- router.get("/C").handler(rc -> c(rc, cbc));
-
- vertx.createHttpServer()
- .requestHandler(router)
- .listen(8080);
- }
-
-
- private static void a(RoutingContext rc, CircuitBreaker cb) {
- int choice = random.nextInt(10);
- if (choice < 7) {
- cb.executeWithFallback(
- commandThatWorks(rc.vertx()),
- (t) -> "OK (fallback)")
- .onComplete(s -> rc.response().end(s.result()));
- } else {
- cb.executeWithFallback(
- commandThatFails(rc.vertx()),
- (t) -> "OK (fallback)")
- .onComplete(s -> rc.response().end(s.result()));
- }
- }
-
- private static void b(RoutingContext rc, CircuitBreaker cb) {
- int choice = random.nextInt(10);
- if (choice < 5) {
- cb.executeWithFallback(
- commandThatWorks(rc.vertx()),
- (t) -> "OK (fallback)")
- .onComplete(s -> rc.response().end(s.result()));
- } else if (choice < 7) {
- cb.executeWithFallback(
- commandThatCrashes(rc.vertx()),
- (t) -> "OK (fallback)")
- .onComplete(s -> rc.response().end(s.result()));
- } else {
- cb.executeWithFallback(
- commandThatFails(rc.vertx()),
- (t) -> "OK (fallback)")
- .onComplete(s -> rc.response().end(s.result()));
- }
- }
-
- private static void c(RoutingContext rc, CircuitBreaker cb) {
- int choice = random.nextInt(10);
- if (choice < 5) {
- cb.executeWithFallback(
- commandThatWorks(rc.vertx()),
- (t) -> "OK (fallback)")
- .onComplete(s -> rc.response().end(s.result()));
- } else if (choice < 7) {
- cb.executeWithFallback(
- commandThatTimeout(rc.vertx(), 15000),
- (t) -> "OK (fallback)")
- .onComplete(s -> rc.response().end(s.result()));
- } else {
- cb.executeWithFallback(
- commandThatFails(rc.vertx()),
- (t) -> "OK (fallback)")
- .onComplete(s -> rc.response().end(s.result()));
- }
- }
-
- private static Handler> commandThatWorks(Vertx vertx) {
- return (future -> vertx.setTimer(5, l -> future.complete("OK !")));
- }
-
- private static Handler> commandThatFails(Vertx vertx) {
- return (future -> vertx.setTimer(5, l -> future.fail("expected failure")));
- }
-
- private static Handler> commandThatCrashes(Vertx vertx) {
- return (future -> {
- throw new RuntimeException("Expected error");
- });
- }
-
- private static Handler> commandThatTimeout(Vertx vertx, int timeout) {
- return (future -> vertx.setTimer(timeout + 500, l -> future.complete("Is it too late ?")));
- }
-
- private static Handler> commandThatTimeoutAndFail(Vertx vertx, int timeout) {
- return (future -> vertx.setTimer(timeout + 500, l -> future.fail("late failure")));
- }
-
-}
diff --git a/src/test/java/io/vertx/circuitbreaker/metrics/README.md b/src/test/java/io/vertx/circuitbreaker/metrics/README.md
deleted file mode 100644
index eb0ba3f..0000000
--- a/src/test/java/io/vertx/circuitbreaker/metrics/README.md
+++ /dev/null
@@ -1,11 +0,0 @@
-# Metrics demo
-
-
-These files are a simple demo to illustrate the Hystrix Dashboard.
-
-1. Start the Hystrix Dashboard (we recommend using https://github.com/kennedyoliveira/standalone-hystrix-dashboard)
-2. Start the DashboardExample creating 3 different circuit breakers
-3. On the dashboard, register the metrics endpoint: http://localhost:8080/metrics
-4. Start the RandomClient generating load
-5. The circuit breaker metrics are now published in the dashboard
-
diff --git a/src/test/java/io/vertx/circuitbreaker/metrics/RandomClient.java b/src/test/java/io/vertx/circuitbreaker/metrics/RandomClient.java
deleted file mode 100644
index 7c67d66..0000000
--- a/src/test/java/io/vertx/circuitbreaker/metrics/RandomClient.java
+++ /dev/null
@@ -1,55 +0,0 @@
-package io.vertx.circuitbreaker.metrics;
-
-import io.vertx.core.AbstractVerticle;
-import io.vertx.core.DeploymentOptions;
-import io.vertx.core.Vertx;
-import io.vertx.core.http.HttpClientRequest;
-import io.vertx.core.http.HttpClientResponse;
-import io.vertx.core.http.HttpMethod;
-
-import java.util.ArrayList;
-import java.util.List;
-import java.util.Random;
-import java.util.concurrent.atomic.AtomicInteger;
-
-/**
- * @author Clement Escoffier
- */
-public class RandomClient extends AbstractVerticle {
-
- public static void main(String[] args) {
- Vertx vertx = Vertx.vertx();
- vertx.deployVerticle(RandomClient.class.getName(), new DeploymentOptions().setInstances(4));
- }
-
- List paths = new ArrayList<>();
- Random random = new Random();
-
- @Override
- public void start() throws Exception {
- paths.add("/A");
- paths.add("/A");
- paths.add("/B");
- paths.add("/C");
-
- AtomicInteger counter = new AtomicInteger();
- vertx.setPeriodic(500, l -> {
- int index = random.nextInt(paths.size());
- int count = counter.getAndIncrement();
- vertx.createHttpClient().request(HttpMethod.GET, 8080, "localhost", paths.get(index)).onComplete(ar1 -> {
- if (ar1.succeeded()) {
- HttpClientRequest request = ar1.result();
- request.send().onComplete(ar2 -> {
- if (ar2.succeeded()) {
- HttpClientResponse response = ar2.result();
- System.out.println(this + "[" + count + "] (" + paths.get(index) + ") Response: " + response.statusMessage());
- response.bodyHandler(buffer -> {
- System.out.println(this + "[" + count + "] (" + paths.get(index) + ") Data: " + buffer.toString());
- });
- }
- });
- }
- });
- });
- }
-}