Skip to content

Commit

Permalink
Remove usage of Handler/AsyncResult idiom.
Browse files Browse the repository at this point in the history
  • Loading branch information
vietj committed Sep 12, 2024
1 parent 907aee9 commit 56f5571
Show file tree
Hide file tree
Showing 4 changed files with 54 additions and 36 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -131,7 +131,13 @@ public void getRecords(Handler<AsyncResult<List<Record>>> resultHandler) {
@Override
public void getRecord(String uuid, Handler<AsyncResult<Record>> resultHandler) {
Promise<List<Record>> recordList = Promise.promise();
getRecords(recordList);
getRecords(ar -> {
if (ar.succeeded()) {
recordList.succeed(ar.result());
} else {
recordList.fail(ar.cause());
}
});
recordList.future().map(l -> l.stream().filter(r -> uuid.equals(r.getRegistration())).findFirst().orElse(null)).onComplete(resultHandler);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -228,7 +228,13 @@ public void getRecords(Handler<AsyncResult<List<Record>>> resultHandler) {
List<Future<Record>> futures = new ArrayList<>();
for (String child : children) {
Promise<Record> promise = Promise.promise();
getRecord(child, promise);
getRecord(child, ar -> {
if (ar.succeeded()) {
promise.succeed(ar.result());
} else {
promise.fail(ar.cause());
}
});
futures.add(promise.future());
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -334,17 +334,17 @@ private Future<Void> retrieveToken(JsonObject conf) {
return result.onSuccess(tk -> this.token = tk).mapEmpty();
}

private void publishRecord(Record record, Handler<AsyncResult<Record>> completionHandler) {
publisher.publish(record).onComplete(ar -> {
private void publishRecord(Record record, Completable<Record> completionHandler) {
publisher.publish(record).onComplete((res, err) -> {
if (completionHandler != null) {
completionHandler.handle(ar);
completionHandler.complete(res, err);
}
if (ar.succeeded()) {
if (err == null) {
LOGGER.info("Kubernetes service published in the vert.x service registry: "
+ record.toJson());
} else {
LOGGER.error("Kubernetes service not published in the vert.x service registry",
ar.cause());
err);
}
});
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -170,7 +170,7 @@ private void sendUnbindEvent(ServiceReference reference) {
}

public ServiceDiscovery registerServiceImporter(ServiceImporter importer, JsonObject configuration,
Handler<AsyncResult<Void>> completionHandler) {
Completable<Void> completionHandler) {
JsonObject conf;
if (configuration == null) {
conf = new JsonObject();
Expand All @@ -184,14 +184,14 @@ public ServiceDiscovery registerServiceImporter(ServiceImporter importer, JsonOb
if (ar.failed()) {
LOGGER.error("Cannot start the service importer " + importer, ar.cause());
if (completionHandler != null) {
completionHandler.handle(Future.failedFuture(ar.cause()));
completionHandler.fail(ar.cause());
}
} else {
importers.add(importer);
LOGGER.info("Service importer " + importer + " started");

if (completionHandler != null) {
completionHandler.handle(Future.succeededFuture(null));
completionHandler.succeed();
}
}
}
Expand All @@ -216,7 +216,7 @@ public Future<Void> registerServiceExporter(ServiceExporter exporter, JsonObject
}

public ServiceDiscovery registerServiceExporter(ServiceExporter exporter, JsonObject configuration,
Handler<AsyncResult<Void>> completionHandler) {
Completable<Void> completionHandler) {
JsonObject conf;
if (configuration == null) {
conf = new JsonObject();
Expand All @@ -230,14 +230,14 @@ public ServiceDiscovery registerServiceExporter(ServiceExporter exporter, JsonOb
if (ar.failed()) {
LOGGER.error("Cannot start the service importer " + exporter, ar.cause());
if (completionHandler != null) {
completionHandler.handle(Future.failedFuture(ar.cause()));
completionHandler.fail(ar.cause());
}
} else {
exporters.add(exporter);
LOGGER.info("Service exporter " + exporter + " started");

if (completionHandler != null) {
completionHandler.handle(Future.succeededFuture(null));
completionHandler.succeed();
}
}
}
Expand Down Expand Up @@ -275,13 +275,13 @@ public void close() {
});
}

public void publish(Record record, Handler<AsyncResult<Record>> resultHandler) {
public void publish(Record record, Completable<Record> resultHandler) {
Status status = record.getStatus() == null || record.getStatus() == Status.UNKNOWN
? Status.UP : record.getStatus();

backend.store(record.setStatus(status), ar -> {
if (ar.failed()) {
resultHandler.handle(Future.failedFuture(ar.cause()));
resultHandler.fail(ar.cause());
return;
}

Expand All @@ -294,7 +294,7 @@ public void publish(Record record, Handler<AsyncResult<Record>> resultHandler) {
.setStatus(status);

vertx.eventBus().publish(announce, announcedRecord.toJson());
resultHandler.handle(Future.succeededFuture(ar.result()));
resultHandler.succeed(ar.result());
});
}

Expand All @@ -305,10 +305,10 @@ public Future<Record> publish(Record record) {
return promise.future();
}

public void unpublish(String id, Handler<AsyncResult<Void>> resultHandler) {
public void unpublish(String id, Completable<Void> resultHandler) {
backend.remove(id, record -> {
if (record.failed()) {
resultHandler.handle(Future.failedFuture(record.cause()));
resultHandler.fail(record.cause());
return;
}

Expand All @@ -322,7 +322,7 @@ public void unpublish(String id, Handler<AsyncResult<Void>> resultHandler) {
.setStatus(Status.DOWN);

vertx.eventBus().publish(announce, announcedRecord.toJson());
resultHandler.handle(Future.succeededFuture());
resultHandler.succeed();
});
}

Expand All @@ -334,7 +334,7 @@ public Future<Void> unpublish(String id) {
}

public void getRecord(JsonObject filter,
Handler<AsyncResult<Record>> resultHandler) {
Completable<Record> resultHandler) {
boolean includeOutOfService = false;
Function<Record, Boolean> accept;
if (filter == null) {
Expand All @@ -354,8 +354,14 @@ public void getRecord(JsonObject filter,
return promise.future();
}

public void getRecord(String id, Handler<AsyncResult<@Nullable Record>> resultHandler) {
backend.getRecord(id, resultHandler);
public void getRecord(String id, Completable<@Nullable Record> resultHandler) {
backend.getRecord(id, ar -> {
if (ar.succeeded()) {
resultHandler.succeed(ar.result());
} else {
resultHandler.fail(ar.cause());
}
});
}

@Override
Expand All @@ -365,7 +371,7 @@ public void getRecord(String id, Handler<AsyncResult<@Nullable Record>> resultHa
return promise.future();
}

public void getRecord(Function<Record, Boolean> filter, Handler<AsyncResult<Record>> resultHandler) {
public void getRecord(Function<Record, Boolean> filter, Completable<Record> resultHandler) {
getRecord(filter, false, resultHandler);
}

Expand All @@ -376,21 +382,21 @@ public void getRecord(Function<Record, Boolean> filter, Handler<AsyncResult<Reco
return promise.future();
}

public void getRecord(Function<Record, Boolean> filter, boolean includeOutOfService, Handler<AsyncResult<Record>>
public void getRecord(Function<Record, Boolean> filter, boolean includeOutOfService, Completable<Record>
resultHandler) {
Objects.requireNonNull(filter);
backend.getRecords(list -> {
if (list.failed()) {
resultHandler.handle(Future.failedFuture(list.cause()));
resultHandler.fail(list.cause());
} else {
Optional<Record> any = list.result().stream()
.filter(filter::apply)
.filter(record -> includeOutOfService || record.getStatus() == Status.UP)
.findAny();
if (any.isPresent()) {
resultHandler.handle(Future.succeededFuture(any.get()));
resultHandler.succeed(any.get());
} else {
resultHandler.handle(Future.succeededFuture(null));
resultHandler.succeed();
}
}
});
Expand All @@ -403,7 +409,7 @@ public void getRecord(Function<Record, Boolean> filter, boolean includeOutOfServ
return promise.future();
}

public void getRecords(JsonObject filter, Handler<AsyncResult<List<Record>>> resultHandler) {
public void getRecords(JsonObject filter, Completable<List<Record>> resultHandler) {
boolean includeOutOfService = false;
Function<Record, Boolean> accept;
if (filter == null) {
Expand All @@ -423,7 +429,7 @@ public Future<List<Record>> getRecords(JsonObject filter) {
return promise.future();
}

public void getRecords(Function<Record, Boolean> filter, Handler<AsyncResult<List<Record>>> resultHandler) {
public void getRecords(Function<Record, Boolean> filter, Completable<List<Record>> resultHandler) {
getRecords(filter, false, resultHandler);
}

Expand All @@ -434,18 +440,18 @@ public Future<List<Record>> getRecords(Function<Record, Boolean> filter) {
return promise.future();
}

public void getRecords(Function<Record, Boolean> filter, boolean includeOutOfService, Handler<AsyncResult<List<Record>>> resultHandler) {
public void getRecords(Function<Record, Boolean> filter, boolean includeOutOfService, Completable<List<Record>> resultHandler) {
Objects.requireNonNull(filter);
backend.getRecords(list -> {
if (list.failed()) {
resultHandler.handle(Future.failedFuture(list.cause()));
resultHandler.fail(list.cause());
} else {
resultHandler.handle(Future.succeededFuture(
resultHandler.succeed(
list.result().stream()
.filter(filter::apply)
.filter(record -> includeOutOfService || record.getStatus() == Status.UP)
.collect(Collectors.toList())
));
);
}
});
}
Expand All @@ -457,18 +463,18 @@ public Future<List<Record>> getRecords(Function<Record, Boolean> filter, boolean
return promise.future();
}

public void update(Record record, Handler<AsyncResult<Record>> resultHandler) {
public void update(Record record, Completable<Record> resultHandler) {
backend.update(record, ar -> {
if (ar.failed()) {
resultHandler.handle(Future.failedFuture(ar.cause()));
resultHandler.fail(ar.cause());
} else {
for (ServiceExporter exporter : exporters) {
exporter.onUpdate(record);
}

Record announcedRecord = new Record(record);
vertx.eventBus().publish(announce, announcedRecord.toJson());
resultHandler.handle(Future.succeededFuture(record));
resultHandler.succeed(record);
}
});
}
Expand Down

0 comments on commit 56f5571

Please sign in to comment.