diff --git a/src/main/java/iudx/catalogue/server/mlayer/vocabulary/DataModel.java b/src/main/java/iudx/catalogue/server/mlayer/vocabulary/DataModel.java index 0b737158..6f217b2f 100644 --- a/src/main/java/iudx/catalogue/server/mlayer/vocabulary/DataModel.java +++ b/src/main/java/iudx/catalogue/server/mlayer/vocabulary/DataModel.java @@ -11,16 +11,20 @@ import io.vertx.core.json.JsonObject; import io.vertx.ext.web.client.HttpResponse; import io.vertx.ext.web.client.WebClient; +import io.vertx.ext.web.client.WebClientOptions; import iudx.catalogue.server.database.ElasticClient; +import java.util.concurrent.Semaphore; import java.util.concurrent.atomic.AtomicInteger; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; public class DataModel { private static final Logger LOGGER = LogManager.getLogger(DataModel.class); + private static final int MAX_CONCURRENT_REQUESTS = 10; // limit to 10 concurrent requests private final ElasticClient client; private final WebClient webClient; private final String docIndex; + private final Semaphore semaphore = new Semaphore(MAX_CONCURRENT_REQUESTS); /** * Constructor for DataModel. @@ -29,7 +33,11 @@ public class DataModel { * @param docIndex The index name where data are stored/retrieved in elastic. */ public DataModel(ElasticClient client, String docIndex) { - this.webClient = WebClient.create(Vertx.vertx()); + WebClientOptions options = new WebClientOptions() + .setMaxPoolSize(10) + .setKeepAlive(true) + .setConnectTimeout(5000); + this.webClient = WebClient.create(Vertx.vertx(), options); this.client = client; this.docIndex = docIndex; } @@ -121,16 +129,36 @@ private Future fetchDataModels(JsonArray results) { String classId = type.split(":")[1]; String dmUrl = contextUrl + classId + ".jsonld"; + acquireSemaphoreAndFetchDataModel( + id, classId, dmUrl, classIdToSubClassMap, pendingRequests, promise); + } + return promise.future(); + } + + private void acquireSemaphoreAndFetchDataModel( + String id, + String classId, + String dmUrl, + JsonObject classIdToSubClassMap, + AtomicInteger pendingRequests, + Promise promise) { + try { + semaphore.acquire(); webClient .getAbs(dmUrl) .send( dmAr -> { handleDataModelResponse( dmAr, id, classId, classIdToSubClassMap, pendingRequests, promise, dmUrl); + semaphore.release(); }); + } catch (InterruptedException e) { + LOGGER.error("Semaphore acquisition interrupted for URL: {}", dmUrl, e); + semaphore.release(); + if (pendingRequests.decrementAndGet() == 0) { + promise.complete(classIdToSubClassMap); + } } - this.webClient.close(); - return promise.future(); } /**