From 28d35bf2413cdfbf50727a41811d69ad9382fc77 Mon Sep 17 00:00:00 2001 From: Ladislav Thon Date: Fri, 30 Jun 2023 15:32:25 +0200 Subject: [PATCH 1/6] Remove the Hystrix tests, documentation and dashboard integration --- pom.xml | 46 ++-- src/main/asciidoc/index.adoc | 56 ----- .../java/examples/CircuitBreakerExamples.java | 21 -- .../examples/hystrix/HystrixExamples.java | 61 ----- .../java/examples/hystrix/package-info.java | 20 -- .../circuitbreaker/HystrixMetricHandler.java | 50 ----- .../impl/HystrixMetricEventStream.java | 139 ------------ .../impl/HttpClientCommand.java | 90 -------- .../impl/HystrixMetricEventStreamTest.java | 209 ------------------ .../circuitbreaker/impl/HystrixTest.java | 185 ---------------- .../metrics/DashboardExample.java | 128 ----------- .../io/vertx/circuitbreaker/metrics/README.md | 11 - .../circuitbreaker/metrics/RandomClient.java | 55 ----- 13 files changed, 15 insertions(+), 1056 deletions(-) delete mode 100644 src/main/java/examples/hystrix/HystrixExamples.java delete mode 100644 src/main/java/examples/hystrix/package-info.java delete mode 100644 src/main/java/io/vertx/circuitbreaker/HystrixMetricHandler.java delete mode 100644 src/main/java/io/vertx/circuitbreaker/impl/HystrixMetricEventStream.java delete mode 100644 src/test/java/io/vertx/circuitbreaker/impl/HttpClientCommand.java delete mode 100644 src/test/java/io/vertx/circuitbreaker/impl/HystrixMetricEventStreamTest.java delete mode 100644 src/test/java/io/vertx/circuitbreaker/impl/HystrixTest.java delete mode 100644 src/test/java/io/vertx/circuitbreaker/metrics/DashboardExample.java delete mode 100644 src/test/java/io/vertx/circuitbreaker/metrics/README.md delete mode 100644 src/test/java/io/vertx/circuitbreaker/metrics/RandomClient.java diff --git a/pom.xml b/pom.xml index 3b04e5b..c24bbd4 100644 --- a/pom.xml +++ b/pom.xml @@ -26,8 +26,6 @@ 5.0.0-SNAPSHOT - 1.5.2 - true ${project.basedir}/src/main/resources/META-INF/MANIFEST.MF @@ -44,24 +42,6 @@ - - io.vertx - vertx-hazelcast - test - - - io.vertx - vertx-web - true - - - - org.hdrhistogram - HdrHistogram - 2.1.12 - true - - io.vertx vertx-core @@ -77,21 +57,15 @@ true - + - com.netflix.hystrix - hystrix-core - ${hystrix.version} - provided + org.hdrhistogram + HdrHistogram + 2.1.12 true - - - org.hdrhistogram - HdrHistogram - - + com.jayway.restassured rest-assured @@ -122,10 +96,20 @@ 2.2.1 test + + io.vertx + vertx-hazelcast + test + io.vertx vertx-unit test + + io.vertx + vertx-web + test + diff --git a/src/main/asciidoc/index.adoc b/src/main/asciidoc/index.adoc index dc3978d..aa20c2a 100644 --- a/src/main/asciidoc/index.adoc +++ b/src/main/asciidoc/index.adoc @@ -184,59 +184,3 @@ The fallback receives a: * {@link io.vertx.circuitbreaker.OpenCircuitException} when the circuit breaker is opened * {@link io.vertx.circuitbreaker.TimeoutException} when the operation timed out - -== Pushing circuit breaker metrics to the Hystrix Dashboard - -Netflix Hystrix comes with a dashboard to present the current state of the circuit breakers. The Vert.x circuit -breakers can publish their metrics in order to be consumed by this Hystrix Dashboard. The Hystrix dashboard requires -a SSE stream sending the metrics. This stream is provided by the -{@link io.vertx.circuitbreaker.HystrixMetricHandler} Vert.x Web Handler: - - -[source,$lang] ----- -{@link examples.CircuitBreakerExamples#example7(io.vertx.core.Vertx)} ----- - -In the Hystrix Dashboard, configure the stream url like: `http://localhost:8080/metrics`. The dashboard now consumes -the metrics from the Vert.x circuit breakers. - -IMPORTANT: The metrics are collected by the Vert.x Web handler using <>. -The feature must be enabled and, if you don't use the default notification address, you need to pass it when creating the metrics handler. - -[language, java] ----- -== Using Netflix Hystrix - -https://github.com/Netflix/Hystrix[Hystrix] provides an implementation of the circuit breaker pattern. You can use -Hystrix with Vert.x instead of this circuit breaker or in combination of. This section describes the tricks -to use Hystrix in a vert.x application. - -First you would need to add the Hystrix dependency to your classpath or build descriptor. Refer to the Hystrix -page for details. Then, you need to isolate the "protected" call in a `Command`. Once you have your command, you -can execute it: - -[source, $lang] -\---- -{@link examples.hystrix.HystrixExamples#exampleHystrix1()} -\---- - -However, the command execution is blocking, so have to call the command execution either in an `executeBlocking` -block or in a worker verticle: - -[source, $lang] -\---- -{@link examples.hystrix.HystrixExamples#exampleHystrix2(io.vertx.core.Vertx)} -\---- - -If you use the async support of Hystrix, be careful that callbacks are not called in a vert.x thread and you have -to keep a reference on the context before the execution (with {@link io.vertx.core.Vertx#getOrCreateContext()}, -and in the callback, switch back to the event loop using -{@link io.vertx.core.Vertx#runOnContext(io.vertx.core.Handler)}. Without this, you are loosing the Vert.x -concurrency model and have to manage the synchronization and ordering yourself: - -[source, $lang] -\---- -{@link examples.hystrix.HystrixExamples#exampleHystrix3(io.vertx.core.Vertx)} -\---- ----- diff --git a/src/main/java/examples/CircuitBreakerExamples.java b/src/main/java/examples/CircuitBreakerExamples.java index 874f962..672aa4a 100644 --- a/src/main/java/examples/CircuitBreakerExamples.java +++ b/src/main/java/examples/CircuitBreakerExamples.java @@ -18,14 +18,12 @@ import io.vertx.circuitbreaker.CircuitBreaker; import io.vertx.circuitbreaker.CircuitBreakerOptions; -import io.vertx.circuitbreaker.HystrixMetricHandler; import io.vertx.circuitbreaker.RetryPolicy; import io.vertx.core.Future; import io.vertx.core.Promise; import io.vertx.core.Vertx; import io.vertx.core.buffer.Buffer; import io.vertx.core.http.HttpMethod; -import io.vertx.ext.web.Router; /** * @author Clement Escoffier @@ -186,25 +184,6 @@ public void example6(Vertx vertx) { }); } - public void example7(Vertx vertx) { - // Enable notifications - CircuitBreakerOptions options = new CircuitBreakerOptions() - .setNotificationAddress(CircuitBreakerOptions.DEFAULT_NOTIFICATION_ADDRESS); - CircuitBreaker breaker = CircuitBreaker.create("my-circuit-breaker", vertx, new CircuitBreakerOptions(options)); - CircuitBreaker breaker2 = CircuitBreaker.create("my-second-circuit-breaker", vertx, new CircuitBreakerOptions(options)); - - // Create a Vert.x Web router - Router router = Router.router(vertx); - // Register the metric handler - router.get("/hystrix-metrics").handler(HystrixMetricHandler.create(vertx)); - - // Create the HTTP server using the router to dispatch the requests - vertx.createHttpServer() - .requestHandler(router) - .listen(8080); - - } - public void example8(Vertx vertx) { CircuitBreaker breaker = CircuitBreaker.create("my-circuit-breaker", vertx, new CircuitBreakerOptions().setMaxFailures(5).setMaxRetries(5).setTimeout(2000) diff --git a/src/main/java/examples/hystrix/HystrixExamples.java b/src/main/java/examples/hystrix/HystrixExamples.java deleted file mode 100644 index b680f87..0000000 --- a/src/main/java/examples/hystrix/HystrixExamples.java +++ /dev/null @@ -1,61 +0,0 @@ -/* - * Copyright (c) 2011-2016 The original author or authors - * - * All rights reserved. This program and the accompanying materials - * are made available under the terms of the Eclipse Public License v1.0 - * and Apache License v2.0 which accompanies this distribution. - * - * The Eclipse Public License is available at - * http://www.eclipse.org/legal/epl-v10.html - * - * The Apache License v2.0 is available at - * http://www.opensource.org/licenses/apache2.0.php - * - * You may elect to redistribute this code under either of these licenses. - */ - -package examples.hystrix; - -import com.netflix.hystrix.HystrixCommand; -import io.vertx.core.Context; -import io.vertx.core.Vertx; - -/** - * Examples for Hystrix - * - * @author Clement Escoffier - */ -public class HystrixExamples { - - public void exampleHystrix1() { - HystrixCommand someCommand = getSomeCommandInstance(); - String result = someCommand.execute(); - } - - public void exampleHystrix2(Vertx vertx) { - HystrixCommand someCommand = getSomeCommandInstance(); - vertx.executeBlocking( - () -> someCommand.execute()).onComplete(ar -> { - // back on the event loop - String result = ar.result(); - } - ); - } - - public void exampleHystrix3(Vertx vertx) { - vertx.runOnContext(v -> { - Context context = vertx.getOrCreateContext(); - HystrixCommand command = getSomeCommandInstance(); - command.observe().subscribe(result -> { - context.runOnContext(v2 -> { - // Back on context (event loop or worker) - String r = result; - }); - }); - }); - } - - private HystrixCommand getSomeCommandInstance() { - return null; - } -} diff --git a/src/main/java/examples/hystrix/package-info.java b/src/main/java/examples/hystrix/package-info.java deleted file mode 100644 index 21ac6ab..0000000 --- a/src/main/java/examples/hystrix/package-info.java +++ /dev/null @@ -1,20 +0,0 @@ -/* - * Copyright (c) 2011-2016 The original author or authors - * - * All rights reserved. This program and the accompanying materials - * are made available under the terms of the Eclipse Public License v1.0 - * and Apache License v2.0 which accompanies this distribution. - * - * The Eclipse Public License is available at - * http://www.eclipse.org/legal/epl-v10.html - * - * The Apache License v2.0 is available at - * http://www.opensource.org/licenses/apache2.0.php - * - * You may elect to redistribute this code under either of these licenses. - */ - -@Source(translate = false) -package examples.hystrix; - -import io.vertx.docgen.Source; \ No newline at end of file diff --git a/src/main/java/io/vertx/circuitbreaker/HystrixMetricHandler.java b/src/main/java/io/vertx/circuitbreaker/HystrixMetricHandler.java deleted file mode 100644 index 89a3560..0000000 --- a/src/main/java/io/vertx/circuitbreaker/HystrixMetricHandler.java +++ /dev/null @@ -1,50 +0,0 @@ -package io.vertx.circuitbreaker; - -import io.vertx.circuitbreaker.impl.HystrixMetricEventStream; -import io.vertx.codegen.annotations.VertxGen; -import io.vertx.core.Handler; -import io.vertx.core.Vertx; -import io.vertx.ext.web.RoutingContext; - -/** - * A Vert.x web handler to expose the circuit breaker to the Hystrix dasbboard. The handler listens to the circuit - * breaker notifications sent on the event bus. - * - * @author Clement Escoffier - */ -@VertxGen -public interface HystrixMetricHandler extends Handler { - - /** - * Creates the handler, using the default notification address and listening to local messages only. - * - * @param vertx the Vert.x instance - * @return the handler - */ - static HystrixMetricHandler create(Vertx vertx) { - return create(vertx, CircuitBreakerOptions.DEFAULT_NOTIFICATION_ADDRESS); - } - - /** - * Creates the handler, listening only to local messages. - * - * @param vertx the Vert.x instance - * @param address the address to listen on the event bus - * @return the handler - */ - static HystrixMetricHandler create(Vertx vertx, String address) { - return create(vertx, address, CircuitBreakerOptions.DEFAULT_NOTIFICATION_LOCAL_ONLY); - } - - /** - * Creates the handler. - * - * @param vertx the Vert.x instance - * @param address the address to listen on the event bus - * @param localOnly whether the consumer should only receive messages sent from this Vert.x instance - * @return the handler - */ - static HystrixMetricHandler create(Vertx vertx, String address, boolean localOnly) { - return new HystrixMetricEventStream(vertx, address, localOnly); - } -} diff --git a/src/main/java/io/vertx/circuitbreaker/impl/HystrixMetricEventStream.java b/src/main/java/io/vertx/circuitbreaker/impl/HystrixMetricEventStream.java deleted file mode 100644 index ccda096..0000000 --- a/src/main/java/io/vertx/circuitbreaker/impl/HystrixMetricEventStream.java +++ /dev/null @@ -1,139 +0,0 @@ -package io.vertx.circuitbreaker.impl; - -import io.vertx.circuitbreaker.CircuitBreakerState; -import io.vertx.circuitbreaker.HystrixMetricHandler; -import io.vertx.core.Vertx; -import io.vertx.core.eventbus.EventBus; -import io.vertx.core.eventbus.MessageConsumer; -import io.vertx.core.http.HttpHeaders; -import io.vertx.core.http.HttpServerResponse; -import io.vertx.core.json.JsonObject; -import io.vertx.ext.web.RoutingContext; - -import java.util.Collections; -import java.util.LinkedList; -import java.util.List; -import java.util.Objects; -import java.util.concurrent.atomic.AtomicInteger; - - -/** - * Implements a handler to serve the Vert.x circuit breaker metrics as a Hystrix circuit - * breaker. - * - * @author Clement Escoffier - */ -public class HystrixMetricEventStream implements HystrixMetricHandler { - - private final List connections = Collections.synchronizedList(new LinkedList<>()); - private AtomicInteger counter = new AtomicInteger(); - - public HystrixMetricEventStream(Vertx vertx, String address, boolean localOnly) { - Objects.requireNonNull(vertx); - Objects.requireNonNull(address); - - EventBus eventBus = vertx.eventBus(); - MessageConsumer consumer = localOnly ? eventBus.localConsumer(address) : eventBus.consumer(address); - consumer - .handler(message -> { - JsonObject json = build(message.body()); - int id = counter.incrementAndGet(); - String chunk = json.encode() + "\n\n"; - connections.forEach(resp -> { - try { - resp.write("id" + ": " + id + "\n"); - resp.write("data:" + chunk); - } catch (IllegalStateException e) { - // Connection close. - } - }); - }); - } - - private JsonObject build(JsonObject body) { - String state = body.getString("state"); - JsonObject json = new JsonObject(); - json.put("type", "HystrixCommand"); - json.put("name", body.getString("name")); - json.put("group", body.getString("node")); - json.put("currentTime", System.currentTimeMillis()); - json.put("isCircuitBreakerOpen", state.equalsIgnoreCase(CircuitBreakerState.OPEN.toString())); - json.put("errorPercentage", body.getInteger("rollingErrorPercentage", 0)); - json.put("errorCount", body.getInteger("rollingErrorCount", 0)); - json.put("requestCount", body.getInteger("rollingOperationCount", 0)); - json.put("rollingCountCollapsedRequests", 0); - json.put("rollingCountExceptionsThrown", body.getInteger("rollingExceptionCount", 0)); - json.put("rollingCountFailure", body.getInteger("rollingFailureCount", 0)); - json.put("rollingCountTimeout", body.getInteger("rollingTimeoutCount", 0)); - json.put("rollingCountFallbackFailure", body.getInteger("rollingFallbackFailureCount", 0)); - json.put("rollingCountFallbackRejection", body.getInteger("fallbackRejection", 0)); - json.put("rollingCountFallbackSuccess", body.getInteger("rollingFallbackSuccessCount", 0)); - json.put("rollingCountResponsesFromCache", 0); - json.put("rollingCountSemaphoreRejected", 0); - json.put("rollingCountShortCircuited", body.getInteger("rollingShortCircuitedCount", 0)); - json.put("rollingCountSuccess", body.getInteger("rollingSuccessCount", 0)); - json.put("rollingCountThreadPoolRejected", 0); - json.put("rollingCountTimeout", body.getInteger("rollingTimeoutCount", 0)); - json.put("rollingCountBadRequests", 0); - json.put("rollingCountEmit", 0); - json.put("rollingCountFallbackEmit", 0); - json.put("rollingCountFallbackMissing", 0); - json.put("rollingMaxConcurrentExecutionCount", 0); - json.put("currentConcurrentExecutionCount", 0); - json.put("latencyExecute_mean", body.getInteger("rollingLatencyMean", 0)); - json.put("latencyExecute", body.getJsonObject("rollingLatency", new JsonObject())); - json.put("latencyTotal_mean", body.getInteger("totalLatencyMean", 0)); - json.put("latencyTotal", body.getJsonObject("totalLatency", new JsonObject())); - - json.put("propertyValue_circuitBreakerRequestVolumeThreshold", 0); - json.put("propertyValue_circuitBreakerSleepWindowInMilliseconds", body.getLong("resetTimeout", 0L)); - json.put("propertyValue_circuitBreakerErrorThresholdPercentage", 0); - json.put("propertyValue_circuitBreakerForceOpen", false); - json.put("propertyValue_circuitBreakerForceClosed", false); - json.put("propertyValue_circuitBreakerEnabled", true); - json.put("propertyValue_executionIsolationStrategy", "THREAD"); - json.put("propertyValue_executionIsolationThreadTimeoutInMilliseconds", body.getLong("timeout", 0L)); - json.put("propertyValue_executionIsolationThreadInterruptOnTimeout", true); - json.put("propertyValue_executionIsolationThreadPoolKeyOverride", ""); - json.put("propertyValue_executionIsolationSemaphoreMaxConcurrentRequests", 0); - json.put("propertyValue_fallbackIsolationSemaphoreMaxConcurrentRequests", 0); - json.put("propertyValue_metricsRollingStatisticalWindowInMilliseconds", body.getLong("metricRollingWindow", 0L)); - json.put("propertyValue_requestCacheEnabled", false); - json.put("propertyValue_requestLogEnabled", false); - json.put("reportingHosts", 1); - return json; - } - - @Override - public void handle(RoutingContext rc) { - HttpServerResponse response = rc.response(); - response - .setChunked(true) - .putHeader(HttpHeaders.CONTENT_TYPE, "text/event-stream") - .putHeader(HttpHeaders.CACHE_CONTROL, "no-cache") - .putHeader(HttpHeaders.CONNECTION, HttpHeaders.KEEP_ALIVE); - - rc.request().connection() - .closeHandler(v -> { - connections.remove(response); - endQuietly(response); - }) - .exceptionHandler(t -> { - connections.remove(response); - rc.fail(t); - }); - - connections.add(response); - } - - private static void endQuietly(HttpServerResponse response) { - if (response.ended()) { - return; - } - try { - response.end(); - } catch (IllegalStateException e) { - // Ignore it. - } - } -} diff --git a/src/test/java/io/vertx/circuitbreaker/impl/HttpClientCommand.java b/src/test/java/io/vertx/circuitbreaker/impl/HttpClientCommand.java deleted file mode 100644 index 92215ed..0000000 --- a/src/test/java/io/vertx/circuitbreaker/impl/HttpClientCommand.java +++ /dev/null @@ -1,90 +0,0 @@ -/* - * Copyright (c) 2011-2016 The original author or authors - * - * All rights reserved. This program and the accompanying materials - * are made available under the terms of the Eclipse Public License v1.0 - * and Apache License v2.0 which accompanies this distribution. - * - * The Eclipse Public License is available at - * http://www.eclipse.org/legal/epl-v10.html - * - * The Apache License v2.0 is available at - * http://www.opensource.org/licenses/apache2.0.php - * - * You may elect to redistribute this code under either of these licenses. - */ - -package io.vertx.circuitbreaker.impl; - -import com.netflix.hystrix.HystrixCommand; -import com.netflix.hystrix.HystrixCommandGroupKey; -import io.vertx.core.Handler; -import io.vertx.core.http.HttpClient; -import io.vertx.core.http.HttpClientRequest; -import io.vertx.core.http.HttpClientResponse; -import io.vertx.core.http.HttpMethod; - -import java.util.concurrent.CountDownLatch; -import java.util.concurrent.atomic.AtomicReference; - -/** - * @author Clement Escoffier - */ -public class HttpClientCommand extends HystrixCommand { - - private final HttpClient client; - private final String path; - - public HttpClientCommand(HttpClient client, String path) { - super(HystrixCommandGroupKey.Factory.asKey("test")); - this.client = client; - this.path = path; - } - - @Override - protected String run() throws Exception { - AtomicReference result = new AtomicReference<>(); - CountDownLatch latch = new CountDownLatch(1); - - Handler errorHandler = t -> { - latch.countDown(); - }; - - client.request(HttpMethod.GET, path).onComplete(ar1 -> { - if (ar1.succeeded()) { - HttpClientRequest req = ar1.result(); - req.send().onComplete(ar2 -> { - if (ar2.succeeded()) { - HttpClientResponse response = ar2.result(); - response.exceptionHandler(errorHandler); - if (response.statusCode() != 200) { - latch.countDown(); - return; - } - response.bodyHandler(content -> { - result.set(content.toString()); - latch.countDown(); - }); - } else { - errorHandler.handle(ar2.cause()); - } - }); - } else { - errorHandler.handle(ar1.cause()); - } - }); - - latch.await(); - - if (result.get() == null) { - throw new RuntimeException("Failed to retrieve the HTTP response"); - } else { - return result.get(); - } - } - - @Override - protected String getFallback() { - return "fallback"; - } -} diff --git a/src/test/java/io/vertx/circuitbreaker/impl/HystrixMetricEventStreamTest.java b/src/test/java/io/vertx/circuitbreaker/impl/HystrixMetricEventStreamTest.java deleted file mode 100644 index 6c62a58..0000000 --- a/src/test/java/io/vertx/circuitbreaker/impl/HystrixMetricEventStreamTest.java +++ /dev/null @@ -1,209 +0,0 @@ -package io.vertx.circuitbreaker.impl; - -import io.vertx.circuitbreaker.CircuitBreaker; -import io.vertx.circuitbreaker.CircuitBreakerOptions; -import io.vertx.circuitbreaker.HystrixMetricHandler; -import io.vertx.core.Handler; -import io.vertx.core.Promise; -import io.vertx.core.Vertx; -import io.vertx.core.buffer.Buffer; -import io.vertx.core.http.HttpClient; -import io.vertx.core.http.HttpClientRequest; -import io.vertx.core.http.HttpClientResponse; -import io.vertx.core.http.HttpMethod; -import io.vertx.core.json.JsonObject; -import io.vertx.core.parsetools.JsonParser; -import io.vertx.core.parsetools.RecordParser; -import io.vertx.ext.unit.TestContext; -import io.vertx.ext.unit.junit.Repeat; -import io.vertx.ext.unit.junit.RepeatRule; -import io.vertx.ext.unit.junit.VertxUnitRunner; -import io.vertx.ext.web.Router; -import org.junit.After; -import org.junit.Before; -import org.junit.Rule; -import org.junit.Test; -import org.junit.runner.RunWith; - -import java.util.List; -import java.util.Random; -import java.util.concurrent.CopyOnWriteArrayList; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicBoolean; - -import static com.jayway.awaitility.Awaitility.await; -import static io.vertx.circuitbreaker.CircuitBreakerOptions.DEFAULT_NOTIFICATION_ADDRESS; -import static org.assertj.core.api.Assertions.assertThat; -import static org.hamcrest.core.Is.is; - -/** - * @author Clement Escoffier - */ -@RunWith(VertxUnitRunner.class) -public class HystrixMetricEventStreamTest { - - @Rule - public RepeatRule rule = new RepeatRule(); - - - private CircuitBreaker breakerA; - private CircuitBreaker breakerB; - private CircuitBreaker breakerC; - - - private Vertx vertx; - - @Before - public void setUp(TestContext tc) { - vertx = Vertx.vertx(); - vertx.exceptionHandler(tc.exceptionHandler()); - } - - @After - public void tearDown() { - vertx.exceptionHandler(null); - - if (breakerA != null) { - breakerA.close(); - } - if (breakerB != null) { - breakerB.close(); - } - if (breakerC != null) { - breakerC.close(); - } - - AtomicBoolean completed = new AtomicBoolean(); - vertx.close().onComplete(ar -> completed.set(ar.succeeded())); - await().untilAtomic(completed, is(true)); - } - - - @Test - @Repeat(10) - public void test() { - CircuitBreakerOptions options = new CircuitBreakerOptions() - .setNotificationAddress(DEFAULT_NOTIFICATION_ADDRESS) - .setTimeout(1000); - breakerA = CircuitBreaker.create("A", vertx, new CircuitBreakerOptions(options)); - breakerB = CircuitBreaker.create("B", vertx, new CircuitBreakerOptions(options)); - breakerC = CircuitBreaker.create("C", vertx, new CircuitBreakerOptions(options)); - - Router router = Router.router(vertx); - router.get("/metrics").handler(HystrixMetricHandler.create(vertx)); - - AtomicBoolean ready = new AtomicBoolean(); - vertx.createHttpServer() - .requestHandler(router) - .listen(8080).onComplete(ar -> ready.set(ar.succeeded())); - - await().untilAtomic(ready, is(true)); - - List responses = new CopyOnWriteArrayList<>(); - HttpClient client = vertx.createHttpClient(); - - JsonParser jp = JsonParser.newParser().objectValueMode().handler( - jsonEvent -> responses.add(jsonEvent.objectValue()) - ); - RecordParser parser = RecordParser.newDelimited("\n\n", buffer -> { - String record = buffer.toString(); - String[] lines = record.split("\n"); - for (String line : lines) { - String l = line.trim(); - if (l.startsWith("data:")) { - String json = l.substring("data:".length()); - jp.handle(Buffer.buffer(json)); - } - } - }); - - client.request(HttpMethod.GET, 8080, "localhost", "/metrics").onComplete(ar1 -> { - if (ar1.succeeded()) { - HttpClientRequest req = ar1.result(); - req.send().onComplete(ar2 -> { - if (ar2.succeeded()) { - HttpClientResponse resp = ar2.result(); - resp.handler(parser); - } - }); - } - }); - - for (int i = 0; i < 1000; i++) { - breakerA.execute(choose()); - breakerB.execute(choose()); - breakerC.execute(choose()); - } - - await().atMost(1, TimeUnit.MINUTES).until(() -> responses.size() > 50); - - // Check that we got metrics for A, B and C - JsonObject a = null; - JsonObject b = null; - JsonObject c = null; - for (JsonObject json : responses) { - switch (json.getString("name")) { - case "A": - a = json; - break; - case "B": - b = json; - break; - case "C": - c = json; - break; - } - } - - client.close(); - - assertThat(a).isNotNull(); - assertThat(b).isNotNull(); - assertThat(c).isNotNull(); - } - - - private Random random = new Random(); - - private Handler> choose() { - int choice = random.nextInt(5); - switch (choice) { - case 0: - return commandThatWorks(); - case 1: - return commandThatFails(); - case 2: - return commandThatCrashes(); - case 3: - return commandThatTimeout(1000); - case 4: - return commandThatTimeoutAndFail(1000); - } - return commandThatWorks(); - } - - - private Handler> commandThatWorks() { - return (future -> vertx.setTimer(5, l -> future.complete(null))); - } - - private Handler> commandThatFails() { - return (future -> vertx.setTimer(5, l -> future.fail("expected failure"))); - } - - private Handler> commandThatCrashes() { - return (future -> { - throw new RuntimeException("Expected error"); - }); - } - - private Handler> commandThatTimeout(int timeout) { - return (future -> vertx.setTimer(timeout + 500, l -> future.complete(null))); - } - - private Handler> commandThatTimeoutAndFail(int timeout) { - return (future -> vertx.setTimer(timeout + 500, l -> future.fail("late failure"))); - } - - -} diff --git a/src/test/java/io/vertx/circuitbreaker/impl/HystrixTest.java b/src/test/java/io/vertx/circuitbreaker/impl/HystrixTest.java deleted file mode 100644 index be77619..0000000 --- a/src/test/java/io/vertx/circuitbreaker/impl/HystrixTest.java +++ /dev/null @@ -1,185 +0,0 @@ -/* - * Copyright (c) 2011-2016 The original author or authors - * - * All rights reserved. This program and the accompanying materials - * are made available under the terms of the Eclipse Public License v1.0 - * and Apache License v2.0 which accompanies this distribution. - * - * The Eclipse Public License is available at - * http://www.eclipse.org/legal/epl-v10.html - * - * The Apache License v2.0 is available at - * http://www.opensource.org/licenses/apache2.0.php - * - * You may elect to redistribute this code under either of these licenses. - */ - -package io.vertx.circuitbreaker.impl; - -import io.vertx.core.Context; -import io.vertx.core.Vertx; -import io.vertx.core.http.HttpClient; -import io.vertx.core.http.HttpClientOptions; -import io.vertx.core.http.HttpMethod; -import io.vertx.core.http.HttpServer; -import io.vertx.ext.web.Router; -import org.junit.After; -import org.junit.Before; -import org.junit.Test; - -import java.util.concurrent.atomic.AtomicBoolean; -import java.util.concurrent.atomic.AtomicReference; - -import static com.jayway.awaitility.Awaitility.await; -import static org.assertj.core.api.Assertions.assertThat; -import static org.hamcrest.core.Is.is; - -/** - * Some test to demonstrate how Hystrix can be used. - * - * @author Clement Escoffier - */ -public class HystrixTest { - - private Vertx vertx; - private HttpServer http; - private HttpClient client; - - @Before - public void setUp() { - vertx = Vertx.vertx(); - Router router = Router.router(vertx); - router.route(HttpMethod.GET, "/").handler(ctxt -> { - ctxt.response().setStatusCode(200).end("hello"); - }); - router.route(HttpMethod.GET, "/error").handler(ctxt -> { - ctxt.response().setStatusCode(500).end("failed !"); - }); - router.route(HttpMethod.GET, "/long").handler(ctxt -> { - try { - Thread.sleep(2000); - } catch (Exception e) { - // Ignored. - } - ctxt.response().setStatusCode(200).end("hello"); - }); - - AtomicBoolean done = new AtomicBoolean(); - vertx.createHttpServer().requestHandler(router).listen(8080).onComplete(ar -> { - done.set(ar.succeeded()); - http = ar.result(); - }); - - await().untilAtomic(done, is(true)); - - client = vertx.createHttpClient(new HttpClientOptions().setDefaultPort(8080).setDefaultHost("localhost")); - } - - @After - public void tearDown() { - AtomicBoolean completed = new AtomicBoolean(); - http.close().onComplete(ar -> { - completed.set(true); - }); - await().untilAtomic(completed, is(true)); - - completed.set(false); - vertx.close().onComplete(v -> completed.set(true)); - await().untilAtomic(completed, is(true)); - - client.close(); - } - - @Test - public void testOk() throws Exception { - AtomicReference result = new AtomicReference<>(); - - vertx.runOnContext(v -> { - - // Blocking call..., need to run in an executeBlocking - vertx.executeBlocking( - () -> { - HttpClientCommand command = new HttpClientCommand(client, "/"); - return command.execute(); - }).onComplete(ar -> result.set(ar.result()) - ); - }); - - await().until(() -> result.get() != null); - assertThat(result.get()).isEqualToIgnoringCase("hello"); - - result.set(null); - vertx.runOnContext(v -> { - - // Blocking call..., need to run in an executeBlocking - vertx.executeBlocking( - () -> { - HttpClientCommand command = new HttpClientCommand(client, "/"); - return command.queue().get(); - }).onComplete(ar -> result.set(ar.result()) - ); - }); - - await().until(() -> result.get() != null); - assertThat(result.get()).isEqualToIgnoringCase("hello"); - } - - @Test - public void testFailure() throws Exception { - AtomicReference result = new AtomicReference<>(); - - vertx.runOnContext(v -> { - - // Blocking call..., need to run in an executeBlocking - - vertx.executeBlocking( - () -> { - HttpClientCommand command = new HttpClientCommand(client, "/error"); - return command.execute(); - }).onComplete(ar -> result.set(ar.result()) - ); - }); - - await().until(() -> result.get() != null); - assertThat(result.get()).isEqualToIgnoringCase("fallback"); - - result.set(null); - vertx.runOnContext(v -> { - - // Blocking call..., need to run in an executeBlocking - vertx.executeBlocking( - () -> { - HttpClientCommand command = new HttpClientCommand(client, "/error"); - return command.queue().get(); - }).onComplete(ar -> result.set(ar.result()) - ); - }); - - await().until(() -> result.get() != null); - assertThat(result.get()).isEqualToIgnoringCase("fallback"); - } - - @Test - public void testObservable() throws Exception { - AtomicReference result = new AtomicReference<>(); - - vertx.runOnContext(v -> { - Context context = vertx.getOrCreateContext(); - HttpClientCommand command = new HttpClientCommand(client, "/"); - command.observe().subscribe(s -> { - context.runOnContext(v2 -> checkSetter(result, s)); - }); - }); - - await().until(() -> result.get() != null); - assertThat(result.get()).isEqualToIgnoringCase("hello"); - } - - private void checkSetter(AtomicReference ref, String value) { - if (Context.isOnEventLoopThread()) { - ref.set(value); - } else { - ref.set("Not on the event loop"); - } - } -} diff --git a/src/test/java/io/vertx/circuitbreaker/metrics/DashboardExample.java b/src/test/java/io/vertx/circuitbreaker/metrics/DashboardExample.java deleted file mode 100644 index f046a98..0000000 --- a/src/test/java/io/vertx/circuitbreaker/metrics/DashboardExample.java +++ /dev/null @@ -1,128 +0,0 @@ -package io.vertx.circuitbreaker.metrics; - -import io.vertx.circuitbreaker.CircuitBreaker; -import io.vertx.circuitbreaker.CircuitBreakerOptions; -import io.vertx.circuitbreaker.HystrixMetricHandler; -import io.vertx.core.Handler; -import io.vertx.core.Promise; -import io.vertx.core.Vertx; -import io.vertx.ext.web.Router; -import io.vertx.ext.web.RoutingContext; - -import java.util.Random; - -import static io.vertx.circuitbreaker.CircuitBreakerOptions.DEFAULT_NOTIFICATION_ADDRESS; - -/** - * @author Clement Escoffier - */ -public class DashboardExample { - - private static Random random = new Random(); - - public static void main(String[] args) { - Vertx vertx = Vertx.vertx(); - CircuitBreakerOptions options = new CircuitBreakerOptions() - .setNotificationAddress(DEFAULT_NOTIFICATION_ADDRESS) - .setFallbackOnFailure(true) - .setMaxFailures(10) - .setResetTimeout(5000) - .setTimeout(1000) - .setMetricsRollingWindow(10000); - - CircuitBreaker cba = CircuitBreaker.create("A", vertx, new CircuitBreakerOptions(options)); - CircuitBreaker cbb = CircuitBreaker.create("B", vertx, new CircuitBreakerOptions(options)); - CircuitBreaker cbc = CircuitBreaker.create("C", vertx, new CircuitBreakerOptions(options)); - - Router router = Router.router(vertx); - router.get("/metrics").handler(HystrixMetricHandler.create(vertx)); - - - router.get("/A").handler(rc -> a(rc, cba)); - router.get("/B").handler(rc -> b(rc, cbb)); - router.get("/C").handler(rc -> c(rc, cbc)); - - vertx.createHttpServer() - .requestHandler(router) - .listen(8080); - } - - - private static void a(RoutingContext rc, CircuitBreaker cb) { - int choice = random.nextInt(10); - if (choice < 7) { - cb.executeWithFallback( - commandThatWorks(rc.vertx()), - (t) -> "OK (fallback)") - .onComplete(s -> rc.response().end(s.result())); - } else { - cb.executeWithFallback( - commandThatFails(rc.vertx()), - (t) -> "OK (fallback)") - .onComplete(s -> rc.response().end(s.result())); - } - } - - private static void b(RoutingContext rc, CircuitBreaker cb) { - int choice = random.nextInt(10); - if (choice < 5) { - cb.executeWithFallback( - commandThatWorks(rc.vertx()), - (t) -> "OK (fallback)") - .onComplete(s -> rc.response().end(s.result())); - } else if (choice < 7) { - cb.executeWithFallback( - commandThatCrashes(rc.vertx()), - (t) -> "OK (fallback)") - .onComplete(s -> rc.response().end(s.result())); - } else { - cb.executeWithFallback( - commandThatFails(rc.vertx()), - (t) -> "OK (fallback)") - .onComplete(s -> rc.response().end(s.result())); - } - } - - private static void c(RoutingContext rc, CircuitBreaker cb) { - int choice = random.nextInt(10); - if (choice < 5) { - cb.executeWithFallback( - commandThatWorks(rc.vertx()), - (t) -> "OK (fallback)") - .onComplete(s -> rc.response().end(s.result())); - } else if (choice < 7) { - cb.executeWithFallback( - commandThatTimeout(rc.vertx(), 15000), - (t) -> "OK (fallback)") - .onComplete(s -> rc.response().end(s.result())); - } else { - cb.executeWithFallback( - commandThatFails(rc.vertx()), - (t) -> "OK (fallback)") - .onComplete(s -> rc.response().end(s.result())); - } - } - - private static Handler> commandThatWorks(Vertx vertx) { - return (future -> vertx.setTimer(5, l -> future.complete("OK !"))); - } - - private static Handler> commandThatFails(Vertx vertx) { - return (future -> vertx.setTimer(5, l -> future.fail("expected failure"))); - } - - private static Handler> commandThatCrashes(Vertx vertx) { - return (future -> { - throw new RuntimeException("Expected error"); - }); - } - - private static Handler> commandThatTimeout(Vertx vertx, int timeout) { - return (future -> vertx.setTimer(timeout + 500, l -> future.complete("Is it too late ?"))); - } - - private static Handler> commandThatTimeoutAndFail(Vertx vertx, int timeout) { - return (future -> vertx.setTimer(timeout + 500, l -> future.fail("late failure"))); - } - -} diff --git a/src/test/java/io/vertx/circuitbreaker/metrics/README.md b/src/test/java/io/vertx/circuitbreaker/metrics/README.md deleted file mode 100644 index eb0ba3f..0000000 --- a/src/test/java/io/vertx/circuitbreaker/metrics/README.md +++ /dev/null @@ -1,11 +0,0 @@ -# Metrics demo - - -These files are a simple demo to illustrate the Hystrix Dashboard. - -1. Start the Hystrix Dashboard (we recommend using https://github.com/kennedyoliveira/standalone-hystrix-dashboard) -2. Start the DashboardExample creating 3 different circuit breakers -3. On the dashboard, register the metrics endpoint: http://localhost:8080/metrics -4. Start the RandomClient generating load -5. The circuit breaker metrics are now published in the dashboard - diff --git a/src/test/java/io/vertx/circuitbreaker/metrics/RandomClient.java b/src/test/java/io/vertx/circuitbreaker/metrics/RandomClient.java deleted file mode 100644 index 7c67d66..0000000 --- a/src/test/java/io/vertx/circuitbreaker/metrics/RandomClient.java +++ /dev/null @@ -1,55 +0,0 @@ -package io.vertx.circuitbreaker.metrics; - -import io.vertx.core.AbstractVerticle; -import io.vertx.core.DeploymentOptions; -import io.vertx.core.Vertx; -import io.vertx.core.http.HttpClientRequest; -import io.vertx.core.http.HttpClientResponse; -import io.vertx.core.http.HttpMethod; - -import java.util.ArrayList; -import java.util.List; -import java.util.Random; -import java.util.concurrent.atomic.AtomicInteger; - -/** - * @author Clement Escoffier - */ -public class RandomClient extends AbstractVerticle { - - public static void main(String[] args) { - Vertx vertx = Vertx.vertx(); - vertx.deployVerticle(RandomClient.class.getName(), new DeploymentOptions().setInstances(4)); - } - - List paths = new ArrayList<>(); - Random random = new Random(); - - @Override - public void start() throws Exception { - paths.add("/A"); - paths.add("/A"); - paths.add("/B"); - paths.add("/C"); - - AtomicInteger counter = new AtomicInteger(); - vertx.setPeriodic(500, l -> { - int index = random.nextInt(paths.size()); - int count = counter.getAndIncrement(); - vertx.createHttpClient().request(HttpMethod.GET, 8080, "localhost", paths.get(index)).onComplete(ar1 -> { - if (ar1.succeeded()) { - HttpClientRequest request = ar1.result(); - request.send().onComplete(ar2 -> { - if (ar2.succeeded()) { - HttpClientResponse response = ar2.result(); - System.out.println(this + "[" + count + "] (" + paths.get(index) + ") Response: " + response.statusMessage()); - response.bodyHandler(buffer -> { - System.out.println(this + "[" + count + "] (" + paths.get(index) + ") Data: " + buffer.toString()); - }); - } - }); - } - }); - }); - } -} From 66db84b56c4771c6c97e4a9be554440a7d9f26da Mon Sep 17 00:00:00 2001 From: Ladislav Thon Date: Tue, 4 Jul 2023 13:52:28 +0200 Subject: [PATCH 2/6] Update test dependencies --- pom.xml | 16 +++++----------- .../io/vertx/circuitbreaker/impl/APITest.java | 2 +- .../impl/CircuitBreakerImplTest.java | 2 +- .../impl/CircuitBreakerMetricsTest.java | 2 +- .../impl/CircuitBreakerWithHTTPTest.java | 2 +- .../impl/DeprecatedRetryPolicyTest.java | 2 +- .../circuitbreaker/impl/NumberOfRetryTest.java | 2 +- .../circuitbreaker/impl/RetryPolicyTest.java | 2 +- .../io/vertx/circuitbreaker/impl/UsageTest.java | 2 +- 9 files changed, 13 insertions(+), 19 deletions(-) diff --git a/pom.xml b/pom.xml index c24bbd4..a045d0f 100644 --- a/pom.xml +++ b/pom.xml @@ -66,34 +66,28 @@ - - com.jayway.restassured - rest-assured - 2.8.0 - test - junit junit - 4.13.1 + 4.13.2 test org.assertj assertj-core - 3.3.0 + 3.24.2 test - com.jayway.awaitility + org.awaitility awaitility - 1.7.0 + 4.2.0 test com.github.tomakehurst wiremock - 2.2.1 + 2.27.2 test diff --git a/src/test/java/io/vertx/circuitbreaker/impl/APITest.java b/src/test/java/io/vertx/circuitbreaker/impl/APITest.java index 26d6eff..7f03b3e 100644 --- a/src/test/java/io/vertx/circuitbreaker/impl/APITest.java +++ b/src/test/java/io/vertx/circuitbreaker/impl/APITest.java @@ -28,7 +28,7 @@ import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; -import static com.jayway.awaitility.Awaitility.await; +import static org.awaitility.Awaitility.await; import static org.assertj.core.api.Assertions.assertThat; import static org.hamcrest.core.Is.is; diff --git a/src/test/java/io/vertx/circuitbreaker/impl/CircuitBreakerImplTest.java b/src/test/java/io/vertx/circuitbreaker/impl/CircuitBreakerImplTest.java index 44de07c..11c509d 100644 --- a/src/test/java/io/vertx/circuitbreaker/impl/CircuitBreakerImplTest.java +++ b/src/test/java/io/vertx/circuitbreaker/impl/CircuitBreakerImplTest.java @@ -37,7 +37,7 @@ import java.util.concurrent.atomic.AtomicReference; import java.util.stream.IntStream; -import static com.jayway.awaitility.Awaitility.await; +import static org.awaitility.Awaitility.await; import static org.assertj.core.api.Assertions.assertThat; import static org.hamcrest.core.Is.is; diff --git a/src/test/java/io/vertx/circuitbreaker/impl/CircuitBreakerMetricsTest.java b/src/test/java/io/vertx/circuitbreaker/impl/CircuitBreakerMetricsTest.java index 9d28421..663563b 100644 --- a/src/test/java/io/vertx/circuitbreaker/impl/CircuitBreakerMetricsTest.java +++ b/src/test/java/io/vertx/circuitbreaker/impl/CircuitBreakerMetricsTest.java @@ -21,7 +21,7 @@ import java.util.concurrent.atomic.AtomicBoolean; import java.util.stream.IntStream; -import static com.jayway.awaitility.Awaitility.await; +import static org.awaitility.Awaitility.await; import static io.vertx.circuitbreaker.asserts.Assertions.assertThat; import static java.util.stream.Collectors.collectingAndThen; import static java.util.stream.Collectors.toList; diff --git a/src/test/java/io/vertx/circuitbreaker/impl/CircuitBreakerWithHTTPTest.java b/src/test/java/io/vertx/circuitbreaker/impl/CircuitBreakerWithHTTPTest.java index 89b6678..b0e9fce 100644 --- a/src/test/java/io/vertx/circuitbreaker/impl/CircuitBreakerWithHTTPTest.java +++ b/src/test/java/io/vertx/circuitbreaker/impl/CircuitBreakerWithHTTPTest.java @@ -42,7 +42,7 @@ import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; -import static com.jayway.awaitility.Awaitility.*; +import static org.awaitility.Awaitility.*; import static io.vertx.core.http.HttpHeaders.*; import static java.util.concurrent.TimeUnit.*; import static org.assertj.core.api.Assertions.assertThat; diff --git a/src/test/java/io/vertx/circuitbreaker/impl/DeprecatedRetryPolicyTest.java b/src/test/java/io/vertx/circuitbreaker/impl/DeprecatedRetryPolicyTest.java index afb78a7..5bf1841 100644 --- a/src/test/java/io/vertx/circuitbreaker/impl/DeprecatedRetryPolicyTest.java +++ b/src/test/java/io/vertx/circuitbreaker/impl/DeprecatedRetryPolicyTest.java @@ -10,7 +10,7 @@ import java.util.concurrent.atomic.AtomicInteger; import java.util.function.Function; -import static com.jayway.awaitility.Awaitility.*; +import static org.awaitility.Awaitility.*; import static org.hamcrest.Matchers.*; /** diff --git a/src/test/java/io/vertx/circuitbreaker/impl/NumberOfRetryTest.java b/src/test/java/io/vertx/circuitbreaker/impl/NumberOfRetryTest.java index 7970ec8..f87c78c 100644 --- a/src/test/java/io/vertx/circuitbreaker/impl/NumberOfRetryTest.java +++ b/src/test/java/io/vertx/circuitbreaker/impl/NumberOfRetryTest.java @@ -9,7 +9,7 @@ import java.util.concurrent.atomic.AtomicInteger; -import static com.jayway.awaitility.Awaitility.await; +import static org.awaitility.Awaitility.await; import static org.hamcrest.Matchers.is; /** diff --git a/src/test/java/io/vertx/circuitbreaker/impl/RetryPolicyTest.java b/src/test/java/io/vertx/circuitbreaker/impl/RetryPolicyTest.java index 87b750d..8912b33 100644 --- a/src/test/java/io/vertx/circuitbreaker/impl/RetryPolicyTest.java +++ b/src/test/java/io/vertx/circuitbreaker/impl/RetryPolicyTest.java @@ -10,7 +10,7 @@ import java.util.concurrent.atomic.AtomicInteger; -import static com.jayway.awaitility.Awaitility.*; +import static org.awaitility.Awaitility.*; import static org.hamcrest.Matchers.*; /** diff --git a/src/test/java/io/vertx/circuitbreaker/impl/UsageTest.java b/src/test/java/io/vertx/circuitbreaker/impl/UsageTest.java index 3a44b03..91b921a 100644 --- a/src/test/java/io/vertx/circuitbreaker/impl/UsageTest.java +++ b/src/test/java/io/vertx/circuitbreaker/impl/UsageTest.java @@ -31,7 +31,7 @@ import java.util.concurrent.atomic.AtomicReference; import static com.github.tomakehurst.wiremock.client.WireMock.*; -import static com.jayway.awaitility.Awaitility.await; +import static org.awaitility.Awaitility.await; import static org.assertj.core.api.Assertions.assertThat; import static org.hamcrest.CoreMatchers.is; import static org.hamcrest.CoreMatchers.notNullValue; From 50c847c14e6627cc011e148906edfefeb1466705 Mon Sep 17 00:00:00 2001 From: Ladislav Thon Date: Fri, 30 Jun 2023 16:31:57 +0200 Subject: [PATCH 3/6] Improve documentation --- src/main/asciidoc/index.adoc | 71 +++---- .../java/examples/CircuitBreakerExamples.java | 185 +++++++----------- .../vertx/circuitbreaker/CircuitBreaker.java | 83 ++++---- .../circuitbreaker/CircuitBreakerOptions.java | 125 ++++++------ .../circuitbreaker/CircuitBreakerState.java | 6 +- .../io/vertx/circuitbreaker/RetryPolicy.java | 5 +- .../circuitbreaker/TimeoutException.java | 2 +- .../impl/CircuitBreakerImpl.java | 9 +- 8 files changed, 230 insertions(+), 256 deletions(-) diff --git a/src/main/asciidoc/index.adoc b/src/main/asciidoc/index.adoc index aa20c2a..259e1aa 100644 --- a/src/main/asciidoc/index.adoc +++ b/src/main/asciidoc/index.adoc @@ -1,7 +1,7 @@ = Vert.x Circuit Breaker -Vert.x Circuit Breaker is an implementation of the Circuit Breaker _pattern_ for Vert.x. It keeps track of the -number of failures and _opens the circuit_ when a threshold is reached. Optionally, a fallback is executed. +Vert.x Circuit Breaker is an implementation of the _circuit breaker_ pattern for Vert.x. It keeps track of the +number of recent failures and prevents further executions when a threshold is reached. Optionally, a fallback is executed. Supported failures are: @@ -12,9 +12,9 @@ Supported failures are: Operations guarded by a circuit breaker are intended to be non-blocking and asynchronous in order to benefit from the Vert.x execution model. -== Using the vert.x circuit breaker +== Using Vert.x Circuit Breaker -To use the Vert.x Circuit Breaker, add the following dependency to the _dependencies_ section of your build +To use Vert.x Circuit Breaker, add the following dependency to the _dependencies_ section of your build descriptor: * Maven (in your `pom.xml`): @@ -39,8 +39,8 @@ compile 'io.vertx:vertx-circuit-breaker:${maven.version}' To use the circuit breaker you need to: -1. Create a circuit breaker, with the configuration you want (timeout, number of failure before opening the circuit) -2. Execute some code using the breaker +1. Create a circuit breaker, with the configuration you want (timeout, failure threshold) +2. Execute some code using the circuit breaker **Important**: Don't recreate a circuit breaker on every call. A circuit breaker is a stateful entity. It is recommended to store the circuit breaker instance in a field. @@ -52,8 +52,8 @@ Here is an example: {@link examples.CircuitBreakerExamples#example1(io.vertx.core.Vertx)} ---- -The executed block receives a {@link io.vertx.core.Future} object as parameter, to denote the -success or failure of the operation as well as the result. For example in the following example, the result is the +The executed block receives a {@link io.vertx.core.Promise} object as parameter, to denote the +success or failure of the operation as well as the result. In the following example, the result is the output of a REST endpoint invocation: [source,$lang] @@ -64,18 +64,18 @@ output of a REST endpoint invocation: The result of the operation is provided using the: * returned {@link io.vertx.core.Future} when calling `execute` methods -* provided {@link io.vertx.core.Future} when calling the `executeAndReport` methods +* provided {@link io.vertx.core.Promise} when calling the `executeAndReport` methods -Optionally, you can provide a fallback which is executed when the circuit is open: +Optionally, you can provide a fallback which is executed when the circuit breaker is open: [source,$lang] ---- {@link examples.CircuitBreakerExamples#example3(io.vertx.core.Vertx)} ---- -The fallback is called whenever the circuit is open, or if the -{@link io.vertx.circuitbreaker.CircuitBreakerOptions#isFallbackOnFailure()} is enabled. When a fallback is -set, the result is using the output of the fallback function. The fallback function takes as parameter a +The fallback is called when the circuit breaker is open, or when +{@link io.vertx.circuitbreaker.CircuitBreakerOptions#isFallbackOnFailure()} is enabled. When fallback is +set, the overall result is obtained by calling the fallback function. The fallback function takes as parameter a {@link java.lang.Throwable} object and returns an object of the expected type. The fallback can also be set on the {@link io.vertx.circuitbreaker.CircuitBreaker} object directly: @@ -85,24 +85,31 @@ The fallback can also be set on the {@link io.vertx.circuitbreaker.CircuitBreake {@link examples.CircuitBreakerExamples#example4(io.vertx.core.Vertx)} ---- +=== Reported exceptions + +The fallback receives: + +* {@link io.vertx.circuitbreaker.OpenCircuitException} when the circuit breaker is open +* {@link io.vertx.circuitbreaker.TimeoutException} when the operation timed out + == Retries -You can also specify how often the circuit breaker should try your code before failing with {@link io.vertx.circuitbreaker.CircuitBreakerOptions#setMaxRetries(int)}. +You can also specify how often the circuit breaker should execute your code before failing with {@link io.vertx.circuitbreaker.CircuitBreakerOptions#setMaxRetries(int)}. If you set this to something higher than 0, your code gets executed several times before finally failing in the last execution. -If the code succeeded in one of the retries your handler gets notified and any retries left are skipped. -Retries are only supported when the circuit is closed. +If the code succeeds in one of the retries, your handler gets notified and no more retries occur. +Retries are only supported when the circuit breaker is closed. NOTE: If you set `maxRetries` to 2, your operation may be called 3 times: the initial attempt and 2 retries. -By default, the timeout between retries is set to 0, which means that retries will be executed one after another without any delay. +By default, the delay between retries is set to 0, which means that retries will be executed one after another immediately. This, however, will result in increased load on the called service and may delay its recovery. In order to mitigate this problem, it is recommended to execute retries with a delay. The {@link io.vertx.circuitbreaker.CircuitBreaker#retryPolicy(io.vertx.circuitbreaker.RetryPolicy)} method can be used to specify a retry policy. -A retry policy is a function which receives the operation failure and retry count as arguments and returns a timeout in milliseconds before retry is executed. +A retry policy is a function which receives the operation failure and retry count as arguments and returns a delay in milliseconds before retry should be executed. It allows to implement complex policies, e.g. using the value of the https://developer.mozilla.org/en-US/docs/Web/HTTP/Headers/Retry-After[`Retry-After`] header sent by an unavailable service. -But a few common policies are provided: {@link io.vertx.circuitbreaker.RetryPolicy#constantDelay}, {@link io.vertx.circuitbreaker.RetryPolicy#linearDelay} and {@link io.vertx.circuitbreaker.RetryPolicy#exponentialDelayWithJitter} +Some common policies are provided out of the box: {@link io.vertx.circuitbreaker.RetryPolicy#constantDelay}, {@link io.vertx.circuitbreaker.RetryPolicy#linearDelay} and {@link io.vertx.circuitbreaker.RetryPolicy#exponentialDelayWithJitter} Below is an example of exponential delay with jitter: @@ -113,19 +120,19 @@ Below is an example of exponential delay with jitter: == Callbacks -You can also configures callbacks invoked when the circuit is opened or closed: +You can also configure callbacks invoked when the circuit breaker is opened or closed: [source,$lang] ---- {@link examples.CircuitBreakerExamples#example5(io.vertx.core.Vertx)} ---- -You can also be notified when the circuit breaker decides to attempt to reset (half-open state). You can register +You can also be notified when the circuit breaker moves to the half-open state, in an attempt to reset. You can register such a callback with {@link io.vertx.circuitbreaker.CircuitBreaker#halfOpenHandler(io.vertx.core.Handler)}. == Event bus notification -Every time the circuit state changes, an event can be published on the event bus. +Every time the circuit breaker state changes, an event can be published on the event bus. To enable this feature, set the {@link io.vertx.circuitbreaker.CircuitBreakerOptions#setNotificationAddress(java.lang.String) notification address} to a value that is not `null`: @@ -134,7 +141,8 @@ To enable this feature, set the {@link io.vertx.circuitbreaker.CircuitBreakerOpt {@link examples.CircuitBreakerExamples#enableNotifications} ---- -The event contains circuit breaker metrics which computation requires the following dependency to be added the _dependencies_ section of your build descriptor: +The event contains circuit breaker metrics. +Computing these metrics requires the following dependency to be added the _dependencies_ section of your build descriptor: * Maven (in your `pom.xml`): @@ -156,7 +164,7 @@ compile 'org.hdrhistogram:HdrHistogram:2.1.12' [NOTE] ==== -When enabled, notifications are, by default, delivered only to local consumers. +When enabled, notifications are delivered only to local consumers by default. If the notification must be sent to all consumers in a cluster, you can change this behavior with {@link io.vertx.circuitbreaker.CircuitBreakerOptions#setNotificationLocalOnly}. ==== @@ -170,17 +178,10 @@ Each event contains a Json Object with: == The half-open state -When the circuit is "open", calls to the circuit breaker fail immediately, without any attempt to execute the real -operation. After a suitable amount of time (configured from -{@link io.vertx.circuitbreaker.CircuitBreakerOptions#setResetTimeout(long)}, the circuit breaker decides that the +When the circuit breaker is `open`, calls to the circuit breaker fail immediately, without any attempt to execute the real +operation. After a suitable amount of time (configured by +{@link io.vertx.circuitbreaker.CircuitBreakerOptions#setResetTimeout(long)}), the circuit breaker decides that the operation has a chance of succeeding, so it goes into the `half-open` state. In this state, the next call to the -circuit breaker is allowed to execute the dangerous operation. Should the call succeed, the circuit breaker resets +circuit breaker is allowed to execute the guarded operation. Should the call succeed, the circuit breaker resets and returns to the `closed` state, ready for more routine operation. If this trial call fails, however, the circuit breaker returns to the `open` state until another timeout elapses. - -== Reported exceptions - -The fallback receives a: - -* {@link io.vertx.circuitbreaker.OpenCircuitException} when the circuit breaker is opened -* {@link io.vertx.circuitbreaker.TimeoutException} when the operation timed out diff --git a/src/main/java/examples/CircuitBreakerExamples.java b/src/main/java/examples/CircuitBreakerExamples.java index 672aa4a..04b9b12 100644 --- a/src/main/java/examples/CircuitBreakerExamples.java +++ b/src/main/java/examples/CircuitBreakerExamples.java @@ -32,11 +32,11 @@ public class CircuitBreakerExamples { public void example1(Vertx vertx) { CircuitBreaker breaker = CircuitBreaker.create("my-circuit-breaker", vertx, - new CircuitBreakerOptions() - .setMaxFailures(5) // number of failure before opening the circuit - .setTimeout(2000) // consider a failure if the operation does not succeed in time - .setFallbackOnFailure(true) // do we call the fallback on failure - .setResetTimeout(10000) // time spent in open state before attempting to re-try + new CircuitBreakerOptions() + .setMaxFailures(5) // number of failures before opening the circuit breaker + .setTimeout(2000) // considered a failure if the operation does not succeed in time + .setFallbackOnFailure(true) // call the fallback on failure + .setResetTimeout(10000) // time spent in open state before attempting to retry ); // --- @@ -44,10 +44,10 @@ public void example1(Vertx vertx) { // --- breaker.execute(promise -> { - // some code executing with the breaker - // the code reports failures or success on the given promise. - // if this promise is marked as failed, the breaker increased the - // number of failures + // some code executing with the circuit breaker + // the code reports failures or success on the given promise + // if this promise is marked as failed, the circuit breaker + // increases the number of failures }).onComplete(ar -> { // Get the operation result. }); @@ -55,7 +55,7 @@ public void example1(Vertx vertx) { public void example2(Vertx vertx) { CircuitBreaker breaker = CircuitBreaker.create("my-circuit-breaker", vertx, - new CircuitBreakerOptions().setMaxFailures(5).setTimeout(2000) + new CircuitBreakerOptions().setMaxFailures(5).setTimeout(2000) ); // --- @@ -72,7 +72,8 @@ public void example2(Vertx vertx) { } else { return resp.body().map(Buffer::toString); } - })).onComplete(promise); + })) + .onComplete(promise); }).onComplete(ar -> { // Do something with the result }); @@ -80,132 +81,98 @@ public void example2(Vertx vertx) { public void example3(Vertx vertx) { CircuitBreaker breaker = CircuitBreaker.create("my-circuit-breaker", vertx, - new CircuitBreakerOptions().setMaxFailures(5).setTimeout(2000) + new CircuitBreakerOptions().setMaxFailures(5).setTimeout(2000) ); // --- // Store the circuit breaker in a field and access it as follows // --- - breaker.executeWithFallback( - promise -> { - vertx.createHttpClient().request(HttpMethod.GET, 8080, "localhost", "/") - .compose(req -> req - .send() - .compose(resp -> { - if (resp.statusCode() != 200) { - return Future.failedFuture("HTTP error"); - } else { - return resp.body().map(Buffer::toString); - } - })).onComplete(promise); - }, v -> { - // Executed when the circuit is opened - return "Hello"; - }) - .onComplete(ar -> { - // Do something with the result - }); + breaker.executeWithFallback(promise -> { + vertx.createHttpClient().request(HttpMethod.GET, 8080, "localhost", "/") + .compose(req -> req + .send() + .compose(resp -> { + if (resp.statusCode() != 200) { + return Future.failedFuture("HTTP error"); + } else { + return resp.body().map(Buffer::toString); + } + })) + .onComplete(promise); + }, v -> { + // Executed when the circuit breaker is open + return "Hello"; + }).onComplete(ar -> { + // Do something with the result + }); } public void example4(Vertx vertx) { CircuitBreaker breaker = CircuitBreaker.create("my-circuit-breaker", vertx, - new CircuitBreakerOptions().setMaxFailures(5).setTimeout(2000) + new CircuitBreakerOptions().setMaxFailures(5).setTimeout(2000) ).fallback(v -> { - // Executed when the circuit is opened. + // Executed when the circuit breaker is open. return "hello"; }); - breaker.execute( - promise -> { - vertx.createHttpClient().request(HttpMethod.GET, 8080, "localhost", "/") - .compose(req -> req - .send() - .compose(resp -> { - if (resp.statusCode() != 200) { - return Future.failedFuture("HTTP error"); - } else { - return resp.body().map(Buffer::toString); - } - })).onComplete(promise); - }); + breaker.execute(promise -> { + vertx.createHttpClient().request(HttpMethod.GET, 8080, "localhost", "/") + .compose(req -> req + .send() + .compose(resp -> { + if (resp.statusCode() != 200) { + return Future.failedFuture("HTTP error"); + } else { + return resp.body().map(Buffer::toString); + } + })) + .onComplete(promise); + }); } public void example5(Vertx vertx) { CircuitBreaker breaker = CircuitBreaker.create("my-circuit-breaker", vertx, - new CircuitBreakerOptions().setMaxFailures(5).setTimeout(2000) + new CircuitBreakerOptions().setMaxFailures(5).setTimeout(2000) ).openHandler(v -> { - System.out.println("Circuit opened"); + System.out.println("Circuit breaker opened"); }).closeHandler(v -> { - System.out.println("Circuit closed"); + System.out.println("Circuit breaker closed"); }); - breaker.execute( - promise -> { - vertx.createHttpClient().request(HttpMethod.GET, 8080, "localhost", "/") - .compose(req -> req - .send() - .compose(resp -> { - if (resp.statusCode() != 200) { - return Future.failedFuture("HTTP error"); - } else { - return resp.body().map(Buffer::toString); - } - })).onComplete(promise); - }); - } - - public void example6(Vertx vertx) { - CircuitBreaker breaker = CircuitBreaker.create("my-circuit-breaker", vertx, - new CircuitBreakerOptions().setMaxFailures(5).setTimeout(2000) - ); - - Promise userPromise = Promise.promise(); - userPromise.future().onComplete(ar -> { - // Do something with the result + breaker.execute(promise -> { + vertx.createHttpClient().request(HttpMethod.GET, 8080, "localhost", "/") + .compose(req -> req + .send() + .compose(resp -> { + if (resp.statusCode() != 200) { + return Future.failedFuture("HTTP error"); + } else { + return resp.body().map(Buffer::toString); + } + })) + .onComplete(promise); }); - - breaker.executeAndReportWithFallback( - userPromise, - promise -> { - vertx.createHttpClient().request(HttpMethod.GET, 8080, "localhost", "/") - .compose(req -> req - .send() - .compose(resp -> { - if (resp.statusCode() != 200) { - return Future.failedFuture("HTTP error"); - } else { - return resp.body().map(Buffer::toString); - } - })).onComplete(promise); - }, v -> { - // Executed when the circuit is opened - return "Hello"; - }); } public void example8(Vertx vertx) { CircuitBreaker breaker = CircuitBreaker.create("my-circuit-breaker", vertx, new CircuitBreakerOptions().setMaxFailures(5).setMaxRetries(5).setTimeout(2000) - ).openHandler(v -> { - System.out.println("Circuit opened"); - }).closeHandler(v -> { - System.out.println("Circuit closed"); - }).retryPolicy(RetryPolicy.exponentialDelayWithJitter(50, 500)); - - breaker.execute( - promise -> { - vertx.createHttpClient().request(HttpMethod.GET, 8080, "localhost", "/") - .compose(req -> req - .send() - .compose(resp -> { - if (resp.statusCode() != 200) { - return Future.failedFuture("HTTP error"); - } else { - return resp.body().map(Buffer::toString); - } - })).onComplete(promise); - }); + ).retryPolicy(RetryPolicy.exponentialDelayWithJitter(50, 500)); + + breaker.execute(promise -> { + vertx.createHttpClient().request(HttpMethod.GET, 8080, "localhost", "/") + .compose(req -> req + .send() + .compose(resp -> { + if (resp.statusCode() != 200) { + return Future.failedFuture("HTTP error"); + } else { + return resp.body().map(Buffer::toString); + } + })) + .onComplete(promise); + }); } public void enableNotifications(CircuitBreakerOptions options) { diff --git a/src/main/java/io/vertx/circuitbreaker/CircuitBreaker.java b/src/main/java/io/vertx/circuitbreaker/CircuitBreaker.java index 2c6a113..e3d1d5c 100644 --- a/src/main/java/io/vertx/circuitbreaker/CircuitBreaker.java +++ b/src/main/java/io/vertx/circuitbreaker/CircuitBreaker.java @@ -40,7 +40,7 @@ public interface CircuitBreaker { * * @param name the name * @param vertx the Vert.x instance - * @param options the configuration option + * @param options the configuration options * @return the created instance */ static CircuitBreaker create(String name, Vertx vertx, CircuitBreakerOptions options) { @@ -60,35 +60,36 @@ static CircuitBreaker create(String name, Vertx vertx) { /** * Closes the circuit breaker. It stops sending events on its state on the event bus. - * This method is not related to the {@code close} state of the circuit breaker. To set the circuit breaker in the - * {@code close} state, use {@link #reset()}. + *

+ * This method is not related to the {@code closed} state of the circuit breaker. To move the circuit breaker to the + * {@code closed} state, use {@link #reset()}. */ @Fluent CircuitBreaker close(); /** - * Sets a {@link Handler} invoked when the circuit breaker state switches to open. + * Sets a {@link Handler} to be invoked when the circuit breaker state switches to open. * * @param handler the handler, must not be {@code null} - * @return the current {@link CircuitBreaker} + * @return this {@link CircuitBreaker} */ @Fluent CircuitBreaker openHandler(Handler handler); /** - * Sets a {@link Handler} invoked when the circuit breaker state switches to half-open. + * Sets a {@link Handler} to be invoked when the circuit breaker state switches to half-open. * * @param handler the handler, must not be {@code null} - * @return the current {@link CircuitBreaker} + * @return this {@link CircuitBreaker} */ @Fluent CircuitBreaker halfOpenHandler(Handler handler); /** - * Sets a {@link Handler} invoked when the circuit breaker state switches to close. + * Sets a {@link Handler} to be invoked when the circuit breaker state switches to closed. * * @param handler the handler, must not be {@code null} - * @return the current {@link CircuitBreaker} + * @return this {@link CircuitBreaker} */ @Fluent CircuitBreaker closeHandler(Handler handler); @@ -97,25 +98,26 @@ static CircuitBreaker create(String name, Vertx vertx) { * Executes the given operation with the circuit breaker control. The operation is generally calling an * external system. The operation receives a {@link Promise} object as parameter and must * call {@link Promise#complete(Object)} when the operation has terminated successfully. The operation must also - * call {@link Promise#fail(Throwable)} in case of failure. + * call {@link Promise#fail(Throwable)} in case of a failure. *

- * The operation is not invoked if the circuit breaker is open, and the given fallback is called immediately. The - * circuit breaker also monitor the completion of the operation before a configure timeout. The operation is - * considered as failed if it does not terminate in time. + * The operation is not invoked if the circuit breaker is open, and the given fallback is called instead. + * The circuit breaker also monitors whether the operation completes in time. The operation is considered failed + * if it does not terminate before the configured timeout. *

* This method returns a {@link Future} object to retrieve the status and result of the operation, with the status * being a success or a failure. If the fallback is called, the returned future is successfully completed with the * value returned from the fallback. If the fallback throws an exception, the returned future is marked as failed. * * @param command the operation - * @param fallback the fallback function. It gets an exception as parameter and returns the fallback result + * @param fallback the fallback function; gets an exception as parameter and returns the fallback result * @param the type of result - * @return a future object completed when the operation or its fallback completes + * @return a future object completed when the operation or the fallback completes */ Future executeWithFallback(Handler> command, Function fallback); /** - * Same as {@link #executeWithFallback(Handler, Function)} but using the circuit breaker default fallback. + * Same as {@link #executeWithFallback(Handler, Function)} but using the circuit breaker + * {@linkplain #fallback(Function) default fallback}. * * @param command the operation * @param the type of result @@ -124,13 +126,13 @@ static CircuitBreaker create(String name, Vertx vertx) { Future execute(Handler> command); /** - * Same as {@link #executeAndReportWithFallback(Promise, Handler, Function)} but using the circuit breaker default - * fallback. + * Same as {@link #executeAndReportWithFallback(Promise, Handler, Function)} but using the circuit breaker + * {@linkplain #fallback(Function) default fallback}. * * @param resultPromise the promise on which the operation result is reported * @param command the operation * @param the type of result - * @return the current {@link CircuitBreaker} + * @return this {@link CircuitBreaker} */ @Fluent CircuitBreaker executeAndReport(Promise resultPromise, Handler> command); @@ -139,67 +141,68 @@ static CircuitBreaker create(String name, Vertx vertx) { * Executes the given operation with the circuit breaker control. The operation is generally calling an * external system. The operation receives a {@link Promise} object as parameter and must * call {@link Promise#complete(Object)} when the operation has terminated successfully. The operation must also - * call {@link Promise#fail(Throwable)} in case of failure. + * call {@link Promise#fail(Throwable)} in case of a failure. *

- * The operation is not invoked if the circuit breaker is open, and the given fallback is called immediately. The - * circuit breaker also monitor the completion of the operation before a configure timeout. The operation is - * considered as failed if it does not terminate in time. + * The operation is not invoked if the circuit breaker is open, and the given fallback is called instead. + * The circuit breaker also monitors whether the operation completes in time. The operation is considered failed + * if it does not terminate before the configured timeout. *

- * Unlike {@link #executeWithFallback(Handler, Function)}, this method does return a {@link Future} object, but - * let the caller pass a {@link Future} object on which the result is reported. If the fallback is called, the future + * Unlike {@link #executeWithFallback(Handler, Function)}, this method does not return a {@link Future} object, but + * lets the caller pass a {@link Promise} object on which the result is reported. If the fallback is called, the promise * is successfully completed with the value returned by the fallback function. If the fallback throws an exception, - * the future is marked as failed. + * the promise is marked as failed. * * @param resultPromise the promise on which the operation result is reported * @param command the operation - * @param fallback the fallback function. It gets an exception as parameter and returns the fallback result + * @param fallback the fallback function; gets an exception as parameter and returns the fallback result * @param the type of result - * @return the current {@link CircuitBreaker} + * @return this {@link CircuitBreaker} */ @Fluent CircuitBreaker executeAndReportWithFallback(Promise resultPromise, Handler> command, Function fallback); /** - * Sets a default {@link Function} invoked when the bridge is open to handle the "request", or on failure - * if {@link CircuitBreakerOptions#isFallbackOnFailure()} is enabled. + * Sets a default fallback {@link Function} to be invoked when the circuit breaker is open or when failure + * occurs and {@link CircuitBreakerOptions#isFallbackOnFailure()} is enabled. *

* The function gets the exception as parameter and returns the fallback result. * - * @param handler the handler - * @return the current {@link CircuitBreaker} + * @param handler the fallback handler + * @return this {@link CircuitBreaker} */ @Fluent CircuitBreaker fallback(Function handler); /** - * Resets the circuit breaker state (number of failure set to 0 and state set to closed). + * Resets the circuit breaker state. The number of recent failures is set to 0 and if the state is half-open, + * it is set to closed. * - * @return the current {@link CircuitBreaker} + * @return this {@link CircuitBreaker} */ @Fluent CircuitBreaker reset(); /** - * Explicitly opens the circuit. + * Explicitly opens the circuit breaker. * - * @return the current {@link CircuitBreaker} + * @return this {@link CircuitBreaker} */ @Fluent CircuitBreaker open(); /** - * @return the current state. + * @return the current state of this circuit breaker */ CircuitBreakerState state(); /** - * @return the current number of failures. + * @return the current number of recorded failures */ long failureCount(); /** - * @return the name of the circuit breaker. + * @return the name of this circuit breaker */ @CacheReturn String name(); @@ -212,7 +215,7 @@ CircuitBreaker executeAndReportWithFallback(Promise resultPromise, Handle CircuitBreaker retryPolicy(Function retryPolicy); /** - * Set a {@link RetryPolicy} which computes a delay before retry execution. + * Set a {@link RetryPolicy} which computes a delay before a retry attempt. */ @Fluent CircuitBreaker retryPolicy(RetryPolicy retryPolicy); diff --git a/src/main/java/io/vertx/circuitbreaker/CircuitBreakerOptions.java b/src/main/java/io/vertx/circuitbreaker/CircuitBreakerOptions.java index 3756679..f5cb8f2 100644 --- a/src/main/java/io/vertx/circuitbreaker/CircuitBreakerOptions.java +++ b/src/main/java/io/vertx/circuitbreaker/CircuitBreakerOptions.java @@ -20,7 +20,7 @@ import io.vertx.core.json.JsonObject; /** - * Circuit breaker configuration options. All time are given in milliseconds. + * Circuit breaker configuration options. All time values are in milliseconds. * * @author Clement Escoffier */ @@ -30,25 +30,25 @@ public class CircuitBreakerOptions { /** * Default timeout in milliseconds. */ - public static final long DEFAULT_TIMEOUT = 10000L; + public static final long DEFAULT_TIMEOUT = 10_000L; /** - * Default number of failures. + * Default number of failures after which a closed circuit breaker moves to open. */ public static final int DEFAULT_MAX_FAILURES = 5; /** - * Default value of the fallback on failure property. + * Default value of the {@linkplain #isFallbackOnFailure() fallback on failure} property. */ public static final boolean DEFAULT_FALLBACK_ON_FAILURE = false; /** - * Default time before it attempts to re-close the circuit (half-open state) in milliseconds. + * Default time after which an open circuit breaker moves to half-open (in an attempt to re-close) in milliseconds. */ - public static final long DEFAULT_RESET_TIMEOUT = 30000; + public static final long DEFAULT_RESET_TIMEOUT = 30_000; /** - * Whether circuit breaker state should be delivered only to local consumers by default = {@code true}. + * Default value of whether circuit breaker state events should be delivered only to local consumers. */ public static final boolean DEFAULT_NOTIFICATION_LOCAL_ONLY = true; @@ -60,15 +60,15 @@ public class CircuitBreakerOptions { /** * Default notification period in milliseconds. */ - public static final long DEFAULT_NOTIFICATION_PERIOD = 2000; + public static final long DEFAULT_NOTIFICATION_PERIOD = 2_000; /** - * Default rolling window for metrics in milliseconds. + * Default length of rolling window for metrics in milliseconds. */ - public static final long DEFAULT_METRICS_ROLLING_WINDOW = 10000; + public static final long DEFAULT_METRICS_ROLLING_WINDOW = 10_000; /** - * Default number of buckets used for the rolling window. + * Default number of buckets used for the metrics rolling window. */ public static final int DEFAULT_METRICS_ROLLING_BUCKETS = 10; @@ -78,9 +78,9 @@ public class CircuitBreakerOptions { private static final int DEFAULT_MAX_RETRIES = 0; /** - * The default rolling window span in milliseconds. + * Default length of rolling window for failures in milliseconds. */ - private static final int DEFAULT_FAILURES_ROLLING_WINDOW = 10000; + private static final int DEFAULT_FAILURES_ROLLING_WINDOW = 10_000; /** * The operation timeout. @@ -93,7 +93,7 @@ public class CircuitBreakerOptions { private int maxFailures = DEFAULT_MAX_FAILURES; /** - * Whether or not the fallback should be called upon failures. + * Whether the fallback should be called upon failures. */ private boolean fallbackOnFailure = DEFAULT_FALLBACK_ON_FAILURE; @@ -164,9 +164,9 @@ public CircuitBreakerOptions(CircuitBreakerOptions other) { } /** - * Creates a new instance of {@link CircuitBreakerOptions} from the given json object. + * Creates a new instance of {@link CircuitBreakerOptions} from the given JSON object. * - * @param json the json object + * @param json the JSON object */ public CircuitBreakerOptions(JsonObject json) { this(); @@ -174,7 +174,7 @@ public CircuitBreakerOptions(JsonObject json) { } /** - * @return a json object representing the current configuration. + * @return a JSON object representing this configuration */ public JsonObject toJson() { JsonObject json = new JsonObject(); @@ -183,17 +183,17 @@ public JsonObject toJson() { } /** - * @return the maximum number of failures before opening the circuit. + * @return the maximum number of failures before opening the circuit breaker */ public int getMaxFailures() { return maxFailures; } /** - * Sets the maximum number of failures before opening the circuit. + * Sets the maximum number of failures before opening the circuit breaker. * * @param maxFailures the number of failures. - * @return the current {@link CircuitBreakerOptions} instance + * @return this {@link CircuitBreakerOptions} */ public CircuitBreakerOptions setMaxFailures(int maxFailures) { this.maxFailures = maxFailures; @@ -201,18 +201,18 @@ public CircuitBreakerOptions setMaxFailures(int maxFailures) { } /** - * @return the configured timeout in milliseconds. + * @return the configured timeout in milliseconds */ public long getTimeout() { return timeout; } /** - * Sets the timeout in milliseconds. If an action is not completed before this timeout, the action is considered as + * Sets the timeout in milliseconds. If an action does not complete before this timeout, the action is considered as * a failure. * * @param timeoutInMs the timeout, -1 to disable the timeout - * @return the current {@link CircuitBreakerOptions} instance + * @return this {@link CircuitBreakerOptions} */ public CircuitBreakerOptions setTimeout(long timeoutInMs) { this.timeout = timeoutInMs; @@ -220,17 +220,17 @@ public CircuitBreakerOptions setTimeout(long timeoutInMs) { } /** - * @return whether or not the fallback is executed on failures, even when the circuit is closed. + * @return whether the fallback is executed on failures, even when the circuit breaker is closed */ public boolean isFallbackOnFailure() { return fallbackOnFailure; } /** - * Sets whether or not the fallback is executed on failure, even when the circuit is closed. + * Sets whether the fallback is executed on failure, even when the circuit breaker is closed. * * @param fallbackOnFailure {@code true} to enable it. - * @return the current {@link CircuitBreakerOptions} instance + * @return this {@link CircuitBreakerOptions} */ public CircuitBreakerOptions setFallbackOnFailure(boolean fallbackOnFailure) { this.fallbackOnFailure = fallbackOnFailure; @@ -238,18 +238,18 @@ public CircuitBreakerOptions setFallbackOnFailure(boolean fallbackOnFailure) { } /** - * @return the time in milliseconds before it attempts to re-close the circuit (by going to the half-open state). + * @return the time in milliseconds before an open circuit breaker moves to half-open (in an attempt to re-close) */ public long getResetTimeout() { return resetTimeout; } /** - * Sets the time in ms before it attempts to re-close the circuit (by going to the half-open state). If the circuit - * is closed when the timeout is reached, nothing happens. {@code -1} disables this feature. + * Sets the time in milliseconds before an open circuit breaker moves to half-open (in an attempt to re-close). + * If the circuit breaker is closed when the timeout is reached, nothing happens. {@code -1} disables this feature. * * @param resetTimeout the time in ms - * @return the current {@link CircuitBreakerOptions} instance + * @return this {@link CircuitBreakerOptions} */ public CircuitBreakerOptions setResetTimeout(long resetTimeout) { this.resetTimeout = resetTimeout; @@ -257,17 +257,18 @@ public CircuitBreakerOptions setResetTimeout(long resetTimeout) { } /** - * @return {@code true} if circuit breaker state should be delivered only to local consumers, otherwise {@code false} + * @return {@code true} if circuit breaker state events should be delivered only to local consumers, + * {@code false} otherwise */ public boolean isNotificationLocalOnly() { return notificationLocalOnly; } /** - * Whether circuit breaker state should be delivered only to local consumers. + * Sets whether circuit breaker state events should be delivered only to local consumers. * - * @param notificationLocalOnly {@code true} if circuit breaker state should be delivered only to local consumers, otherwise {@code false} - * @return the current {@link CircuitBreakerOptions} instance + * @param notificationLocalOnly {@code true} if circuit breaker state events should be delivered only to local consumers, {@code false} otherwise + * @return this {@link CircuitBreakerOptions} */ public CircuitBreakerOptions setNotificationLocalOnly(boolean notificationLocalOnly) { this.notificationLocalOnly = notificationLocalOnly; @@ -275,18 +276,18 @@ public CircuitBreakerOptions setNotificationLocalOnly(boolean notificationLocalO } /** - * @return the eventbus address on which the circuit breaker events are published. {@code null} if this feature has - * been disabled. + * @return the eventbus address on which the circuit breaker events are published, or {@code null} if this feature has + * been disabled */ public String getNotificationAddress() { return notificationAddress; } /** - * Sets the event bus address on which the circuit breaker publish its state change. + * Sets the event bus address on which the circuit breaker publishes its state changes. * - * @param notificationAddress the address, {@code null} to disable this feature. - * @return the current {@link CircuitBreakerOptions} instance + * @param notificationAddress the address, {@code null} to disable this feature + * @return this {@link CircuitBreakerOptions} */ public CircuitBreakerOptions setNotificationAddress(String notificationAddress) { this.notificationAddress = notificationAddress; @@ -294,18 +295,18 @@ public CircuitBreakerOptions setNotificationAddress(String notificationAddress) } /** - * @return the the period in milliseconds where the circuit breaker send a notification about its state. + * @return the period in milliseconds in which the circuit breaker sends notifications about its state */ public long getNotificationPeriod() { return notificationPeriod; } /** - * Configures the period in milliseconds where the circuit breaker send a notification on the event bus with its + * Sets the period in milliseconds in which the circuit breaker sends notifications on the event bus with its * current state. * * @param notificationPeriod the period, 0 to disable this feature. - * @return the current {@link CircuitBreakerOptions} instance + * @return this {@link CircuitBreakerOptions} */ public CircuitBreakerOptions setNotificationPeriod(long notificationPeriod) { this.notificationPeriod = notificationPeriod; @@ -313,17 +314,17 @@ public CircuitBreakerOptions setNotificationPeriod(long notificationPeriod) { } /** - * @return the configured rolling window for metrics. + * @return the configured length of rolling window for metrics */ public long getMetricsRollingWindow() { return metricsRollingWindow; } /** - * Sets the rolling window used for metrics. + * Sets the rolling window length used for metrics. * - * @param metricsRollingWindow the period in milliseconds. - * @return the current {@link CircuitBreakerOptions} instance + * @param metricsRollingWindow the period in milliseconds + * @return this {@link CircuitBreakerOptions} */ public CircuitBreakerOptions setMetricsRollingWindow(long metricsRollingWindow) { this.metricsRollingWindow = metricsRollingWindow; @@ -331,17 +332,17 @@ public CircuitBreakerOptions setMetricsRollingWindow(long metricsRollingWindow) } /** - * @return the configured rolling window for failures. + * @return the configured length of rolling window for failures */ public long getFailuresRollingWindow() { return failuresRollingWindow; } /** - * Sets the rolling window used for metrics. + * Sets the rolling window length used for failures. * - * @param metricsRollingWindow the period in milliseconds. - * @return the current {@link CircuitBreakerOptions} instance + * @param failureRollingWindow the period in milliseconds + * @return this {@link CircuitBreakerOptions} */ public CircuitBreakerOptions setFailuresRollingWindow(long failureRollingWindow) { this.failuresRollingWindow = failureRollingWindow; @@ -349,21 +350,21 @@ public CircuitBreakerOptions setFailuresRollingWindow(long failureRollingWindow) } /** - * @return the configured number of buckets the rolling window is divided into. + * @return the configured number of buckets the metrics rolling window is divided into */ public int getMetricsRollingBuckets() { return metricsRollingBuckets; } /** - * Sets the configured number of buckets the rolling window is divided into. - * - * The following must be true - metrics.rollingStats.timeInMilliseconds % metrics.rollingStats.numBuckets == 0 - otherwise it will throw an exception. - * - * In other words, 10000/10 is okay, so is 10000/20 but 10000/7 is not. + * Sets the number of buckets the metrics rolling window is divided into. + *

+ * The following must be true: {@code metricsRollingWindow % metricsRollingBuckets == 0}, + * otherwise an exception will be thrown. + * For example, 10000/10 is okay, so is 10000/20, but 10000/7 is not. * - * @param metricsRollingBuckets the number of rolling buckets. - * @return the current {@link CircuitBreakerOptions} instance + * @param metricsRollingBuckets the number of buckets + * @return this {@link CircuitBreakerOptions} */ public CircuitBreakerOptions setMetricsRollingBuckets(int metricsRollingBuckets) { this.metricsRollingBuckets = metricsRollingBuckets; @@ -371,17 +372,17 @@ public CircuitBreakerOptions setMetricsRollingBuckets(int metricsRollingBuckets) } /** - * @return the number of times the circuit breaker tries to redo the operation before failing + * @return the number of times the circuit breaker retries an operation before failing */ public int getMaxRetries() { return maxRetries; } /** - * Configures the number of times the circuit breaker tries to redo the operation before failing. + * Sets the number of times the circuit breaker retries an operation before failing. * - * @param maxRetries the number of retries, 0 to disable this feature. - * @return the current {@link CircuitBreakerOptions} instance + * @param maxRetries the number of retries, 0 to disable retrying + * @return this {@link CircuitBreakerOptions} */ public CircuitBreakerOptions setMaxRetries(int maxRetries) { this.maxRetries = maxRetries; diff --git a/src/main/java/io/vertx/circuitbreaker/CircuitBreakerState.java b/src/main/java/io/vertx/circuitbreaker/CircuitBreakerState.java index e4de4df..15c48cc 100644 --- a/src/main/java/io/vertx/circuitbreaker/CircuitBreakerState.java +++ b/src/main/java/io/vertx/circuitbreaker/CircuitBreakerState.java @@ -31,13 +31,13 @@ public enum CircuitBreakerState { */ OPEN, /** - * The {@code CLOSED} state. The circuit breaker lets invocations pass and collects the failures. IF the number of - * failures reach the specified threshold, the cricuit breaker switches to the {@link #OPEN} state. + * The {@code CLOSED} state. The circuit breaker lets invocations pass and collects the failures. If the number of + * failures reach the specified threshold, the circuit breaker switches to the {@link #OPEN} state. */ CLOSED, /** * The {@code HALF_OPEN} state. The circuit breaker has been opened, and is now checking the current situation. It - * lets pass the next invocation and determines from the result (failure or success) if the circuit breaker can + * lets the next invocation pass and determines from the result (failure or success) if the circuit breaker can * be switched to the {@link #CLOSED} state again. */ HALF_OPEN diff --git a/src/main/java/io/vertx/circuitbreaker/RetryPolicy.java b/src/main/java/io/vertx/circuitbreaker/RetryPolicy.java index 4c87d8d..805e884 100644 --- a/src/main/java/io/vertx/circuitbreaker/RetryPolicy.java +++ b/src/main/java/io/vertx/circuitbreaker/RetryPolicy.java @@ -50,7 +50,8 @@ static RetryPolicy linearDelay(long initialDelay, long maxDelay) { /** * Create an exponential delay with jitter retry policy. *

- * Based on Full Jitter in Exponential Backoff And Jitter. + * Based on the Full Jitter approach described in + * Exponential Backoff And Jitter. * * @param initialDelay the initial delay in milliseconds * @param maxDelay maximum delay in milliseconds @@ -68,7 +69,7 @@ static RetryPolicy exponentialDelayWithJitter(long initialDelay, long maxDelay) /** * Compute a delay in milliseconds before retry is executed. * - * @param failure the failure passed to the operation {@link io.vertx.core.Promise} + * @param failure the failure of the previous execution attempt * @param retryCount the number of times operation has been retried already * @return a delay in milliseconds before retry is executed */ diff --git a/src/main/java/io/vertx/circuitbreaker/TimeoutException.java b/src/main/java/io/vertx/circuitbreaker/TimeoutException.java index 1ad8c2a..b294d3b 100644 --- a/src/main/java/io/vertx/circuitbreaker/TimeoutException.java +++ b/src/main/java/io/vertx/circuitbreaker/TimeoutException.java @@ -4,7 +4,7 @@ * Exception reported when the monitored operation timed out. *

* For performance reason, this exception does not carry a stack trace. You are not allowed to set a stack trace or a - * cause to this exception. This immutability allows using a singleton instance. + * cause to this exception. This immutability allows using a singleton instance. * * @author Clement Escoffier */ diff --git a/src/main/java/io/vertx/circuitbreaker/impl/CircuitBreakerImpl.java b/src/main/java/io/vertx/circuitbreaker/impl/CircuitBreakerImpl.java index a541e59..f937ae8 100644 --- a/src/main/java/io/vertx/circuitbreaker/impl/CircuitBreakerImpl.java +++ b/src/main/java/io/vertx/circuitbreaker/impl/CircuitBreakerImpl.java @@ -133,11 +133,12 @@ public CircuitBreaker fallback(Function handler) { } /** - * A version of reset that can force the the state to `close` even if the circuit breaker is open. This is an - * internal API. + * A version of {@link #reset()} that can forcefully change the state to closed even if the circuit breaker is open. + *

+ * This is an internal API. * - * @param force whether or not we force the state and allow an illegal transition - * @return the current circuit breaker. + * @param force whether we force the state change and allow an illegal transition + * @return this circuit breaker */ public synchronized CircuitBreaker reset(boolean force) { rollingFailures.reset(); From f8ff3aad071dc4616c4ab9dd7fa5dd58b5d3b3c0 Mon Sep 17 00:00:00 2001 From: Ladislav Thon Date: Fri, 7 Jul 2023 15:31:53 +0200 Subject: [PATCH 4/6] Use context-bound promises to simplify implementation code --- .../impl/CircuitBreakerImpl.java | 221 ++++++++---------- .../impl/CircuitBreakerMetrics.java | 6 +- 2 files changed, 105 insertions(+), 122 deletions(-) diff --git a/src/main/java/io/vertx/circuitbreaker/impl/CircuitBreakerImpl.java b/src/main/java/io/vertx/circuitbreaker/impl/CircuitBreakerImpl.java index f937ae8..27ac7cd 100644 --- a/src/main/java/io/vertx/circuitbreaker/impl/CircuitBreakerImpl.java +++ b/src/main/java/io/vertx/circuitbreaker/impl/CircuitBreakerImpl.java @@ -22,12 +22,12 @@ import io.vertx.circuitbreaker.OpenCircuitException; import io.vertx.circuitbreaker.RetryPolicy; import io.vertx.circuitbreaker.TimeoutException; -import io.vertx.core.Context; import io.vertx.core.Future; import io.vertx.core.Handler; import io.vertx.core.Promise; import io.vertx.core.Vertx; import io.vertx.core.eventbus.DeliveryOptions; +import io.vertx.core.impl.ContextInternal; import io.vertx.core.json.JsonObject; import java.util.Iterator; @@ -208,105 +208,97 @@ private synchronized CircuitBreaker attemptReset() { } @Override - public CircuitBreaker executeAndReportWithFallback( - Promise userFuture, - Handler> command, + public CircuitBreaker executeAndReportWithFallback(Promise resultPromise, Handler> command, Function fallback) { - Context context = vertx.getOrCreateContext(); + ContextInternal context = (ContextInternal) vertx.getOrCreateContext(); CircuitBreakerState currentState; synchronized (this) { currentState = state; } - CircuitBreakerMetrics.Operation call = metrics != null ? metrics.enqueue() : null; + CircuitBreakerMetrics.Operation operationMetrics = metrics != null ? metrics.enqueue() : null; // this future object tracks the completion of the operation // This future is marked as failed on operation failures and timeout. - Promise operationResult = Promise.promise(); + Promise operationResult = context.promise(); if (currentState == CircuitBreakerState.CLOSED) { operationResult.future().onComplete(event -> { - context.runOnContext(v -> { - if (event.failed()) { - incrementFailures(); - if (call != null) { - call.failed(); - } - if (options.isFallbackOnFailure()) { - invokeFallback(event.cause(), userFuture, fallback, call); - } else { - userFuture.fail(event.cause()); - } + if (event.failed()) { + incrementFailures(); + if (operationMetrics != null) { + operationMetrics.failed(); + } + if (options.isFallbackOnFailure()) { + invokeFallback(event.cause(), resultPromise, fallback, operationMetrics); } else { - if (call != null) { - call.complete(); - } - reset(); - userFuture.complete(event.result()); + resultPromise.fail(event.cause()); } - // Else the operation has been canceled because of a time out. - }); + } else { + if (operationMetrics != null) { + operationMetrics.complete(); + } + reset(); + resultPromise.complete(event.result()); + } + // Else the operation has been canceled because of a timeout. }); if (options.getMaxRetries() > 0) { - executeOperation(context, command, retryFuture(context, 0, command, operationResult, call), call); + executeOperation(context, command, retryPromise(context, 0, command, operationResult, operationMetrics), operationMetrics); } else { - executeOperation(context, command, operationResult, call); + executeOperation(context, command, operationResult, operationMetrics); } } else if (currentState == CircuitBreakerState.OPEN) { // Fallback immediately - if (call != null) { - call.shortCircuited(); + if (operationMetrics != null) { + operationMetrics.shortCircuited(); } - invokeFallback(OpenCircuitException.INSTANCE, userFuture, fallback, call); + invokeFallback(OpenCircuitException.INSTANCE, resultPromise, fallback, operationMetrics); } else if (currentState == CircuitBreakerState.HALF_OPEN) { if (passed.incrementAndGet() == 1) { operationResult.future().onComplete(event -> { - context.runOnContext(v -> { - if (event.failed()) { - open(); - if (call != null) { - call.failed(); - } - if (options.isFallbackOnFailure()) { - invokeFallback(event.cause(), userFuture, fallback, call); - } else { - userFuture.fail(event.cause()); - } + if (event.failed()) { + open(); + if (operationMetrics != null) { + operationMetrics.failed(); + } + if (options.isFallbackOnFailure()) { + invokeFallback(event.cause(), resultPromise, fallback, operationMetrics); } else { - if (call != null) { - call.complete(); - } - reset(); - userFuture.complete(event.result()); + resultPromise.fail(event.cause()); } - }); + } else { + if (operationMetrics != null) { + operationMetrics.complete(); + } + reset(); + resultPromise.complete(event.result()); + } }); // Execute the operation - executeOperation(context, command, operationResult, call); + executeOperation(context, command, operationResult, operationMetrics); } else { // Not selected, fallback. - if (call != null) { - call.shortCircuited(); + if (operationMetrics != null) { + operationMetrics.shortCircuited(); } - invokeFallback(OpenCircuitException.INSTANCE, userFuture, fallback, call); + invokeFallback(OpenCircuitException.INSTANCE, resultPromise, fallback, operationMetrics); } } return this; } - private Promise retryFuture(Context context, int retryCount, Handler> command, Promise - operationResult, CircuitBreakerMetrics.Operation call) { - Promise retry = Promise.promise(); + private Promise retryPromise(ContextInternal context, int retryCount, Handler> command, + Promise operationResult, CircuitBreakerMetrics.Operation operationMetrics) { - retry.future().onComplete(event -> { + Promise promise = context.promise(); + promise.future().onComplete(event -> { if (event.succeeded()) { reset(); - context.runOnContext(v -> { - operationResult.complete(event.result()); - }); + operationResult.complete(event.result()); return; } @@ -317,32 +309,28 @@ private Promise retryFuture(Context context, int retryCount, Handler { - context.runOnContext(v -> { - // Don't report timeout or error in the retry attempt, only the last one. - executeOperation(context, command, retryFuture(context, retryCount + 1, command, operationResult, null), - call); - }); + executeRetryWithDelay(event.cause(), retryCount, l -> { + // Don't report timeout or error in the retry attempt, only the last one. + executeOperation(context, command, retryPromise(context, retryCount + 1, command, operationResult, null), + operationMetrics); }); } else { - executeRetryWithTimeout(event.cause(), retryCount, (l) -> { - context.runOnContext(v -> { - executeOperation(context, command, operationResult, call); - }); + executeRetryWithDelay(event.cause(), retryCount, l -> { + executeOperation(context, command, operationResult, operationMetrics); }); } } else { - context.runOnContext(v -> operationResult.fail(OpenCircuitException.INSTANCE)); + operationResult.fail(OpenCircuitException.INSTANCE); } }); - return retry; + return promise; } - private void executeRetryWithTimeout(Throwable failure, int retryCount, Handler action) { - long retryTimeout = retryPolicy.delay(failure, retryCount + 1); + private void executeRetryWithDelay(Throwable failure, int retryCount, Handler action) { + long retryDelay = retryPolicy.delay(failure, retryCount + 1); - if (retryTimeout > 0) { - vertx.setTimer(retryTimeout, (l) -> { + if (retryDelay > 0) { + vertx.setTimer(retryDelay, l -> { action.handle(null); }); } else { @@ -350,82 +338,78 @@ private void executeRetryWithTimeout(Throwable failure, int retryCount, Handler< } } - private void invokeFallback(Throwable reason, Promise userFuture, - Function fallback, CircuitBreakerMetrics.Operation operation) { + private void invokeFallback(Throwable reason, Promise resultPromise, + Function fallback, CircuitBreakerMetrics.Operation operationMetrics) { if (fallback == null) { // No fallback, mark the user future as failed. - userFuture.fail(reason); + resultPromise.fail(reason); return; } try { T apply = fallback.apply(reason); - if (operation != null) { - operation.fallbackSucceed(); + if (operationMetrics != null) { + operationMetrics.fallbackSucceed(); } - userFuture.complete(apply); + resultPromise.complete(apply); } catch (Exception e) { - userFuture.fail(e); - if (operation != null) { - operation.fallbackFailed(); + resultPromise.fail(e); + if (operationMetrics != null) { + operationMetrics.fallbackFailed(); } } } - private void executeOperation(Context context, Handler> operation, Promise operationResult, - CircuitBreakerMetrics.Operation call) { + private void executeOperation(ContextInternal context, Handler> operation, Promise operationResult, + CircuitBreakerMetrics.Operation operationMetrics) { // We use an intermediate future to avoid the passed future to complete or fail after a timeout. - Promise passedFuture = Promise.promise(); + Promise passedFuture = context.promise(); // Execute the operation if (options.getTimeout() != -1) { long timerId = vertx.setTimer(options.getTimeout(), (l) -> { - context.runOnContext(v -> { - // Check if the operation has not already been completed - if (!operationResult.future().isComplete()) { - if (call != null) { - call.timeout(); - } - operationResult.fail(TimeoutException.INSTANCE); + // Check if the operation has not already been completed + if (!operationResult.future().isComplete()) { + if (operationMetrics != null) { + operationMetrics.timeout(); } - // Else Operation has completed - }); + operationResult.fail(TimeoutException.INSTANCE); + } + // Else Operation has completed }); passedFuture.future().onComplete(v -> vertx.cancelTimer(timerId)); } try { passedFuture.future().onComplete(ar -> { - context.runOnContext(v -> { - if (ar.failed()) { - if (!operationResult.future().isComplete()) { - operationResult.fail(ar.cause()); - } - } else { - if (!operationResult.future().isComplete()) { - operationResult.complete(ar.result()); - } + if (ar.failed()) { + if (!operationResult.future().isComplete()) { + operationResult.fail(ar.cause()); } - }); + } else { + if (!operationResult.future().isComplete()) { + operationResult.complete(ar.result()); + } + } }); operation.handle(passedFuture); } catch (Throwable e) { - context.runOnContext(v -> { - if (!operationResult.future().isComplete()) { - if (call != null) { - call.error(); - } - operationResult.fail(e); + if (!operationResult.future().isComplete()) { + if (operationMetrics != null) { + operationMetrics.error(); } - }); + operationResult.fail(e); + } } } @Override public Future executeWithFallback(Handler> operation, Function fallback) { - Promise future = Promise.promise(); - executeAndReportWithFallback(future, operation, fallback); - return future.future(); + // be careful to not create a new context, to preserve existing (sometimes synchronous) behavior + ContextInternal context = ContextInternal.current(); + Promise promise = context != null ? context.promise() : Promise.promise(); + executeAndReportWithFallback(promise, operation, fallback); + return promise.future(); } public Future execute(Handler> operation) { @@ -433,8 +417,8 @@ public Future execute(Handler> operation) { } @Override - public CircuitBreaker executeAndReport(Promise resultFuture, Handler> operation) { - return executeAndReportWithFallback(resultFuture, operation, fallback); + public CircuitBreaker executeAndReport(Promise resultPromise, Handler> operation) { + return executeAndReportWithFallback(resultPromise, operation, fallback); } @Override @@ -448,8 +432,7 @@ private synchronized void incrementFailures() { if (state != CircuitBreakerState.OPEN) { open(); } else { - // No need to do it in the previous case, open() do it. - // If open has been called, no need to send update, it will be done by the `open` method. + // `open()` calls `sendUpdateOnEventBus()`, so no need to repeat it in the previous case sendUpdateOnEventBus(); } } else { diff --git a/src/main/java/io/vertx/circuitbreaker/impl/CircuitBreakerMetrics.java b/src/main/java/io/vertx/circuitbreaker/impl/CircuitBreakerMetrics.java index 691f9cf..ce6fb0e 100644 --- a/src/main/java/io/vertx/circuitbreaker/impl/CircuitBreakerMetrics.java +++ b/src/main/java/io/vertx/circuitbreaker/impl/CircuitBreakerMetrics.java @@ -318,9 +318,9 @@ public void add(Summary other) { failures += other.failures; exceptions += other.exceptions; timeouts += other.timeouts; - fallbackSuccess += other.fallbackSuccess ; - fallbackFailure += other.fallbackFailure ; - shortCircuited += other.shortCircuited ; + fallbackSuccess += other.fallbackSuccess; + fallbackFailure += other.fallbackFailure; + shortCircuited += other.shortCircuited; } public void add(Operation operation) { From bba9e7653990b028ca0afcb83ff737378d824c81 Mon Sep 17 00:00:00 2001 From: Ladislav Thon Date: Fri, 7 Jul 2023 15:38:51 +0200 Subject: [PATCH 5/6] Fix and improve the circuit breaker rolling window implementation --- .../impl/CircuitBreakerImpl.java | 29 ++++++++++++------- 1 file changed, 18 insertions(+), 11 deletions(-) diff --git a/src/main/java/io/vertx/circuitbreaker/impl/CircuitBreakerImpl.java b/src/main/java/io/vertx/circuitbreaker/impl/CircuitBreakerImpl.java index 27ac7cd..fe29689 100644 --- a/src/main/java/io/vertx/circuitbreaker/impl/CircuitBreakerImpl.java +++ b/src/main/java/io/vertx/circuitbreaker/impl/CircuitBreakerImpl.java @@ -30,7 +30,6 @@ import io.vertx.core.impl.ContextInternal; import io.vertx.core.json.JsonObject; -import java.util.Iterator; import java.util.LinkedHashMap; import java.util.Map; import java.util.Objects; @@ -466,14 +465,22 @@ public CircuitBreaker retryPolicy(RetryPolicy retryPolicy) { return this; } - public static class RollingCounter { + static class RollingCounter { + // all `RollingCounter` methods are called in a `synchronized (CircuitBreakerImpl.this)` block, + // which therefore guards access to these fields + private Map window; private long timeUnitsInWindow; private TimeUnit windowTimeUnit; public RollingCounter(long timeUnitsInWindow, TimeUnit windowTimeUnit) { this.windowTimeUnit = windowTimeUnit; - this.window = new LinkedHashMap<>((int) timeUnitsInWindow + 1); + this.window = new LinkedHashMap((int) timeUnitsInWindow + 1) { + @Override + protected boolean removeEldestEntry(Map.Entry eldest) { + return size() > timeUnitsInWindow; + } + }; this.timeUnitsInWindow = timeUnitsInWindow; } @@ -481,18 +488,18 @@ public void increment() { long timeSlot = windowTimeUnit.convert(System.currentTimeMillis(), TimeUnit.MILLISECONDS); Long current = window.getOrDefault(timeSlot, 0L); window.put(timeSlot, ++current); - - if (window.size() > timeUnitsInWindow) { - Iterator iterator = window.keySet().iterator(); - if (iterator.hasNext()) { - window.remove(iterator.next()); - } - } } public long count() { long windowStartTime = windowTimeUnit.convert(System.currentTimeMillis() - windowTimeUnit.toMillis(timeUnitsInWindow), TimeUnit.MILLISECONDS); - return window.entrySet().stream().filter(entry -> entry.getKey() >= windowStartTime).mapToLong(entry -> entry.getValue()).sum(); + + long result = 0; + for (Map.Entry entry : window.entrySet()) { + if (entry.getKey() >= windowStartTime) { + result += entry.getValue(); + } + } + return result; } public void reset() { From 50ffaf48f9ac8c199c5fd64012a0954f6cc3fdad Mon Sep 17 00:00:00 2001 From: Ladislav Thon Date: Fri, 14 Jul 2023 13:28:27 +0200 Subject: [PATCH 6/6] Add Resilience4j documentation The documentation used to describe how to use Hystrix with Vert.x. Since Hystrix has been obsolete for a long time and the documentation was removed in a recent commit, this commit adds a modern replacement: Resilience4j. The documentation mainly links to the Resilience4j Vert.x how-to that was published recently. --- src/main/asciidoc/index.adoc | 16 ++++++++++++++++ 1 file changed, 16 insertions(+) diff --git a/src/main/asciidoc/index.adoc b/src/main/asciidoc/index.adoc index 259e1aa..f0773d4 100644 --- a/src/main/asciidoc/index.adoc +++ b/src/main/asciidoc/index.adoc @@ -185,3 +185,19 @@ operation has a chance of succeeding, so it goes into the `half-open` state. In circuit breaker is allowed to execute the guarded operation. Should the call succeed, the circuit breaker resets and returns to the `closed` state, ready for more routine operation. If this trial call fails, however, the circuit breaker returns to the `open` state until another timeout elapses. + +== Using Resilience4j + +link:https://resilience4j.readme.io/[Resilience4j] is a popular library that implements common fault tolerance strategies: + +* bulkhead (concurrency limiter) +* circuit breaker +* rate limiter +* retry +* time limiter (timeout) + +A link:https://how-to.vertx.io/resilience4j-howto/[how-to] has been published that demonstrates the usage of Resilience4j with Vert.x. +The link:https://github.com/vertx-howtos/resilience4j-howto[repository] of that how-to contains Vert.x adapters for all the fault tolerance strategies listed above. +These adapters glue together the Resilience4j API and Vert.x ``Future``s. + +WARNING: Resilience4j 2.0 requires Java 17.