From 56f5571bbaae014cf5f8fddc6d3ab463790837f6 Mon Sep 17 00:00:00 2001 From: Julien Viet Date: Thu, 12 Sep 2024 15:55:22 +0200 Subject: [PATCH] Remove usage of Handler/AsyncResult idiom. --- .../backend/consul/ConsulBackendService.java | 8 ++- .../zookeeper/ZookeeperBackendService.java | 8 ++- .../kubernetes/KubernetesServiceImporter.java | 10 +-- .../servicediscovery/impl/DiscoveryImpl.java | 64 ++++++++++--------- 4 files changed, 54 insertions(+), 36 deletions(-) diff --git a/vertx-service-discovery-backend-consul/src/main/java/io/vertx/servicediscovery/backend/consul/ConsulBackendService.java b/vertx-service-discovery-backend-consul/src/main/java/io/vertx/servicediscovery/backend/consul/ConsulBackendService.java index b24f98df..47ce3443 100644 --- a/vertx-service-discovery-backend-consul/src/main/java/io/vertx/servicediscovery/backend/consul/ConsulBackendService.java +++ b/vertx-service-discovery-backend-consul/src/main/java/io/vertx/servicediscovery/backend/consul/ConsulBackendService.java @@ -131,7 +131,13 @@ public void getRecords(Handler>> resultHandler) { @Override public void getRecord(String uuid, Handler> resultHandler) { Promise> recordList = Promise.promise(); - getRecords(recordList); + getRecords(ar -> { + if (ar.succeeded()) { + recordList.succeed(ar.result()); + } else { + recordList.fail(ar.cause()); + } + }); recordList.future().map(l -> l.stream().filter(r -> uuid.equals(r.getRegistration())).findFirst().orElse(null)).onComplete(resultHandler); } diff --git a/vertx-service-discovery-backend-zookeeper/src/main/java/io/vertx/servicediscovery/backend/zookeeper/ZookeeperBackendService.java b/vertx-service-discovery-backend-zookeeper/src/main/java/io/vertx/servicediscovery/backend/zookeeper/ZookeeperBackendService.java index 359b5aff..1a0c1d81 100644 --- a/vertx-service-discovery-backend-zookeeper/src/main/java/io/vertx/servicediscovery/backend/zookeeper/ZookeeperBackendService.java +++ b/vertx-service-discovery-backend-zookeeper/src/main/java/io/vertx/servicediscovery/backend/zookeeper/ZookeeperBackendService.java @@ -228,7 +228,13 @@ public void getRecords(Handler>> resultHandler) { List> futures = new ArrayList<>(); for (String child : children) { Promise promise = Promise.promise(); - getRecord(child, promise); + getRecord(child, ar -> { + if (ar.succeeded()) { + promise.succeed(ar.result()); + } else { + promise.fail(ar.cause()); + } + }); futures.add(promise.future()); } diff --git a/vertx-service-discovery-bridge-kubernetes/src/main/java/io/vertx/servicediscovery/kubernetes/KubernetesServiceImporter.java b/vertx-service-discovery-bridge-kubernetes/src/main/java/io/vertx/servicediscovery/kubernetes/KubernetesServiceImporter.java index ffc3d546..f3735bb4 100644 --- a/vertx-service-discovery-bridge-kubernetes/src/main/java/io/vertx/servicediscovery/kubernetes/KubernetesServiceImporter.java +++ b/vertx-service-discovery-bridge-kubernetes/src/main/java/io/vertx/servicediscovery/kubernetes/KubernetesServiceImporter.java @@ -334,17 +334,17 @@ private Future retrieveToken(JsonObject conf) { return result.onSuccess(tk -> this.token = tk).mapEmpty(); } - private void publishRecord(Record record, Handler> completionHandler) { - publisher.publish(record).onComplete(ar -> { + private void publishRecord(Record record, Completable completionHandler) { + publisher.publish(record).onComplete((res, err) -> { if (completionHandler != null) { - completionHandler.handle(ar); + completionHandler.complete(res, err); } - if (ar.succeeded()) { + if (err == null) { LOGGER.info("Kubernetes service published in the vert.x service registry: " + record.toJson()); } else { LOGGER.error("Kubernetes service not published in the vert.x service registry", - ar.cause()); + err); } }); } diff --git a/vertx-service-discovery/src/main/java/io/vertx/servicediscovery/impl/DiscoveryImpl.java b/vertx-service-discovery/src/main/java/io/vertx/servicediscovery/impl/DiscoveryImpl.java index 59485ab7..acc10fb0 100644 --- a/vertx-service-discovery/src/main/java/io/vertx/servicediscovery/impl/DiscoveryImpl.java +++ b/vertx-service-discovery/src/main/java/io/vertx/servicediscovery/impl/DiscoveryImpl.java @@ -170,7 +170,7 @@ private void sendUnbindEvent(ServiceReference reference) { } public ServiceDiscovery registerServiceImporter(ServiceImporter importer, JsonObject configuration, - Handler> completionHandler) { + Completable completionHandler) { JsonObject conf; if (configuration == null) { conf = new JsonObject(); @@ -184,14 +184,14 @@ public ServiceDiscovery registerServiceImporter(ServiceImporter importer, JsonOb if (ar.failed()) { LOGGER.error("Cannot start the service importer " + importer, ar.cause()); if (completionHandler != null) { - completionHandler.handle(Future.failedFuture(ar.cause())); + completionHandler.fail(ar.cause()); } } else { importers.add(importer); LOGGER.info("Service importer " + importer + " started"); if (completionHandler != null) { - completionHandler.handle(Future.succeededFuture(null)); + completionHandler.succeed(); } } } @@ -216,7 +216,7 @@ public Future registerServiceExporter(ServiceExporter exporter, JsonObject } public ServiceDiscovery registerServiceExporter(ServiceExporter exporter, JsonObject configuration, - Handler> completionHandler) { + Completable completionHandler) { JsonObject conf; if (configuration == null) { conf = new JsonObject(); @@ -230,14 +230,14 @@ public ServiceDiscovery registerServiceExporter(ServiceExporter exporter, JsonOb if (ar.failed()) { LOGGER.error("Cannot start the service importer " + exporter, ar.cause()); if (completionHandler != null) { - completionHandler.handle(Future.failedFuture(ar.cause())); + completionHandler.fail(ar.cause()); } } else { exporters.add(exporter); LOGGER.info("Service exporter " + exporter + " started"); if (completionHandler != null) { - completionHandler.handle(Future.succeededFuture(null)); + completionHandler.succeed(); } } } @@ -275,13 +275,13 @@ public void close() { }); } - public void publish(Record record, Handler> resultHandler) { + public void publish(Record record, Completable resultHandler) { Status status = record.getStatus() == null || record.getStatus() == Status.UNKNOWN ? Status.UP : record.getStatus(); backend.store(record.setStatus(status), ar -> { if (ar.failed()) { - resultHandler.handle(Future.failedFuture(ar.cause())); + resultHandler.fail(ar.cause()); return; } @@ -294,7 +294,7 @@ public void publish(Record record, Handler> resultHandler) { .setStatus(status); vertx.eventBus().publish(announce, announcedRecord.toJson()); - resultHandler.handle(Future.succeededFuture(ar.result())); + resultHandler.succeed(ar.result()); }); } @@ -305,10 +305,10 @@ public Future publish(Record record) { return promise.future(); } - public void unpublish(String id, Handler> resultHandler) { + public void unpublish(String id, Completable resultHandler) { backend.remove(id, record -> { if (record.failed()) { - resultHandler.handle(Future.failedFuture(record.cause())); + resultHandler.fail(record.cause()); return; } @@ -322,7 +322,7 @@ public void unpublish(String id, Handler> resultHandler) { .setStatus(Status.DOWN); vertx.eventBus().publish(announce, announcedRecord.toJson()); - resultHandler.handle(Future.succeededFuture()); + resultHandler.succeed(); }); } @@ -334,7 +334,7 @@ public Future unpublish(String id) { } public void getRecord(JsonObject filter, - Handler> resultHandler) { + Completable resultHandler) { boolean includeOutOfService = false; Function accept; if (filter == null) { @@ -354,8 +354,14 @@ public void getRecord(JsonObject filter, return promise.future(); } - public void getRecord(String id, Handler> resultHandler) { - backend.getRecord(id, resultHandler); + public void getRecord(String id, Completable<@Nullable Record> resultHandler) { + backend.getRecord(id, ar -> { + if (ar.succeeded()) { + resultHandler.succeed(ar.result()); + } else { + resultHandler.fail(ar.cause()); + } + }); } @Override @@ -365,7 +371,7 @@ public void getRecord(String id, Handler> resultHa return promise.future(); } - public void getRecord(Function filter, Handler> resultHandler) { + public void getRecord(Function filter, Completable resultHandler) { getRecord(filter, false, resultHandler); } @@ -376,21 +382,21 @@ public void getRecord(Function filter, Handler filter, boolean includeOutOfService, Handler> + public void getRecord(Function filter, boolean includeOutOfService, Completable resultHandler) { Objects.requireNonNull(filter); backend.getRecords(list -> { if (list.failed()) { - resultHandler.handle(Future.failedFuture(list.cause())); + resultHandler.fail(list.cause()); } else { Optional any = list.result().stream() .filter(filter::apply) .filter(record -> includeOutOfService || record.getStatus() == Status.UP) .findAny(); if (any.isPresent()) { - resultHandler.handle(Future.succeededFuture(any.get())); + resultHandler.succeed(any.get()); } else { - resultHandler.handle(Future.succeededFuture(null)); + resultHandler.succeed(); } } }); @@ -403,7 +409,7 @@ public void getRecord(Function filter, boolean includeOutOfServ return promise.future(); } - public void getRecords(JsonObject filter, Handler>> resultHandler) { + public void getRecords(JsonObject filter, Completable> resultHandler) { boolean includeOutOfService = false; Function accept; if (filter == null) { @@ -423,7 +429,7 @@ public Future> getRecords(JsonObject filter) { return promise.future(); } - public void getRecords(Function filter, Handler>> resultHandler) { + public void getRecords(Function filter, Completable> resultHandler) { getRecords(filter, false, resultHandler); } @@ -434,18 +440,18 @@ public Future> getRecords(Function filter) { return promise.future(); } - public void getRecords(Function filter, boolean includeOutOfService, Handler>> resultHandler) { + public void getRecords(Function filter, boolean includeOutOfService, Completable> resultHandler) { Objects.requireNonNull(filter); backend.getRecords(list -> { if (list.failed()) { - resultHandler.handle(Future.failedFuture(list.cause())); + resultHandler.fail(list.cause()); } else { - resultHandler.handle(Future.succeededFuture( + resultHandler.succeed( list.result().stream() .filter(filter::apply) .filter(record -> includeOutOfService || record.getStatus() == Status.UP) .collect(Collectors.toList()) - )); + ); } }); } @@ -457,10 +463,10 @@ public Future> getRecords(Function filter, boolean return promise.future(); } - public void update(Record record, Handler> resultHandler) { + public void update(Record record, Completable resultHandler) { backend.update(record, ar -> { if (ar.failed()) { - resultHandler.handle(Future.failedFuture(ar.cause())); + resultHandler.fail(ar.cause()); } else { for (ServiceExporter exporter : exporters) { exporter.onUpdate(record); @@ -468,7 +474,7 @@ public void update(Record record, Handler> resultHandler) { Record announcedRecord = new Record(record); vertx.eventBus().publish(announce, announcedRecord.toJson()); - resultHandler.handle(Future.succeededFuture(record)); + resultHandler.succeed(record); } }); }