Skip to content

Commit

Permalink
Update to executeBlocking API change
Browse files Browse the repository at this point in the history
  • Loading branch information
vietj committed Jul 19, 2023
1 parent 8a050cb commit 70cb293
Show file tree
Hide file tree
Showing 5 changed files with 46 additions and 81 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -309,15 +309,11 @@ private synchronized void ensureConnected(Handler<AsyncResult<Void>> handler) {
case LOST:
case SUSPENDED:
vertx.executeBlocking(
future -> {
try {
if (client.blockUntilConnected(connectionTimeoutMs, TimeUnit.MILLISECONDS)) {
future.complete();
} else {
future.fail(new TimeoutException());
}
} catch (Exception e) {
future.fail(e);
() -> {
if (client.blockUntilConnected(connectionTimeoutMs, TimeUnit.MILLISECONDS)) {
return null;
} else {
throw new TimeoutException();
}
}).onComplete(ar -> {
if (ar.failed()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -140,14 +140,8 @@ public void start(Vertx vertx, ServicePublisher publisher, JsonObject configurat
}

synchronized void scan(Promise<Void> completion) {
vertx.<List<Container>>executeBlocking(
future -> {
try {
future.complete(client.listContainersCmd().withStatusFilter(Collections.singletonList("running")).exec());
} catch (Exception e) {
future.fail(e);
}
}).onComplete(ar -> {
vertx.executeBlocking(
() -> client.listContainersCmd().withStatusFilter(Collections.singletonList("running")).exec()).onComplete(ar -> {
if (ar.failed()) {
if (completion != null) {
completion.fail(ar.cause());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -53,34 +53,29 @@ public void start(Vertx vertx, ServicePublisher publisher, JsonObject configurat
int connectionTimeoutMs = configuration.getInteger("connectionTimeoutMs", 1000);

vertx.<Void>executeBlocking(
f -> {
try {

client = CuratorFrameworkFactory.builder()
.canBeReadOnly(canBeReadOnly)
.connectString(connection)
.connectionTimeoutMs(connectionTimeoutMs)
.retryPolicy(new ExponentialBackoffRetry(baseGraceBetweenRetries, maxRetries))
.build();
client.start();

discovery = ServiceDiscoveryBuilder.builder(JsonObject.class)
.client(client)
.basePath(basePath)
.serializer(new JsonObjectSerializer())
.watchInstances(true)
.build();

discovery.start();

cache = TreeCache.newBuilder(client, basePath).build();
cache.start();
cache.getListenable().addListener(this);

f.complete();
} catch (Exception e) {
future.fail(e);
}
() -> {
client = CuratorFrameworkFactory.builder()
.canBeReadOnly(canBeReadOnly)
.connectString(connection)
.connectionTimeoutMs(connectionTimeoutMs)
.retryPolicy(new ExponentialBackoffRetry(baseGraceBetweenRetries, maxRetries))
.build();
client.start();

discovery = ServiceDiscoveryBuilder.builder(JsonObject.class)
.client(client)
.basePath(basePath)
.serializer(new JsonObjectSerializer())
.watchInstances(true)
.build();

discovery.start();

cache = TreeCache.newBuilder(client, basePath).build();
cache.start();
cache.getListenable().addListener(this);

return null;
}).onComplete(
ar -> {
if (ar.failed()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -131,13 +131,9 @@ public void testServiceArrival(TestContext tc) throws Exception {
sd.getRecords(x -> true).onComplete(tc.asyncAssertSuccess(l -> {
tc.assertTrue(l.size() == 0);

vertx.executeBlocking(future -> {
try {
this.discovery.registerService(instance);
future.complete();
} catch (Exception e) {
future.fail(e);
}
vertx.executeBlocking(() -> {
this.discovery.registerService(instance);
return null;
}).onComplete(tc.asyncAssertSuccess(v2 -> {
waitUntil(() -> serviceLookup(sd, 1), tc.asyncAssertSuccess(v3 -> {
async.complete();
Expand Down Expand Up @@ -165,32 +161,20 @@ public void testArrivalDepartureAndComeBack(TestContext tc) throws Exception {
tc.asyncAssertSuccess(v1 -> {
sd.getRecords(x -> true).onComplete(tc.asyncAssertSuccess(l -> {
tc.assertTrue(l.size() == 0);
vertx.executeBlocking(future -> {
try {
this.discovery.registerService(instance);
future.complete();
} catch (Exception e) {
future.fail(e);
}
vertx.executeBlocking(() -> {
this.discovery.registerService(instance);
return null;
}).onComplete(tc.asyncAssertSuccess(v2 -> {
waitUntil(() -> serviceLookup(sd, 1), tc.asyncAssertSuccess(v3 -> {
// Leave
vertx.executeBlocking(future2 -> {
try {
this.discovery.unregisterService(instance);
future2.complete();
} catch (Exception e) {
future2.fail(e);
}
vertx.executeBlocking(() -> {
this.discovery.unregisterService(instance);
return null;
}).onComplete(tc.asyncAssertSuccess(v4 -> {
waitUntil(() -> serviceLookup(sd, 0), tc.asyncAssertSuccess(v5 -> {
vertx.executeBlocking(future3 -> {
try {
this.discovery.registerService(instance);
future3.complete();
} catch (Exception e) {
future3.fail(e);
}
vertx.executeBlocking(() -> {
this.discovery.registerService(instance);
return null;
}).onComplete(ar3 -> waitUntil(() -> serviceLookup(sd, 1), tc.asyncAssertSuccess(v6 -> {
async.complete();
})));
Expand Down Expand Up @@ -247,13 +231,9 @@ public void testServiceArrivalWithSameName(TestContext tc) throws Exception {
tc.asyncAssertSuccess(v -> {
waitUntil(() -> serviceLookup(sd, 1), tc.asyncAssertSuccess(list -> {
tc.assertEquals(list.get(0).getName(), "foo-service");
vertx.executeBlocking(promise -> {
try {
this.discovery.registerService(instance2);
promise.complete();
} catch (Exception e) {
promise.fail(e);
}
vertx.executeBlocking(() -> {
this.discovery.registerService(instance2);
return null;
}).onComplete(tc.asyncAssertSuccess(v2 -> {
waitUntil(() -> serviceLookup(sd, 2), tc.asyncAssertSuccess(lookup -> {
tc.assertEquals(lookup.get(0).getName(), "foo-service");
Expand Down
2 changes: 1 addition & 1 deletion vertx-service-discovery/src/main/asciidoc/index.adoc
Original file line number Diff line number Diff line change
Expand Up @@ -681,7 +681,7 @@ The second parameter can provide an optional configuration for the bridge.

When the bridge is registered the {@link io.vertx.servicediscovery.spi.ServiceImporter#start} method is called.
It lets you configure the bridge. When the bridge is configured, ready and has imported / exported the initial services, it must complete the given {@link io.vertx.core.Future}.
If the bridge starts method is blocking, it must use an {@link io.vertx.core.Vertx#executeBlocking(io.vertx.core.Handler, boolean)} construct, and complete the given future object.
If the bridge starts method is blocking, it must use an {@link io.vertx.core.Vertx#executeBlocking(java.util.concurrent.Callable, boolean)} construct, and complete the given future object.

When the service discovery is stopped, the bridge is stopped.
The {@link io.vertx.servicediscovery.spi.ServiceImporter#close(io.vertx.core.Handler)} method is called that provides the opportunity to cleanup resources, removed imported / exported services...
Expand Down

0 comments on commit 70cb293

Please sign in to comment.