From ea3356b3fc176a97775f2f68e69792f2fe2c7ff9 Mon Sep 17 00:00:00 2001 From: James Hayhurst Date: Thu, 28 Nov 2024 15:33:31 +0000 Subject: [PATCH 1/7] genericize the deferred resolver --- app/models/entities/Loci.scala | 11 ++- app/models/gql/DeferredResolvers.scala | 128 ++++++++++++++++--------- 2 files changed, 88 insertions(+), 51 deletions(-) diff --git a/app/models/entities/Loci.scala b/app/models/entities/Loci.scala index 81c98d1a..58337496 100644 --- a/app/models/entities/Loci.scala +++ b/app/models/entities/Loci.scala @@ -9,6 +9,7 @@ import play.api.libs.json._ import play.api.libs.json.{Reads, JsValue, Json, OFormat, OWrites} import play.api.libs.functional.syntax._ import sangria.schema.{ + DeferredValue, Field, FloatType, IntType, @@ -17,9 +18,9 @@ import sangria.schema.{ OptionType, StringType, fields, - DeferredValue } import models.gql.Arguments.{studyTypes, pageArg, pageSize, variantIds} +import models.gql.TypeWithId case class Locus( variantId: Option[String], @@ -37,8 +38,8 @@ case class Locus( case class Loci( count: Long, rows: Option[Seq[Locus]], - studyLocusId: String -) + id: String +) extends TypeWithId object Loci extends Logging { import sangria.macros.derive._ @@ -60,7 +61,9 @@ object Loci extends Logging { ) ) - implicit val lociImp: ObjectType[Backend, Loci] = deriveObjectType[Backend, Loci]() + implicit val lociImp: ObjectType[Backend, Loci] = deriveObjectType[Backend, Loci]( + ExcludeFields("id"), + ) implicit val locusF: OFormat[Locus] = Json.format[Locus] implicit val lociR: Reads[Loci] = ( (JsPath \ "count").read[Long] and diff --git a/app/models/gql/DeferredResolvers.scala b/app/models/gql/DeferredResolvers.scala index 449017dc..a17e55f6 100644 --- a/app/models/gql/DeferredResolvers.scala +++ b/app/models/gql/DeferredResolvers.scala @@ -4,57 +4,91 @@ import models.{Backend, entities} import play.api.Logging import sangria.execution.deferred.{Deferred, DeferredResolver} import scala.concurrent._ +import cats.syntax.group -case class LocusDeferred(studyLocusId: String, - variantIds: Option[Seq[String]], - pagination: Option[Pagination] -) extends Deferred[Loci] - -// TODO: Genericize this resolver to handle not just locus but other deferred types: hint - use the groupDeferred method -class LocusResolver extends DeferredResolver[Backend] with Logging { - def resolve(deferred: Vector[Deferred[Any]], ctx: Backend, queryState: Any)(implicit - ec: ExecutionContext - ): Vector[Future[Any]] = { - val lq = deferred collect { case q: LocusDeferred => q } - // group by variantIds and pagination so that we can use studyLocusId to fetch the loci - val groupedLq = lq.groupBy(q => (q.variantIds, q.pagination)) - val locusQueries = groupedLq.map { case ((variantIds, pagination), queries) => - val studyLocusIds = queries.map(_.studyLocusId) - (studyLocusIds, variantIds, pagination) +trait TypeWithId { + val id: String +} + +case class GroupedResults[T](grouping: Product, results: Future[IndexedSeq[T]]) + +abstract class DeferredMultiTerm[+T]() extends Deferred[T] { + val id: String + val grouping: Product + def empty(): T + def resolver(ctx: Backend): (Seq[String], Product) => Future[IndexedSeq[T]] +} + +case class LocusDeferred(studyLocusId: String, variantIds: Option[Seq[String]], pagination: Option[Pagination]) extends DeferredMultiTerm[Loci] { + val id: String = studyLocusId + val grouping = (variantIds, pagination) + def empty(): Loci = Loci(0, None, "") + def resolver(ctx: Backend): (Seq[String], Product) => Future[IndexedSeq[Loci]] = { + case (s: Seq[String], options: Product) => { + options match { + case (v, p) => + ctx.getLocus(s, v.asInstanceOf[Option[Seq[String]]], p.asInstanceOf[Option[Pagination]]) + } + } + } +} + + +/** A deferred resolver for cases where we can't use the Fetch API because we resolve the + * values on multiple terms/filters. + **/ +class MultiTermResolver extends DeferredResolver[Backend] with Logging { + def groupResults[T](deferred: Vector[DeferredMultiTerm[T]], ctx: Backend): Map[Product, Future[IndexedSeq[T]]] = { + val grouped = deferred.groupBy(q => q.grouping) + val queries = grouped.map { + case (grouping, queries) => + val ids = queries.map(_.id) + val resolver = queries.head.resolver(ctx) + (ids, grouping, resolver) + } + val results = queries.map { + case (ids, grouping, resolver) => + val r = resolver(ids, grouping) + grouping -> r + }.toMap + results } - // results grouped by variantIds and pagination - val results = locusQueries.map { case (studyLocusIds, variantIds, pagination) => - val r = ctx.getLocus(studyLocusIds, variantIds, pagination) - (variantIds, pagination) -> r - }.toMap - deferred.map { case LocusDeferred(studyLocusId, variantIds, pagination) => - // lookup results based on the variantIds pagination group in the results - val group = results.get((variantIds, pagination)).get - val l = group.map(loci => loci.filter(studyLocusId == _.studyLocusId)) - l.map(_.headOption.getOrElse(Loci.empty())) + + def getResultForId[T](deferredQ: DeferredMultiTerm[T], results: Map[Product, Future[IndexedSeq[TypeWithId]]])(implicit ec: ExecutionContext): Future[T] = { + val group = results.get(deferredQ.grouping).get + val hit = group.map(_.filter(_.id == deferredQ.id)) + hit.map(_.headOption.getOrElse(deferredQ.empty()).asInstanceOf[T]) + } + + def resolve(deferred: Vector[Deferred[Any]], ctx: Backend, queryState: Any)(implicit ec: ExecutionContext): Vector[Future[Any]] = { + val lq = deferred collect {case q: LocusDeferred => q} + val results = groupResults(lq, ctx) + deferred.map { + case q: LocusDeferred => + getResultForId(q, results) + } } - } } object DeferredResolvers extends Logging { - val locusResolver = new LocusResolver() - // add fetchers and locusResolver to the resolvers - val deferredResolvers: DeferredResolver[Backend] = DeferredResolver.fetchersWithFallback( - locusResolver, - Fetchers.biosamplesFetcher, - Fetchers.credibleSetFetcher, - Fetchers.l2gFetcher, - Fetchers.targetsFetcher, - Fetchers.drugsFetcher, - Fetchers.diseasesFetcher, - Fetchers.hposFetcher, - Fetchers.reactomeFetcher, - Fetchers.expressionFetcher, - Fetchers.otarProjectsFetcher, - Fetchers.soTermsFetcher, - Fetchers.indicationFetcher, - Fetchers.goFetcher, - Fetchers.variantFetcher, - Fetchers.studyFetcher - ) + val multiTermResolver = new MultiTermResolver() + // add fetchers and locusResolver to the resolvers + val deferredResolvers: DeferredResolver[Backend] = DeferredResolver.fetchersWithFallback( + multiTermResolver, + Fetchers.biosamplesFetcher, + Fetchers.credibleSetFetcher, + Fetchers.l2gFetcher, + Fetchers.targetsFetcher, + Fetchers.drugsFetcher, + Fetchers.diseasesFetcher, + Fetchers.hposFetcher, + Fetchers.reactomeFetcher, + Fetchers.expressionFetcher, + Fetchers.otarProjectsFetcher, + Fetchers.soTermsFetcher, + Fetchers.indicationFetcher, + Fetchers.goFetcher, + Fetchers.variantFetcher, + Fetchers.gwasFetcher + ) } From 239fb1103713b8ba745d82402ab0a295cc6f2224 Mon Sep 17 00:00:00 2001 From: James Hayhurst Date: Thu, 28 Nov 2024 21:58:32 +0000 Subject: [PATCH 2/7] formatting --- app/models/entities/Loci.scala | 6 +- app/models/gql/DeferredResolvers.scala | 136 +++++++++++++------------ 2 files changed, 73 insertions(+), 69 deletions(-) diff --git a/app/models/entities/Loci.scala b/app/models/entities/Loci.scala index 58337496..e3fdde7c 100644 --- a/app/models/entities/Loci.scala +++ b/app/models/entities/Loci.scala @@ -17,7 +17,7 @@ import sangria.schema.{ ObjectType, OptionType, StringType, - fields, + fields } import models.gql.Arguments.{studyTypes, pageArg, pageSize, variantIds} import models.gql.TypeWithId @@ -39,7 +39,7 @@ case class Loci( count: Long, rows: Option[Seq[Locus]], id: String -) extends TypeWithId +) extends TypeWithId object Loci extends Logging { import sangria.macros.derive._ @@ -62,7 +62,7 @@ object Loci extends Logging { ) implicit val lociImp: ObjectType[Backend, Loci] = deriveObjectType[Backend, Loci]( - ExcludeFields("id"), + ExcludeFields("id") ) implicit val locusF: OFormat[Locus] = Json.format[Locus] implicit val lociR: Reads[Loci] = ( diff --git a/app/models/gql/DeferredResolvers.scala b/app/models/gql/DeferredResolvers.scala index a17e55f6..7fc96632 100644 --- a/app/models/gql/DeferredResolvers.scala +++ b/app/models/gql/DeferredResolvers.scala @@ -7,88 +7,92 @@ import scala.concurrent._ import cats.syntax.group trait TypeWithId { - val id: String + val id: String } case class GroupedResults[T](grouping: Product, results: Future[IndexedSeq[T]]) abstract class DeferredMultiTerm[+T]() extends Deferred[T] { - val id: String - val grouping: Product - def empty(): T - def resolver(ctx: Backend): (Seq[String], Product) => Future[IndexedSeq[T]] + val id: String + val grouping: Product + def empty(): T + def resolver(ctx: Backend): (Seq[String], Product) => Future[IndexedSeq[T]] } -case class LocusDeferred(studyLocusId: String, variantIds: Option[Seq[String]], pagination: Option[Pagination]) extends DeferredMultiTerm[Loci] { - val id: String = studyLocusId - val grouping = (variantIds, pagination) - def empty(): Loci = Loci(0, None, "") - def resolver(ctx: Backend): (Seq[String], Product) => Future[IndexedSeq[Loci]] = { - case (s: Seq[String], options: Product) => { - options match { - case (v, p) => - ctx.getLocus(s, v.asInstanceOf[Option[Seq[String]]], p.asInstanceOf[Option[Pagination]]) - } - } - } +case class LocusDeferred(studyLocusId: String, + variantIds: Option[Seq[String]], + pagination: Option[Pagination] +) extends DeferredMultiTerm[Loci] { + val id: String = studyLocusId + val grouping = (variantIds, pagination) + def empty(): Loci = Loci(0, None, "") + def resolver(ctx: Backend): (Seq[String], Product) => Future[IndexedSeq[Loci]] = { + case (s: Seq[String], options: Product) => + options match { + case (v, p) => + ctx.getLocus(s, v.asInstanceOf[Option[Seq[String]]], p.asInstanceOf[Option[Pagination]]) + } + } } - /** A deferred resolver for cases where we can't use the Fetch API because we resolve the - * values on multiple terms/filters. - **/ + * values on multiple terms/filters. + */ class MultiTermResolver extends DeferredResolver[Backend] with Logging { - def groupResults[T](deferred: Vector[DeferredMultiTerm[T]], ctx: Backend): Map[Product, Future[IndexedSeq[T]]] = { - val grouped = deferred.groupBy(q => q.grouping) - val queries = grouped.map { - case (grouping, queries) => - val ids = queries.map(_.id) - val resolver = queries.head.resolver(ctx) - (ids, grouping, resolver) - } - val results = queries.map { - case (ids, grouping, resolver) => - val r = resolver(ids, grouping) - grouping -> r - }.toMap - results + def groupResults[T](deferred: Vector[DeferredMultiTerm[T]], + ctx: Backend + ): Map[Product, Future[IndexedSeq[T]]] = { + val grouped = deferred.groupBy(q => q.grouping) + val queries = grouped.map { case (grouping, queries) => + val ids = queries.map(_.id) + val resolver = queries.head.resolver(ctx) + (ids, grouping, resolver) } + val results = queries.map { case (ids, grouping, resolver) => + val r = resolver(ids, grouping) + grouping -> r + }.toMap + results + } - def getResultForId[T](deferredQ: DeferredMultiTerm[T], results: Map[Product, Future[IndexedSeq[TypeWithId]]])(implicit ec: ExecutionContext): Future[T] = { - val group = results.get(deferredQ.grouping).get - val hit = group.map(_.filter(_.id == deferredQ.id)) - hit.map(_.headOption.getOrElse(deferredQ.empty()).asInstanceOf[T]) - } + def getResultForId[T](deferredQ: DeferredMultiTerm[T], + results: Map[Product, Future[IndexedSeq[TypeWithId]]] + )(implicit ec: ExecutionContext): Future[T] = { + val group = results.get(deferredQ.grouping).get + val hit = group.map(_.filter(_.id == deferredQ.id)) + hit.map(_.headOption.getOrElse(deferredQ.empty()).asInstanceOf[T]) + } - def resolve(deferred: Vector[Deferred[Any]], ctx: Backend, queryState: Any)(implicit ec: ExecutionContext): Vector[Future[Any]] = { - val lq = deferred collect {case q: LocusDeferred => q} - val results = groupResults(lq, ctx) - deferred.map { - case q: LocusDeferred => - getResultForId(q, results) - } + def resolve(deferred: Vector[Deferred[Any]], ctx: Backend, queryState: Any)(implicit + ec: ExecutionContext + ): Vector[Future[Any]] = { + val lq = deferred collect { case q: LocusDeferred => q } + val results = groupResults(lq, ctx) + deferred.map { case q: LocusDeferred => + getResultForId(q, results) } + } } object DeferredResolvers extends Logging { - val multiTermResolver = new MultiTermResolver() - // add fetchers and locusResolver to the resolvers - val deferredResolvers: DeferredResolver[Backend] = DeferredResolver.fetchersWithFallback( - multiTermResolver, - Fetchers.biosamplesFetcher, - Fetchers.credibleSetFetcher, - Fetchers.l2gFetcher, - Fetchers.targetsFetcher, - Fetchers.drugsFetcher, - Fetchers.diseasesFetcher, - Fetchers.hposFetcher, - Fetchers.reactomeFetcher, - Fetchers.expressionFetcher, - Fetchers.otarProjectsFetcher, - Fetchers.soTermsFetcher, - Fetchers.indicationFetcher, - Fetchers.goFetcher, - Fetchers.variantFetcher, - Fetchers.gwasFetcher - ) + val multiTermResolver = new MultiTermResolver() + // add fetchers and locusResolver to the resolvers + val deferredResolvers: DeferredResolver[Backend] = DeferredResolver.fetchersWithFallback( + multiTermResolver, + Fetchers.biosamplesFetcher, + Fetchers.credibleSetFetcher, + Fetchers.l2gFetcher, + Fetchers.targetsFetcher, + Fetchers.drugsFetcher, + Fetchers.diseasesFetcher, + Fetchers.hposFetcher, + Fetchers.reactomeFetcher, + Fetchers.expressionFetcher, + Fetchers.otarProjectsFetcher, + Fetchers.soTermsFetcher, + Fetchers.indicationFetcher, + Fetchers.goFetcher, + Fetchers.variantFetcher, + Fetchers.gwasFetcher + ) } From 7a1232afab7c18252b9810133b27aa85214541b9 Mon Sep 17 00:00:00 2001 From: James Hayhurst Date: Fri, 29 Nov 2024 15:02:28 +0000 Subject: [PATCH 3/7] deferred resolver for credible sets by study using multi search api --- app/models/Backend.scala | 48 ++++++++--- app/models/ElasticRetriever.scala | 82 +++++++++++++++++++ .../ElasticRetrieverQueryBuilders.scala | 22 ++++- app/models/entities/CredibleSets.scala | 7 +- app/models/entities/Study.scala | 7 +- app/models/gql/DeferredResolvers.scala | 46 ++++++++--- app/models/gql/Objects.scala | 2 +- 7 files changed, 184 insertions(+), 30 deletions(-) diff --git a/app/models/Backend.scala b/app/models/Backend.scala index 79fe497e..42f00888 100644 --- a/app/models/Backend.scala +++ b/app/models/Backend.scala @@ -288,15 +288,14 @@ class Backend @Inject() (implicit .getInnerQ( indexName, query, - Pagination.mkDefault, + Pagination.mkMax, fromJsValue[Locus], "locus", Some("studyLocusId") ) - retriever.map { case (locus, _, counts, studyLocusIds) => - // zip the credsets with the counts and studyLocusIds and locusQueries and return an indexed seq of loci - locus.zip(counts).zip(studyLocusIds).map { case ((locus, count), studyLocusId) => - Loci(count, Some(locus), studyLocusId.as[String]) + retriever.map { + case (locus, _, counts, studyLocusIds) => locus.zip(counts).zip(studyLocusIds).map { + case ((locus, count), studyLocusId) => Loci(count, Some(locus), studyLocusId.as[String]) } } } @@ -349,7 +348,6 @@ class Backend @Inject() (implicit must(termsQueryIter) } } - logger.info(s"Querying credible sets for: $query") val retriever = esRetriever .getQ( @@ -366,6 +364,34 @@ class Backend @Inject() (implicit } } + def getCredibleSetsByStudy(studyIds: Seq[String], pagination: Option[Pagination]): Future[IndexedSeq[CredibleSets]] = { + val pag = pagination.getOrElse(Pagination.mkDefault) + val indexName = getIndexOrDefault("credible_setn") + val queries = studyIds.map { studyId => IndexQuery( + esIndex = indexName, + kv = Map("studyId.keyword" -> Seq(studyId)), + filters = Seq.empty, + pagination = pag, + aggs = Seq.empty, + excludedFields = Seq("locus", "ldSet") + ) + } + val retriever = + esRetriever + .getMultiByIndexedTermsMust( + queries, + fromJsValue[JsValue], + None, + Some("studyId") + ) + retriever.map { + case r => r.map { + case (Seq(), _, _, _) => CredibleSets.empty + case (credsets, _, counts, studyId) => CredibleSets(counts, credsets, studyId.as[String]) + } + } + } + def getTargetEssentiality(ids: Seq[String]): Future[IndexedSeq[TargetEssentiality]] = { val targetIndexName = getIndexOrDefault("target_essentiality") @@ -859,14 +885,14 @@ class Backend @Inject() (implicit size: Int ): Future[Vector[Similarity]] = { val table = defaultOTSettings.clickhouse.similarities - logger.info(s"query similarities in table ${table.name}") + logger.debug(s"query similarities in table ${table.name}") val jointLabels = labels + label val simQ = QW2V(table.name, categories, jointLabels, threshold, size) dbRetriever.executeQuery[Long, Query](simQ.existsLabel(label)).flatMap { case Vector(1) => dbRetriever.executeQuery[Similarity, Query](simQ.query) case _ => - logger.info( + logger.debug( s"This case where the label asked ${label} to the model Word2Vec does not exist" + s" is ok but nice to capture though" ) @@ -911,7 +937,7 @@ class Backend @Inject() (implicit ): Future[Publications] = { val table = defaultOTSettings.clickhouse.literature val indexTable = defaultOTSettings.clickhouse.literatureIndex - logger.info(s"query literature ocurrences in table ${table.name}") + logger.debug(s"query literature ocurrences in table ${table.name}") val pag = Helpers.Cursor.to(cursor).flatMap(_.asOpt[Pagination]).getOrElse(Pagination.mkDefault) @@ -948,12 +974,12 @@ class Backend @Inject() (implicit case Vector(year) => runQuery(year, total) case _ => - logger.info(s"Cannot find the earliest year for the publications.") + logger.debug(s"Cannot find the earliest year for the publications.") runQuery(1900, total) } case _ => - logger.info(s"there is no publications with this set of ids $ids") + logger.debug(s"there is no publications with this set of ids $ids") Future.successful(Publications.empty()) } } diff --git a/app/models/ElasticRetriever.scala b/app/models/ElasticRetriever.scala index 8635d4a4..4b8fbbc4 100644 --- a/app/models/ElasticRetriever.scala +++ b/app/models/ElasticRetriever.scala @@ -249,6 +249,20 @@ class ElasticRetriever @Inject() ( } } + private def getMultiByIndexedQuery[A]( + searchRequest: MultiSearchRequest, + sortByField: Option[sort.FieldSort] = None, + buildF: JsValue => Option[A], + resolverField: Option[String] + ): Future[IndexedSeq[(IndexedSeq[A], JsValue, Long, JsValue)]] = { + // log and execute the query + val searchResponse: Future[Response[MultiSearchResponse]] = executeMultiQuery(searchRequest, sortByField) + // convert results into A + searchResponse.map { + handleMultiSearchResponse(_, searchRequest, buildF, resolverField) + } + } + private def executeQuery( searchRequest: SearchRequest, sortByField: Option[sort.FieldSort] @@ -263,6 +277,23 @@ class ElasticRetriever @Inject() ( sortedSearchRequest } + private def executeMultiQuery( + searchRequest: MultiSearchRequest, + sortByField: Option[sort.FieldSort] + ): Future[Response[MultiSearchResponse]] = + client.execute { + val sortedSearchRequest = sortByField match { + case Some(s) => + val sortedSearches = searchRequest.searches.map { search => + search.sortBy(s) + } + searchRequest.copy(searches = sortedSearches) + case None => searchRequest + } + logger.info(s"Elasticsearch query: ${client.show(sortedSearchRequest)}") + sortedSearchRequest + } + private def handleSearchResponse[A]( searchResponse: Response[SearchResponse], searchQuery: SearchRequest, @@ -292,6 +323,47 @@ class ElasticRetriever @Inject() ( (mappedHits, aggs, total) } + private def handleMultiSearchResponse[A]( + searchResponse: Response[MultiSearchResponse], + searchQuery: MultiSearchRequest, + buildF: JsValue => Option[A], + resolverField: Option[String] + ): IndexedSeq[(IndexedSeq[A], JsValue, Long, JsValue)] = { + searchResponse match { + case rf: RequestFailure => + logger.debug(s"Request failure for query: $searchQuery") + logger.error(s"Elasticsearch error: ${rf.error}") + IndexedSeq.empty + case results: RequestSuccess[MultiSearchResponse] => + val result = Json.parse(results.body.get) + logger.trace(Json.prettyPrint(result)) + val responses = (result \ "responses").get.as[JsArray].value + responses.map { response => + val hits = (response \ "hits" \ "hits").get.as[JsArray].value + val aggs = (response \ "aggregations").getOrElse(JsNull) + val total = (response \ "hits" \ "total" \ "value").as[Long] + val rf = resolverField match { + case Some(r) => + hits + .map { jObj => + (jObj \ "_source" \ r).as[JsValue] + } + .headOption.getOrElse(JsNull) + case None => JsNull + } + val mappedHits = hits + .map { jObj => + buildF(jObj) + } + .withFilter(_.isDefined) + .map(_.get) + .to(IndexedSeq) + (mappedHits, aggs, total, rf) + }.toIndexedSeq + } + } + + private def handleInnerSearchResponse[A]( searchResponse: Response[SearchResponse], searchQuery: SearchRequest, @@ -339,6 +411,16 @@ class ElasticRetriever @Inject() ( (mappedHits, aggs, total, parentfields) } + def getMultiByIndexedTermsMust[V, A]( + indexQueries: Seq[IndexQuery[V]], + buildF: JsValue => Option[A], + sortByField: Option[sort.FieldSort] = None, + resolverField: Option[String], + ): Future[IndexedSeq[(IndexedSeq[A], JsValue, Long, JsValue)]] = { + val searchRequest: MultiSearchRequest = MultiIndexTermsMust(indexQueries) + getMultiByIndexedQuery(searchRequest, sortByField, buildF, resolverField) + } + /* Provide a specific Bool Query*/ def getQ[A]( esIndex: String, diff --git a/app/models/ElasticRetrieverQueryBuilders.scala b/app/models/ElasticRetrieverQueryBuilders.scala index 29f526da..924f7082 100644 --- a/app/models/ElasticRetrieverQueryBuilders.scala +++ b/app/models/ElasticRetrieverQueryBuilders.scala @@ -1,10 +1,10 @@ package models -import com.sksamuel.elastic4s.ElasticDsl.search +import com.sksamuel.elastic4s.ElasticDsl.{search, multi} import com.sksamuel.elastic4s.api.QueryApi import com.sksamuel.elastic4s.requests.searches.aggs.AbstractAggregation import com.sksamuel.elastic4s.requests.searches.queries.Query -import com.sksamuel.elastic4s.requests.searches.SearchRequest +import com.sksamuel.elastic4s.requests.searches.{SearchRequest, MultiSearchRequest} import com.sksamuel.elastic4s.requests.searches.queries.compound.BoolQuery import models.entities.Pagination import play.api.Logging @@ -47,6 +47,11 @@ trait ElasticRetrieverQueryBuilders extends QueryApi with Logging { ): SearchRequest = getByIndexTermsBuilder(indexQuery, should) + def MultiIndexTermsMust[V]( + indexQueries: Seq[IndexQuery[V]] + ): MultiSearchRequest = + multiSearchTermsBuilder(indexQueries, must) + def getByIndexQueryBuilder[V]( indexQuery: IndexQuery[V], f: Iterable[Query] => BoolQuery @@ -93,4 +98,15 @@ trait ElasticRetrieverQueryBuilders extends QueryApi with Logging { .trackTotalHits(true) .sourceExclude(indexQuery.excludedFields) } -} + + def multiSearchTermsBuilder[V]( + indexQueries: Seq[IndexQuery[V]], + f: Iterable[Query] => BoolQuery + ): MultiSearchRequest = { + val searchRequests = indexQueries.map { query => { + getByIndexTermsBuilder(query, f) + } + } + multi(searchRequests) + } +} \ No newline at end of file diff --git a/app/models/entities/CredibleSets.scala b/app/models/entities/CredibleSets.scala index 649ad43f..e244537a 100644 --- a/app/models/entities/CredibleSets.scala +++ b/app/models/entities/CredibleSets.scala @@ -4,11 +4,14 @@ import models.Backend import models.entities.CredibleSet.credibleSetImp import play.api.libs.json.JsValue import sangria.schema.{ObjectType, Field, ListType, LongType, fields} +import models.gql.TypeWithId + case class CredibleSets( count: Long, - rows: IndexedSeq[JsValue] -) + rows: IndexedSeq[JsValue], + id: String = "" +) extends TypeWithId object CredibleSets { def empty: CredibleSets = CredibleSets(0, IndexedSeq.empty) diff --git a/app/models/entities/Study.scala b/app/models/entities/Study.scala index f186b030..c358d86b 100644 --- a/app/models/entities/Study.scala +++ b/app/models/entities/Study.scala @@ -20,6 +20,7 @@ import sangria.schema.{ DeferredValue } import models.gql.StudyTypeEnum +import models.gql.CredibleSetsByStudyDeferred import models.gql.Arguments.{pageArg, StudyType} import play.api.libs.json._ import play.api.libs.functional.syntax.toFunctionalBuilderOps @@ -241,8 +242,10 @@ object Study extends Logging { description = Some("Credible sets"), resolve = js => { val studyId = (js.value \ "studyId").as[String] - val credSetQueryArgs = CredibleSetQueryArgs(studyIds = Seq(studyId)) - js.ctx.getCredibleSets(credSetQueryArgs, js.arg(pageArg)) + //val credSetQueryArgs = CredibleSetQueryArgs(studyIds = Seq(studyId)) + //js.ctx.getCredibleSets(credSetQueryArgs, js.arg(pageArg)) + CredibleSetsByStudyDeferred(studyId, js.arg(pageArg)) + } ) lazy val studyImp: ObjectType[Backend, JsValue] = ObjectType( diff --git a/app/models/gql/DeferredResolvers.scala b/app/models/gql/DeferredResolvers.scala index 7fc96632..54d4dbb8 100644 --- a/app/models/gql/DeferredResolvers.scala +++ b/app/models/gql/DeferredResolvers.scala @@ -1,20 +1,25 @@ package models.gql -import models.entities.{Loci, Pagination} +import models.entities.{Loci, Pagination, CredibleSets, CredibleSetQueryArgs} import models.{Backend, entities} import play.api.Logging import sangria.execution.deferred.{Deferred, DeferredResolver} import scala.concurrent._ -import cats.syntax.group +import models.gql.Arguments.studyId +import models.gql.Objects.locationAndSourceImp + trait TypeWithId { val id: String } -case class GroupedResults[T](grouping: Product, results: Future[IndexedSeq[T]]) +/** @param id The ID to resolve on + * @param grouping A tuple of the values that are used to group the deferred values + * @tparam T The type of the deferred value + */ abstract class DeferredMultiTerm[+T]() extends Deferred[T] { val id: String - val grouping: Product + val grouping: Product def empty(): T def resolver(ctx: Backend): (Seq[String], Product) => Future[IndexedSeq[T]] } @@ -22,10 +27,10 @@ abstract class DeferredMultiTerm[+T]() extends Deferred[T] { case class LocusDeferred(studyLocusId: String, variantIds: Option[Seq[String]], pagination: Option[Pagination] -) extends DeferredMultiTerm[Loci] { + ) extends DeferredMultiTerm[Loci] { val id: String = studyLocusId val grouping = (variantIds, pagination) - def empty(): Loci = Loci(0, None, "") + def empty(): Loci = Loci.empty() def resolver(ctx: Backend): (Seq[String], Product) => Future[IndexedSeq[Loci]] = { case (s: Seq[String], options: Product) => options match { @@ -35,6 +40,21 @@ case class LocusDeferred(studyLocusId: String, } } +case class CredibleSetsByStudyDeferred(studyId: String, pagination: Option[Pagination]) extends DeferredMultiTerm[CredibleSets] { + val id: String = studyId + val grouping = (pagination) + def empty(): CredibleSets = CredibleSets.empty + def resolver(ctx: Backend): (Seq[String], Product) => Future[IndexedSeq[CredibleSets]] = { + case (s: Seq[String], options: Product) => + options match { + case (p) => { + ctx.getCredibleSetsByStudy(s, p.asInstanceOf[Option[Pagination]]) + } + } + } +} + + /** A deferred resolver for cases where we can't use the Fetch API because we resolve the * values on multiple terms/filters. */ @@ -66,10 +86,14 @@ class MultiTermResolver extends DeferredResolver[Backend] with Logging { def resolve(deferred: Vector[Deferred[Any]], ctx: Backend, queryState: Any)(implicit ec: ExecutionContext ): Vector[Future[Any]] = { - val lq = deferred collect { case q: LocusDeferred => q } - val results = groupResults(lq, ctx) - deferred.map { case q: LocusDeferred => - getResultForId(q, results) + val deferredByType = deferred collect { + case locus: LocusDeferred => locus + case credSetByStudy: CredibleSetsByStudyDeferred => credSetByStudy + } + val results = groupResults(deferredByType, ctx) + deferred.map { + case locus: LocusDeferred => getResultForId(locus, results) + case credSetByStudy: CredibleSetsByStudyDeferred => getResultForId(credSetByStudy, results) } } } @@ -93,6 +117,6 @@ object DeferredResolvers extends Logging { Fetchers.indicationFetcher, Fetchers.goFetcher, Fetchers.variantFetcher, - Fetchers.gwasFetcher + Fetchers.studyFetcher ) } diff --git a/app/models/gql/Objects.scala b/app/models/gql/Objects.scala index 1cec3e8f..27943d93 100644 --- a/app/models/gql/Objects.scala +++ b/app/models/gql/Objects.scala @@ -1350,7 +1350,7 @@ object Objects extends Logging { Some("Credible set"), resolve = r => { val studyLocusId = r.value.otherStudyLocusId.getOrElse("") - logger.info(s"Finding colocalisation credible set: $studyLocusId") + logger.debug(s"Finding colocalisation credible set: $studyLocusId") // r.ctx.getCredibleSet(studyLocusId) credibleSetFetcher.deferOpt(studyLocusId) } From 0461bc962d0ada23e85f515c4e2265259dca0ed6 Mon Sep 17 00:00:00 2001 From: James Hayhurst Date: Sat, 30 Nov 2024 23:20:22 +0000 Subject: [PATCH 4/7] deferred resolver for credset by variant --- app/models/Backend.scala | 74 +++++++++++++++---- app/models/ElasticRetriever.scala | 63 ++++++++++------ .../ElasticRetrieverQueryBuilders.scala | 37 +++++++++- app/models/entities/CredibleSets.scala | 1 - app/models/gql/DeferredResolvers.scala | 47 ++++++++---- app/models/gql/Objects.scala | 7 +- 6 files changed, 171 insertions(+), 58 deletions(-) diff --git a/app/models/Backend.scala b/app/models/Backend.scala index 42f00888..d9abb2a5 100644 --- a/app/models/Backend.scala +++ b/app/models/Backend.scala @@ -293,9 +293,9 @@ class Backend @Inject() (implicit "locus", Some("studyLocusId") ) - retriever.map { - case (locus, _, counts, studyLocusIds) => locus.zip(counts).zip(studyLocusIds).map { - case ((locus, count), studyLocusId) => Loci(count, Some(locus), studyLocusId.as[String]) + retriever.map { case (locus, _, counts, studyLocusIds) => + locus.zip(counts).zip(studyLocusIds).map { case ((locus, count), studyLocusId) => + Loci(count, Some(locus), studyLocusId.as[String]) } } } @@ -364,32 +364,80 @@ class Backend @Inject() (implicit } } - def getCredibleSetsByStudy(studyIds: Seq[String], pagination: Option[Pagination]): Future[IndexedSeq[CredibleSets]] = { + def getCredibleSetsByStudy(studyIds: Seq[String], + pagination: Option[Pagination] + ): Future[IndexedSeq[CredibleSets]] = { val pag = pagination.getOrElse(Pagination.mkDefault) - val indexName = getIndexOrDefault("credible_setn") - val queries = studyIds.map { studyId => IndexQuery( - esIndex = indexName, + val indexName = getIndexOrDefault("credible_set") + val queries = studyIds.map { studyId => + IndexQuery( + esIndex = indexName, kv = Map("studyId.keyword" -> Seq(studyId)), filters = Seq.empty, pagination = pag, aggs = Seq.empty, excludedFields = Seq("locus", "ldSet") - ) - } + ) + } val retriever = esRetriever .getMultiByIndexedTermsMust( queries, fromJsValue[JsValue], None, - Some("studyId") + Some(ResolverField("studyId")) ) - retriever.map { - case r => r.map { - case (Seq(), _, _, _) => CredibleSets.empty + retriever.map { case r => + r.map { + case (Seq(), _, _, _) => CredibleSets.empty case (credsets, _, counts, studyId) => CredibleSets(counts, credsets, studyId.as[String]) + } + } + } + + def getCredibleSetsByVariant(variantIds: Seq[String], + studyTypes: Option[Seq[StudyTypeEnum.Value]], + pagination: Option[Pagination] + ): Future[IndexedSeq[CredibleSets]] = { + val pag = pagination.getOrElse(Pagination.mkDefault) + val indexName = getIndexOrDefault("credible_set") + val termsQueryIter: Option[Iterable[queries.Query]] = studyTypes match { + case Some(studyTypes) => Some(Iterable(must(termQuery("studyType.keyword", studyTypes)))) + case None => None + } + // nested query for each variant id in variantIds + val boolQueries: Seq[IndexBoolQuery] = variantIds.map { variantId => + val query: BoolQuery = { + val nestedTermsQuery = termQuery("locus.variantId.keyword", variantId) + val nestedQueryIter = + Iterable(nestedQuery("locus", must(nestedTermsQuery)).inner(innerHits("locus").size(1))) + termsQueryIter match { + case None => must(nestedQueryIter) + case Some(termsQueryIter) => must(termsQueryIter ++ nestedQueryIter) } + }.queryName(variantId) + IndexBoolQuery( + esIndex = indexName, + boolQuery = query, + pagination = pag, + excludedFields = Seq("ldSet", "locus") + ) + } + val retriever = + esRetriever + .getMultiQ( + boolQueries, + fromJsValue[JsValue], + None, + Some(ResolverField(matched_queries = true)) + ) + retriever.map { case r => + r.map { + case (Seq(), _, _, _) => CredibleSets.empty + case (credsets, _, counts, variantId) => + CredibleSets(counts, credsets, variantId.as[String]) } + } } def getTargetEssentiality(ids: Seq[String]): Future[IndexedSeq[TargetEssentiality]] = { diff --git a/app/models/ElasticRetriever.scala b/app/models/ElasticRetriever.scala index 4b8fbbc4..cb6c1851 100644 --- a/app/models/ElasticRetriever.scala +++ b/app/models/ElasticRetriever.scala @@ -28,6 +28,12 @@ import com.sksamuel.elastic4s.requests.searches.term.TermQuery import com.sksamuel.elastic4s.handlers.index.Search import views.html.index.f +case class ResolverField(fieldname: Option[String], matched_queries: Boolean = false) +object ResolverField { + def apply(fieldname: String): ResolverField = ResolverField(Some(fieldname)) + def apply(matched_queries: Boolean): ResolverField = ResolverField(None, matched_queries) +} + class ElasticRetriever @Inject() ( client: ElasticClient, hlFields: Seq[String], @@ -253,10 +259,11 @@ class ElasticRetriever @Inject() ( searchRequest: MultiSearchRequest, sortByField: Option[sort.FieldSort] = None, buildF: JsValue => Option[A], - resolverField: Option[String] + resolverField: Option[ResolverField] ): Future[IndexedSeq[(IndexedSeq[A], JsValue, Long, JsValue)]] = { // log and execute the query - val searchResponse: Future[Response[MultiSearchResponse]] = executeMultiQuery(searchRequest, sortByField) + val searchResponse: Future[Response[MultiSearchResponse]] = + executeMultiQuery(searchRequest, sortByField) // convert results into A searchResponse.map { handleMultiSearchResponse(_, searchRequest, buildF, resolverField) @@ -327,8 +334,8 @@ class ElasticRetriever @Inject() ( searchResponse: Response[MultiSearchResponse], searchQuery: MultiSearchRequest, buildF: JsValue => Option[A], - resolverField: Option[String] - ): IndexedSeq[(IndexedSeq[A], JsValue, Long, JsValue)] = { + resolverField: Option[ResolverField] + ): IndexedSeq[(IndexedSeq[A], JsValue, Long, JsValue)] = searchResponse match { case rf: RequestFailure => logger.debug(s"Request failure for query: $searchQuery") @@ -343,13 +350,21 @@ class ElasticRetriever @Inject() ( val aggs = (response \ "aggregations").getOrElse(JsNull) val total = (response \ "hits" \ "total" \ "value").as[Long] val rf = resolverField match { - case Some(r) => + case Some(ResolverField(Some(r), false)) => hits .map { jObj => (jObj \ "_source" \ r).as[JsValue] } - .headOption.getOrElse(JsNull) - case None => JsNull + .headOption + .getOrElse(JsNull) + case Some(ResolverField(None, true)) => + hits + .map { jObj => + (jObj \ "matched_queries").as[JsArray].value.headOption.getOrElse(JsNull) + } + .headOption + .getOrElse(JsNull) + case _ => JsNull } val mappedHits = hits .map { jObj => @@ -361,9 +376,7 @@ class ElasticRetriever @Inject() ( (mappedHits, aggs, total, rf) }.toIndexedSeq } - } - - + private def handleInnerSearchResponse[A]( searchResponse: Response[SearchResponse], searchQuery: SearchRequest, @@ -415,12 +428,22 @@ class ElasticRetriever @Inject() ( indexQueries: Seq[IndexQuery[V]], buildF: JsValue => Option[A], sortByField: Option[sort.FieldSort] = None, - resolverField: Option[String], + resolverField: Option[ResolverField] ): Future[IndexedSeq[(IndexedSeq[A], JsValue, Long, JsValue)]] = { val searchRequest: MultiSearchRequest = MultiIndexTermsMust(indexQueries) getMultiByIndexedQuery(searchRequest, sortByField, buildF, resolverField) } - + + def getMultiQ[A]( + indexQueries: Seq[IndexBoolQuery], + buildF: JsValue => Option[A], + sortByField: Option[sort.FieldSort] = None, + resolverField: Option[ResolverField] + ): Future[IndexedSeq[(IndexedSeq[A], JsValue, Long, JsValue)]] = { + val searchRequest: MultiSearchRequest = multiBoolQueryBuilder(indexQueries) + getMultiByIndexedQuery(searchRequest, sortByField, buildF, resolverField) + } + /* Provide a specific Bool Query*/ def getQ[A]( esIndex: String, @@ -431,15 +454,13 @@ class ElasticRetriever @Inject() ( sortByField: Option[sort.FieldSort] = None, excludedFields: Seq[String] = Seq.empty ): Future[(IndexedSeq[A], JsValue, Long)] = { - val limitClause = pagination.toES - val searchRequest: SearchRequest = search(esIndex) - .bool(boolQ) - .start(limitClause._1) - .limit(limitClause._2) - .aggs(aggs) - .trackTotalHits(true) - .fetchSource(false) - .sourceExclude(excludedFields) + val indexQuery = IndexBoolQuery(esIndex = esIndex, + boolQuery = boolQ, + pagination = pagination, + aggs = aggs, + excludedFields = excludedFields + ) + val searchRequest: SearchRequest = BoolQueryBuilder(indexQuery) getByIndexedQuery(searchRequest, sortByField, buildF) } diff --git a/app/models/ElasticRetrieverQueryBuilders.scala b/app/models/ElasticRetrieverQueryBuilders.scala index 924f7082..95483659 100644 --- a/app/models/ElasticRetrieverQueryBuilders.scala +++ b/app/models/ElasticRetrieverQueryBuilders.scala @@ -27,6 +27,14 @@ case class IndexQuery[V]( excludedFields: Seq[String] = Seq.empty ) +case class IndexBoolQuery( + esIndex: String, + boolQuery: BoolQuery, + pagination: Pagination, + aggs: Iterable[AbstractAggregation] = Iterable.empty, + excludedFields: Seq[String] = Seq.empty +) + trait ElasticRetrieverQueryBuilders extends QueryApi with Logging { def IndexQueryMust[V](indexQuery: IndexQuery[V]): SearchRequest = @@ -99,14 +107,37 @@ trait ElasticRetrieverQueryBuilders extends QueryApi with Logging { .sourceExclude(indexQuery.excludedFields) } + def BoolQueryBuilder( + indexQuery: IndexBoolQuery + ): SearchRequest = { + val limitClause = indexQuery.pagination.toES + val searchRequest: SearchRequest = search(indexQuery.esIndex) + .bool(indexQuery.boolQuery) + .start(limitClause._1) + .limit(limitClause._2) + .aggs(indexQuery.aggs) + .trackTotalHits(true) + .sourceExclude(indexQuery.excludedFields) + + searchRequest + } + + def multiBoolQueryBuilder( + indexQueries: Seq[IndexBoolQuery] + ): MultiSearchRequest = { + val searchRequests = indexQueries.map { query => + BoolQueryBuilder(query) + } + multi(searchRequests) + } + def multiSearchTermsBuilder[V]( indexQueries: Seq[IndexQuery[V]], f: Iterable[Query] => BoolQuery ): MultiSearchRequest = { - val searchRequests = indexQueries.map { query => { + val searchRequests = indexQueries.map { query => getByIndexTermsBuilder(query, f) } - } multi(searchRequests) } -} \ No newline at end of file +} diff --git a/app/models/entities/CredibleSets.scala b/app/models/entities/CredibleSets.scala index e244537a..b89b03f1 100644 --- a/app/models/entities/CredibleSets.scala +++ b/app/models/entities/CredibleSets.scala @@ -6,7 +6,6 @@ import play.api.libs.json.JsValue import sangria.schema.{ObjectType, Field, ListType, LongType, fields} import models.gql.TypeWithId - case class CredibleSets( count: Long, rows: IndexedSeq[JsValue], diff --git a/app/models/gql/DeferredResolvers.scala b/app/models/gql/DeferredResolvers.scala index 54d4dbb8..21132c76 100644 --- a/app/models/gql/DeferredResolvers.scala +++ b/app/models/gql/DeferredResolvers.scala @@ -7,19 +7,17 @@ import scala.concurrent._ import models.gql.Arguments.studyId import models.gql.Objects.locationAndSourceImp - trait TypeWithId { val id: String } - /** @param id The ID to resolve on * @param grouping A tuple of the values that are used to group the deferred values * @tparam T The type of the deferred value */ abstract class DeferredMultiTerm[+T]() extends Deferred[T] { val id: String - val grouping: Product + val grouping: Product def empty(): T def resolver(ctx: Backend): (Seq[String], Product) => Future[IndexedSeq[T]] } @@ -27,7 +25,7 @@ abstract class DeferredMultiTerm[+T]() extends Deferred[T] { case class LocusDeferred(studyLocusId: String, variantIds: Option[Seq[String]], pagination: Option[Pagination] - ) extends DeferredMultiTerm[Loci] { +) extends DeferredMultiTerm[Loci] { val id: String = studyLocusId val grouping = (variantIds, pagination) def empty(): Loci = Loci.empty() @@ -40,20 +38,38 @@ case class LocusDeferred(studyLocusId: String, } } -case class CredibleSetsByStudyDeferred(studyId: String, pagination: Option[Pagination]) extends DeferredMultiTerm[CredibleSets] { +case class CredibleSetsByStudyDeferred(studyId: String, pagination: Option[Pagination]) + extends DeferredMultiTerm[CredibleSets] { val id: String = studyId val grouping = (pagination) def empty(): CredibleSets = CredibleSets.empty def resolver(ctx: Backend): (Seq[String], Product) => Future[IndexedSeq[CredibleSets]] = { case (s: Seq[String], options: Product) => options match { - case (p) => { - ctx.getCredibleSetsByStudy(s, p.asInstanceOf[Option[Pagination]]) - } + case (p) => + ctx.getCredibleSetsByStudy(s, p.asInstanceOf[Option[Pagination]]) } } } +case class CredibleSetsByVariantDeferred(variantId: String, + studyTypes: Option[Seq[StudyTypeEnum.Value]], + pagination: Option[Pagination] +) extends DeferredMultiTerm[CredibleSets] { + val id: String = variantId + val grouping = (studyTypes, pagination) + def empty(): CredibleSets = CredibleSets.empty + def resolver(ctx: Backend): (Seq[String], Product) => Future[IndexedSeq[CredibleSets]] = { + case (v: Seq[String], options: Product) => + options match { + case (s, p) => + ctx.getCredibleSetsByVariant(v, + s.asInstanceOf[Option[Seq[StudyTypeEnum.Value]]], + p.asInstanceOf[Option[Pagination]] + ) + } + } +} /** A deferred resolver for cases where we can't use the Fetch API because we resolve the * values on multiple terms/filters. @@ -86,14 +102,17 @@ class MultiTermResolver extends DeferredResolver[Backend] with Logging { def resolve(deferred: Vector[Deferred[Any]], ctx: Backend, queryState: Any)(implicit ec: ExecutionContext ): Vector[Future[Any]] = { - val deferredByType = deferred collect { - case locus: LocusDeferred => locus - case credSetByStudy: CredibleSetsByStudyDeferred => credSetByStudy + val deferredByType = deferred collect { + case locus: LocusDeferred => locus + case credSetByStudy: CredibleSetsByStudyDeferred => credSetByStudy + case credSetByVariant: CredibleSetsByVariantDeferred => credSetByVariant } val results = groupResults(deferredByType, ctx) - deferred.map { - case locus: LocusDeferred => getResultForId(locus, results) - case credSetByStudy: CredibleSetsByStudyDeferred => getResultForId(credSetByStudy, results) + deferred.map { + case locus: LocusDeferred => getResultForId(locus, results) + case credSetByStudy: CredibleSetsByStudyDeferred => getResultForId(credSetByStudy, results) + case credSetByVariant: CredibleSetsByVariantDeferred => + getResultForId(credSetByVariant, results) } } } diff --git a/app/models/gql/Objects.scala b/app/models/gql/Objects.scala index 27943d93..a26a5324 100644 --- a/app/models/gql/Objects.scala +++ b/app/models/gql/Objects.scala @@ -1351,7 +1351,6 @@ object Objects extends Logging { resolve = r => { val studyLocusId = r.value.otherStudyLocusId.getOrElse("") logger.debug(s"Finding colocalisation credible set: $studyLocusId") - // r.ctx.getCredibleSet(studyLocusId) credibleSetFetcher.deferOpt(studyLocusId) } ) @@ -1383,11 +1382,7 @@ object Objects extends Logging { description = Some("Credible sets"), arguments = pageArg :: studyTypes :: Nil, resolve = r => { - val studyTypesSeq = r.arg(studyTypes).getOrElse(Seq.empty) - val variantIdSeq = Seq(r.value.variantId) - val credSetQueryArgs = - CredibleSetQueryArgs(variantIds = variantIdSeq, studyTypes = studyTypesSeq) - r.ctx.getCredibleSets(credSetQueryArgs, r.arg(pageArg)) + CredibleSetsByVariantDeferred(r.value.variantId, r.arg(studyTypes), r.arg(pageArg)) } ), Field( From d423842bfa53c2f9fe8acb026fbc26c89f9a66c6 Mon Sep 17 00:00:00 2001 From: James Hayhurst Date: Mon, 2 Dec 2024 11:55:18 +0000 Subject: [PATCH 5/7] add ResultsHandler --- app/models/ElasticRetriever.scala | 120 +++++++++++++++++++++--------- 1 file changed, 84 insertions(+), 36 deletions(-) diff --git a/app/models/ElasticRetriever.scala b/app/models/ElasticRetriever.scala index cb6c1851..0792e751 100644 --- a/app/models/ElasticRetriever.scala +++ b/app/models/ElasticRetriever.scala @@ -34,6 +34,39 @@ object ResolverField { def apply(matched_queries: Boolean): ResolverField = ResolverField(None, matched_queries) } +case class Results[A]( + mappedHits: IndexedSeq[A], + aggs: JsValue, + total: Long, + resolverField: JsValue +) +object Results { + def apply[A](mappedHits: IndexedSeq[A], aggs: JsValue, total: Long, resolverField: JsValue): Results[A] = + new Results(mappedHits, aggs, total, resolverField) + def apply[A](mappedHits: IndexedSeq[A], aggs: JsValue, total: Long): Results[A] = + new Results(mappedHits, aggs, total, JsNull) + def empty[A]: Results[A] = Results(IndexedSeq.empty, JsNull, 0, JsNull) +} + +class ResultHandler[A](result: JsValue, buildF: JsValue => Option[A]) { + val hits: JsArray = (result \ "hits" \ "hits").get.as[JsArray] + val aggs: JsValue = (result \ "aggregations").getOrElse(JsNull) + val total: Long = (result \ "hits" \ "total" \ "value").as[Long] + val mappedHits: IndexedSeq[A] = hits.value + .map { jObj => + buildF(jObj) + } + .withFilter(_.isDefined) + .map(_.get) + .to(IndexedSeq) + val results: Results[A] = Results[A](mappedHits, aggs, total) + +} +object ResultHandler { + def apply[A](result: JsValue, buildF: JsValue => Option[A]): ResultHandler[A] = + new ResultHandler(result, buildF) +} + class ElasticRetriever @Inject() ( client: ElasticClient, hlFields: Seq[String], @@ -134,7 +167,7 @@ class ElasticRetriever @Inject() ( aggs: Iterable[AbstractAggregation] = Iterable.empty, sortByField: Option[sort.FieldSort] = None, excludedFields: Seq[String] = Seq.empty - ): Future[(IndexedSeq[A], JsValue, Long)] = { + ): Future[Results[A]] = { // just log and execute the query val indexQuery: IndexQuery[V] = IndexQuery(esIndex = esIndex, kv = kv, @@ -155,7 +188,7 @@ class ElasticRetriever @Inject() ( aggs: Iterable[AbstractAggregation] = Iterable.empty, sortByField: Option[sort.FieldSort] = None, excludedFields: Seq[String] = Seq.empty - ): Future[(IndexedSeq[A], JsValue, Long)] = { + ): Future[Results[A]] = { // just log and execute the query val indexQuery: IndexQuery[V] = IndexQuery(esIndex = esIndex, kv = kv, @@ -180,7 +213,7 @@ class ElasticRetriever @Inject() ( sortByField: Option[sort.FieldSort] = None, excludedFields: Seq[String] = Seq.empty, filter: Seq[Query] = Seq.empty - ): Future[(IndexedSeq[A], JsValue, Long)] = { + ): Future[Results[A]] = { // just log and execute the query val indexQuery: IndexQuery[V] = IndexQuery(esIndex = esIndex, kv = kv, @@ -205,7 +238,7 @@ class ElasticRetriever @Inject() ( sortByField: Option[sort.FieldSort] = None, excludedFields: Seq[String] = Seq.empty, filter: Seq[Query] = Seq.empty - ): Future[(IndexedSeq[A], JsValue, Long)] = { + ): Future[Results[A]] = { // just log and execute the query val indexQuery: IndexQuery[V] = IndexQuery(esIndex = esIndex, kv = kv, @@ -229,7 +262,7 @@ class ElasticRetriever @Inject() ( aggs: Iterable[AbstractAggregation] = Iterable.empty, sortByField: Option[sort.FieldSort] = None, excludedFields: Seq[String] = Seq.empty - ): Future[(IndexedSeq[A], JsValue, Long)] = { + ): Future[Results[A]] = { val indexQuery: IndexQuery[V] = IndexQuery(esIndex = esIndex, kv = kv, pagination = pagination, @@ -246,7 +279,7 @@ class ElasticRetriever @Inject() ( searchRequest: SearchRequest, sortByField: Option[sort.FieldSort] = None, buildF: JsValue => Option[A] - ): Future[(IndexedSeq[A], JsValue, Long)] = { + ): Future[Results[A]] = { // log and execute the query val searchResponse: Future[Response[SearchResponse]] = executeQuery(searchRequest, sortByField) // convert results into A @@ -301,33 +334,43 @@ class ElasticRetriever @Inject() ( sortedSearchRequest } + // private def getHits(result: JsValue): JsArray = (result \ "hits" \ "hits").get.as[JsArray] + // private def getMappedHits[A](hits: JsArray, buildF: JsValue => Option[A]): IndexedSeq[A] = + // hits.value + // .map { jObj => + // buildF(jObj) + // } + // .withFilter(_.isDefined) + // .map(_.get) + // .to(IndexedSeq) + // private def getAggs(result: JsValue): JsValue = (result \ "aggregations").getOrElse(JsNull) private def handleSearchResponse[A]( searchResponse: Response[SearchResponse], searchQuery: SearchRequest, buildF: JsValue => Option[A] - ): (IndexedSeq[A], JsValue, Long) = + ): Results[A] = searchResponse match { case rf: RequestFailure => logger.debug(s"Request failure for query: $searchQuery") logger.error(s"Elasticsearch error: ${rf.error}") - (IndexedSeq.empty, JsNull, 0) + Results.empty case results: RequestSuccess[SearchResponse] => // parse the full body response into JsValue - val result = Json.parse(results.body.get) - logger.trace(Json.prettyPrint(result)) + val r = ResultHandler(Json.parse(results.body.get), buildF) + //logger.trace(Json.prettyPrint(result)) + // val mappedHits = r.mappedHits + // val aggs = r.aggs + // val total = r.total + + // val mappedHits = hits + // .map { jObj => + // buildF(jObj) + // } + // .withFilter(_.isDefined) + // .map(_.get) + // .to(IndexedSeq) + r.results - val hits = (result \ "hits" \ "hits").get.as[JsArray].value - val aggs = (result \ "aggregations").getOrElse(JsNull) - val total = (result \ "hits" \ "total" \ "value").as[Long] - - val mappedHits = hits - .map { jObj => - buildF(jObj) - } - .withFilter(_.isDefined) - .map(_.get) - .to(IndexedSeq) - (mappedHits, aggs, total) } private def handleMultiSearchResponse[A]( @@ -345,20 +388,24 @@ class ElasticRetriever @Inject() ( val result = Json.parse(results.body.get) logger.trace(Json.prettyPrint(result)) val responses = (result \ "responses").get.as[JsArray].value - responses.map { response => - val hits = (response \ "hits" \ "hits").get.as[JsArray].value - val aggs = (response \ "aggregations").getOrElse(JsNull) - val total = (response \ "hits" \ "total" \ "value").as[Long] + responses.map { response => + // val hits = (response \ "hits" \ "hits").get.as[JsArray].value + val r = ResultHandler(response, buildF) + val hits = r.hits + val aggs = r.aggs + val total = r.total + // val aggs = (response \ "aggregations").getOrElse(JsNull) + // val total = (response \ "hits" \ "total" \ "value").as[Long] val rf = resolverField match { case Some(ResolverField(Some(r), false)) => - hits + hits.value .map { jObj => (jObj \ "_source" \ r).as[JsValue] } .headOption .getOrElse(JsNull) case Some(ResolverField(None, true)) => - hits + hits.value .map { jObj => (jObj \ "matched_queries").as[JsArray].value.headOption.getOrElse(JsNull) } @@ -366,13 +413,14 @@ class ElasticRetriever @Inject() ( .getOrElse(JsNull) case _ => JsNull } - val mappedHits = hits - .map { jObj => - buildF(jObj) - } - .withFilter(_.isDefined) - .map(_.get) - .to(IndexedSeq) + val mappedHits = r.mappedHits + // val mappedHits = hits + // .map { jObj => + // buildF(jObj) + // } + // .withFilter(_.isDefined) + // .map(_.get) + // .to(IndexedSeq) (mappedHits, aggs, total, rf) }.toIndexedSeq } @@ -453,7 +501,7 @@ class ElasticRetriever @Inject() ( aggs: Iterable[AbstractAggregation] = Iterable.empty, sortByField: Option[sort.FieldSort] = None, excludedFields: Seq[String] = Seq.empty - ): Future[(IndexedSeq[A], JsValue, Long)] = { + ): Future[Results[A]] = { val indexQuery = IndexBoolQuery(esIndex = esIndex, boolQuery = boolQ, pagination = pagination, From 71f350d20c1eec05d43fa8016035e2ae07cd09bc Mon Sep 17 00:00:00 2001 From: James Hayhurst Date: Mon, 2 Dec 2024 16:48:09 +0000 Subject: [PATCH 6/7] refactor elastic retriever to handle multi search and nested responses --- app/models/Backend.scala | 65 ++++---- app/models/ElasticRetriever.scala | 215 +++++++++++++------------ app/models/entities/Interaction.scala | 5 +- app/models/entities/Interactions.scala | 5 +- 4 files changed, 146 insertions(+), 144 deletions(-) diff --git a/app/models/Backend.scala b/app/models/Backend.scala index d9abb2a5..8528d1e0 100644 --- a/app/models/Backend.scala +++ b/app/models/Backend.scala @@ -26,6 +26,8 @@ import models.entities.SearchFacetsResults._ import models.entities._ import models.gql.Arguments.variantId import models.gql.StudyTypeEnum +import models.InnerResults +import models.Results import org.apache.http.impl.nio.reactor.IOReactorConfig import play.api.cache.AsyncCacheApi import play.api.db.slick.DatabaseConfigProvider @@ -103,10 +105,10 @@ class Backend @Inject() (implicit ElasticRetriever.sortByDesc("llr") ) .map { - case (Seq(), _, _) => + case Results(Seq(), _, _, _) => logger.debug(s"No adverse event found for ${kv.toString}") None - case (seq, agg, _) => + case Results(seq, agg, _, _) => logger.trace(Json.prettyPrint(agg)) val counts = (agg \ "eventCount" \ "value").as[Long] Some(AdverseEvents(counts, seq.head.criticalValue, seq)) @@ -126,8 +128,8 @@ class Backend @Inject() (implicit ) esRetriever.getByIndexedQueryMust(cbIndex, kv, pag, fromJsValue[DiseaseHPO], aggs).map { - case (Seq(), _, _) => Some(DiseaseHPOs(0, Seq())) - case (seq, agg, _) => + case Results(Seq(), _, _, _) => Some(DiseaseHPOs(0, Seq())) + case Results(seq, agg, _, _) => logger.trace(Json.prettyPrint(agg)) val rowsCount = (agg \ "rowsCount" \ "value").as[Long] Some(DiseaseHPOs(rowsCount, seq)) @@ -150,7 +152,7 @@ class Backend @Inject() (implicit fromJsValue[L2GPredictions], sortByField = ElasticRetriever.sortBy("score", SortOrder.Desc) ) - .map(_._1) + .map(_.mappedHits) } def getVariants(ids: Seq[String]): Future[IndexedSeq[VariantIndex]] = { @@ -162,7 +164,7 @@ class Backend @Inject() (implicit pag, fromJsValue[VariantIndex] ) - .map(_._1) + .map(_.mappedHits) r } @@ -175,7 +177,7 @@ class Backend @Inject() (implicit Pagination(Pagination.indexDefault, Pagination.sizeMax), fromJsValue[Biosample] ) - .map(_._1) + .map(_.mappedHits) } def getStudy(ids: Seq[String]): Future[IndexedSeq[JsValue]] = { @@ -190,7 +192,7 @@ class Backend @Inject() (implicit pag, fromJsValue[JsValue] ) - retriever.map(_._1) + retriever.map(_.mappedHits) } def getStudies(queryArgs: StudyQueryArgs, pagination: Option[Pagination]): Future[Studies] = { @@ -220,8 +222,8 @@ class Backend @Inject() (implicit fromJsValue[JsValue] ) retriever.map { - case (Seq(), _, _) => Studies.empty - case (studies, _, count) => + case Results(Seq(), _, _, _) => Studies.empty + case Results(studies, _, count, _) => Studies(count, studies) } } @@ -241,7 +243,7 @@ class Backend @Inject() (implicit ) val colocs = esRetriever .getByIndexedTermsShould(indexName, terms, pag, fromJsValue[Colocalisation], filter = filter) - .map(_._1) + .map(_.mappedHits) colocs.map(_.map { coloc => val otherStudyLocusId: String = if (coloc.leftStudyLocusId != studyLocusId) { coloc.leftStudyLocusId @@ -293,7 +295,7 @@ class Backend @Inject() (implicit "locus", Some("studyLocusId") ) - retriever.map { case (locus, _, counts, studyLocusIds) => + retriever.map { case InnerResults(locus, _, counts, studyLocusIds) => locus.zip(counts).zip(studyLocusIds).map { case ((locus, count), studyLocusId) => Loci(count, Some(locus), studyLocusId.as[String]) } @@ -313,7 +315,7 @@ class Backend @Inject() (implicit fromJsValue[JsValue], excludedFields = Seq("locus", "ldSet") ) - retriever.map(_._1) + retriever.map(_.mappedHits) } def getCredibleSets( @@ -358,8 +360,8 @@ class Backend @Inject() (implicit excludedFields = Seq("locus", "ldSet") ) retriever.map { - case (Seq(), _, _) => CredibleSets.empty - case (credset, _, count) => + case Results(Seq(), _, _, _) => CredibleSets.empty + case Results(credset, _, count, _) => CredibleSets(count, credset) } } @@ -389,8 +391,9 @@ class Backend @Inject() (implicit ) retriever.map { case r => r.map { - case (Seq(), _, _, _) => CredibleSets.empty - case (credsets, _, counts, studyId) => CredibleSets(counts, credsets, studyId.as[String]) + case Results(Seq(), _, _, _) => CredibleSets.empty + case Results(credsets, _, counts, studyId) => + CredibleSets(counts, credsets, studyId.as[String]) } } } @@ -402,7 +405,7 @@ class Backend @Inject() (implicit val pag = pagination.getOrElse(Pagination.mkDefault) val indexName = getIndexOrDefault("credible_set") val termsQueryIter: Option[Iterable[queries.Query]] = studyTypes match { - case Some(studyTypes) => Some(Iterable(must(termQuery("studyType.keyword", studyTypes)))) + case Some(studyTypes) => Some(Iterable(should(termsQuery("studyType.keyword", studyTypes)))) case None => None } // nested query for each variant id in variantIds @@ -433,8 +436,8 @@ class Backend @Inject() (implicit ) retriever.map { case r => r.map { - case (Seq(), _, _, _) => CredibleSets.empty - case (credsets, _, counts, variantId) => + case Results(Seq(), _, _, _) => CredibleSets.empty + case Results(credsets, _, counts, variantId) => CredibleSets(counts, credsets, variantId.as[String]) } } @@ -631,7 +634,7 @@ class Backend @Inject() (implicit Pagination(0, Pagination.sizeMax), fromJsValue[MousePhenotype] ) - .map(_._1) + .map(_.mappedHits) } def getPharmacogenomicsByDrug(id: String): Future[IndexedSeq[Pharmacogenomics]] = { @@ -661,7 +664,7 @@ class Backend @Inject() (implicit Pagination(0, Pagination.sizeMax), fromJsValue[Pharmacogenomics] ) - .map(_._1) + .map(_.mappedHits) } def getOtarProjects(ids: Seq[String]): Future[IndexedSeq[OtarProjects]] = { @@ -694,7 +697,7 @@ class Backend @Inject() (implicit val queryTerm = Map("id.keyword" -> ids) esRetriever .getByIndexedQueryShould(drugIndexName, queryTerm, Pagination(0, ids.size), fromJsValue[Drug]) - .map(_._1) + .map(_.mappedHits) } def getMechanismsOfAction(id: String): Future[MechanismsOfAction] = { @@ -702,14 +705,14 @@ class Backend @Inject() (implicit logger.debug(s"querying ES: getting mechanisms of action for $id") val index = getIndexOrDefault("drugMoA") val queryTerms = Map("chemblIds.keyword" -> id) - val mechanismsOfActionRaw: Future[(IndexedSeq[MechanismOfActionRaw], JsValue, Long)] = + val mechanismsOfActionRaw: Future[Results[MechanismOfActionRaw]] = esRetriever.getByIndexedQueryShould( index, queryTerms, Pagination.mkDefault, fromJsValue[MechanismOfActionRaw] ) - mechanismsOfActionRaw.map(i => Drug.mechanismOfActionRaw2MechanismOfAction(i._1)) + mechanismsOfActionRaw.map(i => Drug.mechanismOfActionRaw2MechanismOfAction(i.mappedHits)) } def getIndications(ids: Seq[String]): Future[IndexedSeq[Indications]] = { @@ -719,7 +722,7 @@ class Backend @Inject() (implicit esRetriever .getByIndexedQueryShould(index, queryTerm, Pagination.mkDefault, fromJsValue[Indications]) - .map(_._1) + .map(_.mappedHits) } def getDrugWarnings(id: String): Future[IndexedSeq[DrugWarning]] = { @@ -734,7 +737,7 @@ class Backend @Inject() (implicit This work around relates to ticket opentargets/platform#1506 */ val drugWarnings = - results._1.foldLeft(Map.empty[(Option[Long]), DrugWarning]) { (dwMap, dw) => + results.mappedHits.foldLeft(Map.empty[(Option[Long]), DrugWarning]) { (dwMap, dw) => if (dwMap.contains((dw.id))) { val old = dwMap((dw.id)) val newDW = @@ -761,14 +764,6 @@ class Backend @Inject() (implicit if (entityNames.contains(e.name) && e.searchIndex.isDefined) } yield e esRetriever.getTermsResultsMapping(entities, queryTerms) - - // withQueryTermsNumberValidation(queryTerms, defaultOTSettings.qValidationLimitNTerms) match { - // case Success(terms) => - // esRetriever.getTermsResultsMapping(entities, terms) - // case Failure(error) => - // Future.failed(error) - // } - } def search( diff --git a/app/models/ElasticRetriever.scala b/app/models/ElasticRetriever.scala index 0792e751..b339fe1f 100644 --- a/app/models/ElasticRetriever.scala +++ b/app/models/ElasticRetriever.scala @@ -41,30 +41,114 @@ case class Results[A]( resolverField: JsValue ) object Results { - def apply[A](mappedHits: IndexedSeq[A], aggs: JsValue, total: Long, resolverField: JsValue): Results[A] = + def apply[A](mappedHits: IndexedSeq[A], + aggs: JsValue, + total: Long, + resolverField: JsValue + ): Results[A] = new Results(mappedHits, aggs, total, resolverField) def apply[A](mappedHits: IndexedSeq[A], aggs: JsValue, total: Long): Results[A] = new Results(mappedHits, aggs, total, JsNull) - def empty[A]: Results[A] = Results(IndexedSeq.empty, JsNull, 0, JsNull) + def empty[A] = Results[A](IndexedSeq.empty, JsNull, 0, JsNull) +} + +case class InnerResults[A]( + mappedHits: IndexedSeq[IndexedSeq[A]], + aggs: JsValue, + total: IndexedSeq[Long], + parentFields: IndexedSeq[JsValue] +) +object InnerResults { + def apply[A]( + mappedHits: IndexedSeq[IndexedSeq[A]], + aggs: JsValue, + total: IndexedSeq[Long], + parentFields: IndexedSeq[JsValue] + ): InnerResults[A] = + new InnerResults(mappedHits, aggs, total, parentFields) + def empty[A] = InnerResults[A](IndexedSeq.empty, JsNull, IndexedSeq.empty, IndexedSeq.empty) } -class ResultHandler[A](result: JsValue, buildF: JsValue => Option[A]) { - val hits: JsArray = (result \ "hits" \ "hits").get.as[JsArray] - val aggs: JsValue = (result \ "aggregations").getOrElse(JsNull) - val total: Long = (result \ "hits" \ "total" \ "value").as[Long] - val mappedHits: IndexedSeq[A] = hits.value +class ResultHandler[A](result: JsValue, + buildF: JsValue => Option[A], + resolverField: Option[ResolverField], + parentField: Option[String], + innerHitsName: Option[String] +) { + lazy val hits: JsArray = (result \ "hits" \ "hits").get.as[JsArray] + lazy val aggs: JsValue = (result \ "aggregations").getOrElse(JsNull) + lazy val total: Long = (result \ "hits" \ "total" \ "value").as[Long] + lazy val mappedHits: IndexedSeq[A] = hits.value .map { jObj => buildF(jObj) } .withFilter(_.isDefined) .map(_.get) .to(IndexedSeq) - val results: Results[A] = Results[A](mappedHits, aggs, total) - + lazy val resolver = resolverField match { + case Some(ResolverField(Some(r), false)) => + hits.value + .map { jObj => + (jObj \ "_source" \ r).as[JsValue] + } + .headOption + .getOrElse(JsNull) + case Some(ResolverField(None, true)) => + hits.value + .map { jObj => + (jObj \ "matched_queries").as[JsArray].value.headOption.getOrElse(JsNull) + } + .headOption + .getOrElse(JsNull) + case _ => JsNull + } + lazy val parents = parentField match { + case Some(pf) => + hits.value + .map { jObj => + (jObj \ "_source" \ pf).as[JsValue] + } + .to(IndexedSeq) + case None => hits.value.map(jObj => JsNull).to(IndexedSeq) + } + lazy val innerHits = hits.value.map { jObj => + (jObj \ "inner_hits" \ innerHitsName.get \ "hits" \ "hits").get.as[JsArray].value + } + lazy val innerHitsTotal = hits.value + .map { jObj => + (jObj \ "inner_hits" \ innerHitsName.get \ "hits" \ "total" \ "value").as[Long] + } + .to(IndexedSeq) + lazy val innerMappedHits = innerHits + .map { jObj => + jObj + .map { innerJObj => + buildF(innerJObj) + } + .withFilter(_.isDefined) + .map(_.get) + .to(IndexedSeq) + } + .to(IndexedSeq) + lazy val results: Results[A] = Results[A](mappedHits, aggs, total, resolver) + lazy val innerResults: InnerResults[A] = + InnerResults[A](innerMappedHits, aggs, innerHitsTotal, parents) + } object ResultHandler { def apply[A](result: JsValue, buildF: JsValue => Option[A]): ResultHandler[A] = - new ResultHandler(result, buildF) + new ResultHandler(result, buildF, None, None, None) + def apply[A](result: JsValue, + buildF: JsValue => Option[A], + resolverField: Option[ResolverField] + ): ResultHandler[A] = + new ResultHandler(result, buildF, resolverField, None, None) + def apply[A](result: JsValue, + buildF: JsValue => Option[A], + parentField: Option[String], + innerHitsName: String + ): ResultHandler[A] = + new ResultHandler(result, buildF, None, parentField, Some(innerHitsName)) } class ElasticRetriever @Inject() ( @@ -293,7 +377,7 @@ class ElasticRetriever @Inject() ( sortByField: Option[sort.FieldSort] = None, buildF: JsValue => Option[A], resolverField: Option[ResolverField] - ): Future[IndexedSeq[(IndexedSeq[A], JsValue, Long, JsValue)]] = { + ): Future[IndexedSeq[Results[A]]] = { // log and execute the query val searchResponse: Future[Response[MultiSearchResponse]] = executeMultiQuery(searchRequest, sortByField) @@ -334,16 +418,6 @@ class ElasticRetriever @Inject() ( sortedSearchRequest } - // private def getHits(result: JsValue): JsArray = (result \ "hits" \ "hits").get.as[JsArray] - // private def getMappedHits[A](hits: JsArray, buildF: JsValue => Option[A]): IndexedSeq[A] = - // hits.value - // .map { jObj => - // buildF(jObj) - // } - // .withFilter(_.isDefined) - // .map(_.get) - // .to(IndexedSeq) - // private def getAggs(result: JsValue): JsValue = (result \ "aggregations").getOrElse(JsNull) private def handleSearchResponse[A]( searchResponse: Response[SearchResponse], searchQuery: SearchRequest, @@ -356,19 +430,9 @@ class ElasticRetriever @Inject() ( Results.empty case results: RequestSuccess[SearchResponse] => // parse the full body response into JsValue - val r = ResultHandler(Json.parse(results.body.get), buildF) - //logger.trace(Json.prettyPrint(result)) - // val mappedHits = r.mappedHits - // val aggs = r.aggs - // val total = r.total - - // val mappedHits = hits - // .map { jObj => - // buildF(jObj) - // } - // .withFilter(_.isDefined) - // .map(_.get) - // .to(IndexedSeq) + val result = Json.parse(results.body.get) + logger.trace(Json.prettyPrint(result)) + val r = ResultHandler(result, buildF) r.results } @@ -378,7 +442,7 @@ class ElasticRetriever @Inject() ( searchQuery: MultiSearchRequest, buildF: JsValue => Option[A], resolverField: Option[ResolverField] - ): IndexedSeq[(IndexedSeq[A], JsValue, Long, JsValue)] = + ): IndexedSeq[Results[A]] = searchResponse match { case rf: RequestFailure => logger.debug(s"Request failure for query: $searchQuery") @@ -388,40 +452,9 @@ class ElasticRetriever @Inject() ( val result = Json.parse(results.body.get) logger.trace(Json.prettyPrint(result)) val responses = (result \ "responses").get.as[JsArray].value - responses.map { response => - // val hits = (response \ "hits" \ "hits").get.as[JsArray].value - val r = ResultHandler(response, buildF) - val hits = r.hits - val aggs = r.aggs - val total = r.total - // val aggs = (response \ "aggregations").getOrElse(JsNull) - // val total = (response \ "hits" \ "total" \ "value").as[Long] - val rf = resolverField match { - case Some(ResolverField(Some(r), false)) => - hits.value - .map { jObj => - (jObj \ "_source" \ r).as[JsValue] - } - .headOption - .getOrElse(JsNull) - case Some(ResolverField(None, true)) => - hits.value - .map { jObj => - (jObj \ "matched_queries").as[JsArray].value.headOption.getOrElse(JsNull) - } - .headOption - .getOrElse(JsNull) - case _ => JsNull - } - val mappedHits = r.mappedHits - // val mappedHits = hits - // .map { jObj => - // buildF(jObj) - // } - // .withFilter(_.isDefined) - // .map(_.get) - // .to(IndexedSeq) - (mappedHits, aggs, total, rf) + responses.map { response => + val r = ResultHandler(response, buildF, resolverField) + r.results }.toIndexedSeq } @@ -431,45 +464,17 @@ class ElasticRetriever @Inject() ( buildF: JsValue => Option[A], innerHitsName: String, parentField: Option[String] - ): (IndexedSeq[IndexedSeq[A]], JsValue, IndexedSeq[Long], IndexedSeq[JsValue]) = + ): InnerResults[A] = searchResponse match { case rf: RequestFailure => logger.debug(s"Request failure for query: $searchQuery") logger.error(s"Elasticsearch error: ${rf.error}") - (IndexedSeq.empty, JsNull, IndexedSeq.empty, IndexedSeq.empty) + InnerResults.empty case results: RequestSuccess[SearchResponse] => val result = Json.parse(results.body.get) - val hits = (result \ "hits" \ "hits").get.as[JsArray].value - val parentfields = parentField match { - case Some(pf) => - hits - .map { jObj => - (jObj \ "_source" \ pf).as[JsValue] - } - .to(IndexedSeq) - case None => hits.map(jObj => JsNull).to(IndexedSeq) - } - val innerHits = hits.map { jObj => - (jObj \ "inner_hits" \ innerHitsName \ "hits" \ "hits").get.as[JsArray].value - } - val aggs = (result \ "aggregations").getOrElse(JsNull) - val total = hits - .map { jObj => - (jObj \ "inner_hits" \ innerHitsName \ "hits" \ "total" \ "value").as[Long] - } - .to(IndexedSeq) - val mappedHits = innerHits - .map { jObj => - jObj - .map { innerJObj => - buildF(innerJObj) - } - .withFilter(_.isDefined) - .map(_.get) - .to(IndexedSeq) - } - .to(IndexedSeq) - (mappedHits, aggs, total, parentfields) + logger.trace(Json.prettyPrint(result)) + val r = ResultHandler(result, buildF, parentField, innerHitsName) + r.innerResults } def getMultiByIndexedTermsMust[V, A]( @@ -477,7 +482,7 @@ class ElasticRetriever @Inject() ( buildF: JsValue => Option[A], sortByField: Option[sort.FieldSort] = None, resolverField: Option[ResolverField] - ): Future[IndexedSeq[(IndexedSeq[A], JsValue, Long, JsValue)]] = { + ): Future[IndexedSeq[Results[A]]] = { val searchRequest: MultiSearchRequest = MultiIndexTermsMust(indexQueries) getMultiByIndexedQuery(searchRequest, sortByField, buildF, resolverField) } @@ -487,7 +492,7 @@ class ElasticRetriever @Inject() ( buildF: JsValue => Option[A], sortByField: Option[sort.FieldSort] = None, resolverField: Option[ResolverField] - ): Future[IndexedSeq[(IndexedSeq[A], JsValue, Long, JsValue)]] = { + ): Future[IndexedSeq[Results[A]]] = { val searchRequest: MultiSearchRequest = multiBoolQueryBuilder(indexQueries) getMultiByIndexedQuery(searchRequest, sortByField, buildF, resolverField) } @@ -522,7 +527,7 @@ class ElasticRetriever @Inject() ( aggs: Iterable[AbstractAggregation] = Iterable.empty, sortByField: Option[sort.FieldSort] = None, excludedFields: Seq[String] = Seq.empty - ): Future[(IndexedSeq[IndexedSeq[A]], JsValue, IndexedSeq[Long], IndexedSeq[JsValue])] = { + ): Future[InnerResults[A]] = { val limitClause = pagination.toES val searchRequest: SearchRequest = search(esIndex) .bool(boolQ) diff --git a/app/models/entities/Interaction.scala b/app/models/entities/Interaction.scala index f49bc8c4..52c8fd48 100644 --- a/app/models/entities/Interaction.scala +++ b/app/models/entities/Interaction.scala @@ -5,6 +5,7 @@ import models.Helpers.fromJsValue import models.entities.Configuration.ElasticsearchSettings import models.gql.Fetchers.targetsFetcher import models.gql.Objects.targetImp +import models.Results import play.api.Logging import play.api.libs.json._ import sangria.schema._ @@ -279,8 +280,8 @@ object Interaction extends Logging { ) ++ interaction.targetB.map("targetB.keyword" -> _) esRetriever.getByIndexedQueryMust(cbIndex, kv.toMap, pag, fromJsValue[JsValue]).map { - case (Seq(), _, _) => IndexedSeq.empty - case (seq, _, _) => seq + case Results(Seq(), _, _, _) => IndexedSeq.empty + case Results(seq, _, _, _) => seq } } } diff --git a/app/models/entities/Interactions.scala b/app/models/entities/Interactions.scala index 8eaba3e0..473c2cab 100644 --- a/app/models/entities/Interactions.scala +++ b/app/models/entities/Interactions.scala @@ -9,6 +9,7 @@ import models.Helpers.fromJsValue import models.{Backend, ElasticRetriever} import models.entities.Configuration.ElasticsearchSettings import models.entities.Interaction.interaction +import models.Results import play.api.Logging import play.api.libs.json._ import sangria.schema.{Field, ListType, LongType, ObjectType, fields} @@ -70,8 +71,8 @@ object Interactions extends Logging { Some(sort.FieldSort("scoring", order = SortOrder.DESC)) ) .map { - case (Seq(), _, _) => None - case (seq, agg, _) => + case Results(Seq(), _, _, _) => None + case Results(seq, agg, _, _) => logger.debug(Json.prettyPrint(agg)) val rowsCount = (agg \ "rowsCount" \ "value").as[Long] From a309a3896716b92e8971a88d2f45c45db848c308 Mon Sep 17 00:00:00 2001 From: James Hayhurst Date: Mon, 2 Dec 2024 21:22:36 +0000 Subject: [PATCH 7/7] increase page size defaults for resolvers --- app/models/Backend.scala | 13 +++++-------- 1 file changed, 5 insertions(+), 8 deletions(-) diff --git a/app/models/Backend.scala b/app/models/Backend.scala index 8528d1e0..21daefeb 100644 --- a/app/models/Backend.scala +++ b/app/models/Backend.scala @@ -148,7 +148,7 @@ class Backend @Inject() (implicit .getByIndexedTermsMust( indexName, Map("studyLocusId.keyword" -> ids), - Pagination(Pagination.indexDefault, Pagination.sizeMax), + Pagination.mkMax, fromJsValue[L2GPredictions], sortByField = ElasticRetriever.sortBy("score", SortOrder.Desc) ) @@ -157,11 +157,10 @@ class Backend @Inject() (implicit def getVariants(ids: Seq[String]): Future[IndexedSeq[VariantIndex]] = { val indexName = getIndexOrDefault("variant_index") - val pag = Pagination(Pagination.indexDefault, Pagination.sizeMax) val r = esRetriever .getByIndexedTermsMust(indexName, Map("variantId.keyword" -> ids), - pag, + Pagination.mkMax, fromJsValue[VariantIndex] ) .map(_.mappedHits) @@ -174,14 +173,13 @@ class Backend @Inject() (implicit .getByIndexedTermsMust( indexName, Map("biosampleId.keyword" -> ids), - Pagination(Pagination.indexDefault, Pagination.sizeMax), + Pagination.mkMax, fromJsValue[Biosample] ) .map(_.mappedHits) } def getStudy(ids: Seq[String]): Future[IndexedSeq[JsValue]] = { - val pag = Pagination.mkDefault val indexName = getIndexOrDefault("gwas_index") val termsQuery = Map("studyId.keyword" -> ids) val retriever = @@ -189,7 +187,7 @@ class Backend @Inject() (implicit .getByIndexedTermsMust( indexName, termsQuery, - pag, + Pagination.mkMax, fromJsValue[JsValue] ) retriever.map(_.mappedHits) @@ -303,7 +301,6 @@ class Backend @Inject() (implicit } def getCredibleSet(ids: Seq[String]): Future[IndexedSeq[JsValue]] = { - val pag = Pagination.mkDefault val indexName = getIndexOrDefault("credible_set") val termsQuery = Map("studyLocusId.keyword" -> ids) val retriever = @@ -311,7 +308,7 @@ class Backend @Inject() (implicit .getByIndexedTermsMust( indexName, termsQuery, - pag, + Pagination.mkMax, fromJsValue[JsValue], excludedFields = Seq("locus", "ldSet") )