Skip to content

Commit

Permalink
Merge pull request #379 from Kranthi-Guribilli/memory_leakage-fix
Browse files Browse the repository at this point in the history
Fix: Concurrent Cancellation Exception
  • Loading branch information
gopal-mahajan authored Aug 19, 2024
2 parents afc241d + ba0965b commit 892e378
Show file tree
Hide file tree
Showing 8 changed files with 1,115 additions and 888 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
import io.vertx.core.*;
import io.vertx.core.json.JsonArray;
import io.vertx.core.json.JsonObject;
import io.vertx.ext.web.client.WebClient;
import iudx.catalogue.server.database.mlayer.*;
import iudx.catalogue.server.geocoding.GeocodingService;
import iudx.catalogue.server.nlpsearch.NLPSearchService;
Expand Down Expand Up @@ -45,6 +46,7 @@ public class DatabaseServiceImpl implements DatabaseService {
private String ratingIndex;
private String mlayerInstanceIndex;
private String mlayerDomainIndex;
private WebClient webClient;

/**
* Constructs a new DatabaseServiceImpl instance with the given ElasticClient and index names.
Expand All @@ -56,12 +58,14 @@ public class DatabaseServiceImpl implements DatabaseService {
* @param mlayerDomainIndex the name of the index used for ML layer domain storage
*/
public DatabaseServiceImpl(
WebClient webClient,
ElasticClient client,
String docIndex,
String ratingIndex,
String mlayerInstanceIndex,
String mlayerDomainIndex) {
this(client);
this.webClient = webClient;
this.docIndex = docIndex;
this.ratingIndex = ratingIndex;
this.mlayerInstanceIndex = mlayerInstanceIndex;
Expand All @@ -80,6 +84,7 @@ public DatabaseServiceImpl(
* @param geoService the geocoding service used to perform geocoding operations
*/
public DatabaseServiceImpl(
WebClient webClient,
ElasticClient client,
String docIndex,
String ratingIndex,
Expand All @@ -88,6 +93,7 @@ public DatabaseServiceImpl(
NLPSearchService nlpService,
GeocodingService geoService) {
this(client, nlpService, geoService);
this.webClient = webClient;
this.docIndex = docIndex;
this.ratingIndex = ratingIndex;
this.mlayerInstanceIndex = mlayerInstanceIndex;
Expand Down Expand Up @@ -1356,7 +1362,8 @@ public DatabaseService getMlayerGeoQuery(
@Override
public DatabaseService getMlayerAllDatasets(
JsonObject requestParam, String query, Handler<AsyncResult<JsonObject>> handler) {
MlayerDataset mlayerDataset = new MlayerDataset(client, docIndex, mlayerInstanceIndex);
MlayerDataset mlayerDataset =
new MlayerDataset(webClient, client, docIndex, mlayerInstanceIndex);
mlayerDataset.getMlayerAllDatasets(requestParam, query, handler);
return this;
}
Expand Down Expand Up @@ -1392,7 +1399,8 @@ Future<Boolean> verifyInstance(String instanceId) {
return promise.future();
}

String checkInstance = GET_INSTANCE_CASE_INSENSITIVE_QUERY.replace("$1", instanceId).replace("$2", "");
String checkInstance =
GET_INSTANCE_CASE_INSENSITIVE_QUERY.replace("$1", instanceId).replace("$2", "");
client.searchAsync(
checkInstance,
docIndex,
Expand Down
33 changes: 29 additions & 4 deletions src/main/java/iudx/catalogue/server/database/DatabaseVerticle.java
Original file line number Diff line number Diff line change
Expand Up @@ -3,14 +3,16 @@
import static iudx.catalogue.server.util.Constants.*;

import io.vertx.core.AbstractVerticle;
import io.vertx.core.Vertx;
import io.vertx.core.eventbus.MessageConsumer;
import io.vertx.core.json.JsonArray;
import io.vertx.core.json.JsonObject;
import io.vertx.ext.web.client.WebClient;
import io.vertx.ext.web.client.WebClientOptions;
import io.vertx.serviceproxy.ServiceBinder;
import iudx.catalogue.server.geocoding.GeocodingService;
import iudx.catalogue.server.nlpsearch.NLPSearchService;


/**
* The Database Verticle.
*
Expand Down Expand Up @@ -38,6 +40,17 @@ public class DatabaseVerticle extends AbstractVerticle {
private ServiceBinder binder;
private MessageConsumer<JsonObject> consumer;

/**
* Helper function to create a WebClient to talk to the vocabulary server.
*
* @param vertx the vertx instance
* @return a web client initialized with the relevant client certificate
*/
static WebClient createWebClient(Vertx vertx) {
WebClientOptions webClientOptions = new WebClientOptions();
return WebClient.create(vertx, webClientOptions);
}

/**
* This method is used to start the Verticle. It deploys a verticle in a cluster, registers the
* service with the Event bus against an address, publishes the service with the service discovery
Expand Down Expand Up @@ -66,10 +79,22 @@ public void start() throws Exception {
GeocodingService geoService = GeocodingService.createProxy(vertx, GEOCODING_SERVICE_ADDRESS);
database =
new DatabaseServiceImpl(
client, docIndex, ratingIndex, mlayerIndex, mlayerDomainIndex,
nlpService, geoService);
createWebClient(vertx),
client,
docIndex,
ratingIndex,
mlayerIndex,
mlayerDomainIndex,
nlpService,
geoService);
} else {
database = new DatabaseServiceImpl(client, docIndex, ratingIndex, mlayerIndex,
database =
new DatabaseServiceImpl(
createWebClient(vertx),
client,
docIndex,
ratingIndex,
mlayerIndex,
mlayerDomainIndex);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
import io.vertx.core.*;
import io.vertx.core.json.JsonArray;
import io.vertx.core.json.JsonObject;
import io.vertx.ext.web.client.WebClient;
import iudx.catalogue.server.database.ElasticClient;
import iudx.catalogue.server.database.RespBuilder;
import iudx.catalogue.server.database.Util;
Expand All @@ -26,9 +27,18 @@ public class MlayerDataset {
.withDetail(DETAIL_INTERNAL_SERVER_ERROR)
.getResponse();
ElasticClient client;
WebClient webClient;
String docIndex;
String mlayerInstanceIndex;

public MlayerDataset(
WebClient webClient, ElasticClient client, String docIndex, String mlayerInstanceIndex) {
this.webClient = webClient;
this.client = client;
this.docIndex = docIndex;
this.mlayerInstanceIndex = mlayerInstanceIndex;
}

public MlayerDataset(ElasticClient client, String docIndex, String mlayerInstanceIndex) {
this.client = client;
this.docIndex = docIndex;
Expand Down Expand Up @@ -137,9 +147,7 @@ public void getMlayerDataset(JsonObject requestData, Handler<AsyncResult<JsonObj
}

public void getMlayerAllDatasets(
JsonObject requestParam,
String query,
Handler<AsyncResult<JsonObject>> handler) {
JsonObject requestParam, String query, Handler<AsyncResult<JsonObject>> handler) {

LOGGER.debug("Getting all the resource group items");
Promise<JsonObject> datasetResult = Promise.promise();
Expand All @@ -154,8 +162,8 @@ public void getMlayerAllDatasets(
.onComplete(
ar -> {
if (ar.succeeded()) {
DataModel dataModel = new DataModel(client, docIndex);
dataModel
DataModel domainInfoFetcher = new DataModel(webClient, client, docIndex);
domainInfoFetcher
.getDataModelInfo()
.onComplete(
domainInfoResult -> {
Expand Down Expand Up @@ -264,8 +272,7 @@ private void gettingAllDatasets(String query, Promise<JsonObject> datasetResult)
int size = resultHandler.result().getJsonArray(RESULTS).size();
if (size == 0) {
LOGGER.debug("getRGs is zero");
datasetResult.handle(
Future.failedFuture(NO_CONTENT_AVAILABLE));
datasetResult.handle(Future.failedFuture(NO_CONTENT_AVAILABLE));
return;
}
JsonObject rsUrl = new JsonObject();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,14 +5,16 @@
import io.vertx.core.AsyncResult;
import io.vertx.core.Future;
import io.vertx.core.Promise;
import io.vertx.core.Vertx;
import io.vertx.core.buffer.Buffer;
import io.vertx.core.json.JsonArray;
import io.vertx.core.json.JsonObject;
import io.vertx.ext.web.client.HttpResponse;
import io.vertx.ext.web.client.WebClient;
import io.vertx.ext.web.client.WebClientOptions;
import iudx.catalogue.server.database.ElasticClient;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.Semaphore;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.logging.log4j.LogManager;
Expand All @@ -32,12 +34,8 @@ public class DataModel {
* @param client The ElasticClient instance
* @param docIndex The index name where data are stored/retrieved in elastic.
*/
public DataModel(ElasticClient client, String docIndex) {
WebClientOptions options = new WebClientOptions()
.setMaxPoolSize(10)
.setKeepAlive(true)
.setConnectTimeout(5000);
this.webClient = WebClient.create(Vertx.vertx(), options);
public DataModel(WebClient webClient, ElasticClient client, String docIndex) {
this.webClient = webClient;
this.client = client;
this.docIndex = docIndex;
}
Expand Down Expand Up @@ -98,48 +96,55 @@ private Future<JsonArray> getAllDatasetsByRsGrp() {
* Fetches data models asynchronously.
*
* @param results The JsonArray of results from Elasticsearch search.
* @return Future containing JsonObject with class to subclass mappings.
* @return Future containing JsonObject with id to subclass mappings.
*/
private Future<JsonObject> fetchDataModels(JsonArray results) {
Promise<JsonObject> promise = Promise.promise();
JsonObject classIdToSubClassMap = new JsonObject();
JsonObject idToSubClassMap = new JsonObject();

if (results.isEmpty()) {
promise.complete(classIdToSubClassMap);
promise.complete(idToSubClassMap);
return promise.future();
}

AtomicInteger pendingRequests = new AtomicInteger(results.size());
Set<String> uniqueClassIds = new HashSet<>();
String contextUrl = results.getJsonObject(0).getString("@context");

Map<String, String> idToClassIdMap = new HashMap<>();
for (int i = 0; i < results.size(); i++) {
JsonObject result = results.getJsonObject(i);
String id = result.getString("id");
JsonArray typeArray = result.getJsonArray("type");

if (typeArray == null || typeArray.size() < 2) {
LOGGER.error("Invalid type array in result: {}", result.encode());
if (pendingRequests.decrementAndGet() == 0) {
promise.complete(classIdToSubClassMap);
}
continue;
}

String id = result.getString("id");
String type = typeArray.getString(1);
String classId = type.split(":")[1];
String dmUrl = contextUrl + classId + ".jsonld";
uniqueClassIds.add(classId);
idToClassIdMap.put(id, classId);
}

AtomicInteger pendingRequests = new AtomicInteger(uniqueClassIds.size());
if (uniqueClassIds.isEmpty()) {
promise.complete(idToSubClassMap);
return promise.future();
}
for (String classId : uniqueClassIds) {
String dmUrl = contextUrl + classId + ".jsonld";
acquireSemaphoreAndFetchDataModel(
id, classId, dmUrl, classIdToSubClassMap, pendingRequests, promise);
classId, dmUrl, idToClassIdMap, idToSubClassMap, pendingRequests, promise);
}

return promise.future();
}

private void acquireSemaphoreAndFetchDataModel(
String id,
String classId,
String dmUrl,
JsonObject classIdToSubClassMap,
Map<String, String> idToClassIdMap,
JsonObject idToSubClassMap,
AtomicInteger pendingRequests,
Promise<JsonObject> promise) {
try {
Expand All @@ -149,14 +154,20 @@ private void acquireSemaphoreAndFetchDataModel(
.send(
dmAr -> {
handleDataModelResponse(
dmAr, id, classId, classIdToSubClassMap, pendingRequests, promise, dmUrl);
dmAr,
classId,
idToClassIdMap,
idToSubClassMap,
pendingRequests,
promise,
dmUrl);
semaphore.release();
});
} catch (InterruptedException e) {
LOGGER.error("Semaphore acquisition interrupted for URL: {}", dmUrl, e);
semaphore.release();
if (pendingRequests.decrementAndGet() == 0) {
promise.complete(classIdToSubClassMap);
promise.complete(idToSubClassMap);
}
}
}
Expand All @@ -165,18 +176,18 @@ private void acquireSemaphoreAndFetchDataModel(
* Handles the response from fetching data model information.
*
* @param dmAr The async result of the HTTP response.
* @param id The id of the data model.
* @param classId The class id of the data model.
* @param classIdToSubClassMap The JsonObject mapping class to subclass.
* @param idToClassIdMap The map of id to classId.
* @param idToSubClassMap The JsonObject mapping id to subclass.
* @param pendingRequests The AtomicInteger tracking pending requests.
* @param promise The Promise to complete with classIdToSubClassMap.
* @param promise The Promise to complete with idToSubClassMap.
* @param dmUrl The URL of the data model.
*/
private void handleDataModelResponse(
void handleDataModelResponse(
AsyncResult<HttpResponse<Buffer>> dmAr,
String id,
String classId,
JsonObject classIdToSubClassMap,
Map<String, String> idToClassIdMap,
JsonObject idToSubClassMap,
AtomicInteger pendingRequests,
Promise<JsonObject> promise,
String dmUrl) {
Expand Down Expand Up @@ -210,7 +221,11 @@ private void handleDataModelResponse(
String subClassIdStr = subClassOfObj.getString("@id");
if (subClassIdStr != null && subClassIdStr.contains(":")) {
String subClassId = subClassIdStr.split(":")[1];
classIdToSubClassMap.put(id, subClassId);
for (Map.Entry<String, String> entry : idToClassIdMap.entrySet()) {
if (entry.getValue().equals(classId)) {
idToSubClassMap.put(entry.getKey(), subClassId);
}
}
} else {
LOGGER.error("Invalid @id in rdfs:subClassOf for class ID: {}", classId);
}
Expand All @@ -231,7 +246,7 @@ private void handleDataModelResponse(
}

if (pendingRequests.decrementAndGet() == 0) {
promise.complete(classIdToSubClassMap);
promise.complete(idToSubClassMap);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -558,7 +558,7 @@ void updateDXResourceItem(){
)
)
)
.put("resourceServerRegURL", "rs.iudx.io")
.put("resourceServerRegURL", "rs-test-pm.iudx.io")
.put("resourceAccessModalities", new JsonArray()
.add(new JsonObject()
.put("type", new JsonArray().add("iudx:HTTPAccess"))
Expand Down
Loading

0 comments on commit 892e378

Please sign in to comment.