Skip to content

Commit

Permalink
Merge pull request #374 from Kranthi-Guribilli/bug-fix
Browse files Browse the repository at this point in the history
Bug fix: To Resolve Concurrent Cancellation Exception from Mlayer datasets API
  • Loading branch information
pranavrd authored Jul 26, 2024
2 parents 68b8046 + 157b6e9 commit 38689f2
Showing 1 changed file with 31 additions and 3 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -11,16 +11,20 @@
import io.vertx.core.json.JsonObject;
import io.vertx.ext.web.client.HttpResponse;
import io.vertx.ext.web.client.WebClient;
import io.vertx.ext.web.client.WebClientOptions;
import iudx.catalogue.server.database.ElasticClient;
import java.util.concurrent.Semaphore;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;

public class DataModel {
private static final Logger LOGGER = LogManager.getLogger(DataModel.class);
private static final int MAX_CONCURRENT_REQUESTS = 10; // limit to 10 concurrent requests
private final ElasticClient client;
private final WebClient webClient;
private final String docIndex;
private final Semaphore semaphore = new Semaphore(MAX_CONCURRENT_REQUESTS);

/**
* Constructor for DataModel.
Expand All @@ -29,7 +33,11 @@ public class DataModel {
* @param docIndex The index name where data are stored/retrieved in elastic.
*/
public DataModel(ElasticClient client, String docIndex) {
this.webClient = WebClient.create(Vertx.vertx());
WebClientOptions options = new WebClientOptions()
.setMaxPoolSize(10)
.setKeepAlive(true)
.setConnectTimeout(5000);
this.webClient = WebClient.create(Vertx.vertx(), options);
this.client = client;
this.docIndex = docIndex;
}
Expand Down Expand Up @@ -121,16 +129,36 @@ private Future<JsonObject> fetchDataModels(JsonArray results) {
String classId = type.split(":")[1];
String dmUrl = contextUrl + classId + ".jsonld";

acquireSemaphoreAndFetchDataModel(
id, classId, dmUrl, classIdToSubClassMap, pendingRequests, promise);
}
return promise.future();
}

private void acquireSemaphoreAndFetchDataModel(
String id,
String classId,
String dmUrl,
JsonObject classIdToSubClassMap,
AtomicInteger pendingRequests,
Promise<JsonObject> promise) {
try {
semaphore.acquire();
webClient
.getAbs(dmUrl)
.send(
dmAr -> {
handleDataModelResponse(
dmAr, id, classId, classIdToSubClassMap, pendingRequests, promise, dmUrl);
semaphore.release();
});
} catch (InterruptedException e) {
LOGGER.error("Semaphore acquisition interrupted for URL: {}", dmUrl, e);
semaphore.release();
if (pendingRequests.decrementAndGet() == 0) {
promise.complete(classIdToSubClassMap);
}
}
this.webClient.close();
return promise.future();
}

/**
Expand Down

0 comments on commit 38689f2

Please sign in to comment.