Skip to content

Commit

Permalink
Merge pull request #206 from opentargets/3395-use-clickhouse-for-datasource-filtering
Browse files Browse the repository at this point in the history

3395 use clickhouse for datasource filtering
  • Loading branch information
jdhayhurst authored Aug 28, 2024
2 parents d027caa + f5e1c23 commit 980b851
Show file tree
Hide file tree
Showing 5 changed files with 69 additions and 81 deletions.
2 changes: 2 additions & 0 deletions app/models/Backend.scala
Original file line number Diff line number Diff line change
Expand Up @@ -475,6 +475,7 @@ class Backend @Inject() (implicit
val page = pagination.getOrElse(Pagination.mkDefault)
val dss = datasources.getOrElse(defaultOTSettings.clickhouse.harmonic.datasources)
val weights = dss.map(s => (s.id, s.weight))
val mustIncludeDatasources = dss.withFilter(_.required).map(_.id).toSet
val dontPropagate = dss.withFilter(!_.propagate).map(_.id).toSet
val aotfQ = QAOTF(
tableName,
Expand All @@ -484,6 +485,7 @@ class Backend @Inject() (implicit
filter,
orderBy,
weights,
mustIncludeDatasources,
dontPropagate,
page.offset,
page.size
Expand Down
2 changes: 2 additions & 0 deletions app/models/ClickhouseRetriever.scala
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,7 @@ class ClickhouseRetriever(dbConfig: DatabaseConfig[ClickHouseProfile], config: O
pagination: Pagination
): Future[Vector[Association]] = {
val weights = datasourceSettings.map(s => (s.id, s.weight))
val mustIncludeDatasources = datasourceSettings.withFilter(_.required).map(_.id).toSet
val dontPropagate = datasourceSettings.withFilter(!_.propagate).map(_.id).toSet
val aotfQ = QAOTF(
tableName,
Expand All @@ -82,6 +83,7 @@ class ClickhouseRetriever(dbConfig: DatabaseConfig[ClickHouseProfile], config: O
BFilter,
None,
weights,
mustIncludeDatasources,
dontPropagate,
pagination.offset,
pagination.size
Expand Down
122 changes: 51 additions & 71 deletions app/models/db/QAOTF.scala
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ case class QAOTF(
BFilter: Option[String],
orderScoreBy: Option[(String, String)],
datasourceWeights: Seq[(String, Double)],
mustIncludeDatasources: Set[String],
nonPropagatedDatasources: Set[String],
offset: Int,
size: Int
Expand All @@ -51,19 +52,56 @@ case class QAOTF(
val maxHS: Column = literal(Harmonic.maxValue(100000, pExponentDefault, 1.0))
.as(Some("max_hs_score"))

val BFilterQ: Option[Column] = BFilter flatMap { case matchStr =>
val tokens = matchStr
.split(" ")
.map { s =>
F.like(BData.name, F.lower(literal(s"%${s.toLowerCase.trim}%")))
val filterExpression: Column = {
val BFilterQ: Option[Column] = BFilter flatMap { case matchStr =>
val tokens = matchStr
.split(" ")
.map { s =>
F.like(BData.name, F.lower(literal(s"%${s.toLowerCase.trim}%")))
}
.toList

tokens match {
case h :: Nil => Some(h)
case h1 :: h2 :: rest => Some(F.and(h1, h2, rest: _*))
case _ => None
}
.toList
}
val leftIdsC = F.set((AIDs + AId).map(literal).toSeq)
val nonPP = F.set(nonPropagatedDatasources.map(literal).toSeq)
val expressionLeft = if (nonPropagatedDatasources.nonEmpty) {
F.or(
F.and(
F.in(A, leftIdsC),
F.notIn(DS, nonPP)
),
F.equals(A, literal(AId))
)
} else
F.in(A, leftIdsC)

tokens match {
case h :: Nil => Some(h)
case h1 :: h2 :: rest => Some(F.and(h1, h2, rest: _*))
case _ => None
// in the case we also want to filter B set
val expressionLeftRight = if (BIDs.nonEmpty) {
val rightIdsC = F.set(BIDs.map(literal).toSeq)
F.and(
expressionLeft,
F.in(B, rightIdsC)
)
} else {
expressionLeft
}
val expressionLeftRighWithFilters = {
val expressionLeftRightWithBFilter =
BFilterQ.map(f => F.and(f, expressionLeftRight)).getOrElse(expressionLeftRight)
if (mustIncludeDatasources.nonEmpty) {
F.and(expressionLeftRightWithBFilter,
F.in(DS, F.set(mustIncludeDatasources.map(literal).toSeq))
)
} else {
expressionLeftRightWithBFilter
}
}
expressionLeftRighWithFilters
}

val DSScore: Column = F
Expand All @@ -78,6 +116,7 @@ case class QAOTF(
.as(Some("score_datasource"))

val DSW: Column = F.ifNull(F.any(column("weight")), literal(1.0)).as(Some("datasource_weight"))
val DTAny = F.any(DT).as(Some(DT.rep))

val queryGroupByDS: Query = {
val WC = F
Expand All @@ -92,41 +131,12 @@ case class QAOTF(
Select(DSFieldWC :: WFieldWC :: Nil),
OrderBy(DSFieldWC.asc :: Nil)
)

val leftIdsC = F.set((AIDs + AId).map(literal).toSeq)

val nonPP = F.set(nonPropagatedDatasources.map(literal).toSeq)
val expressionLeft = if (nonPropagatedDatasources.nonEmpty) {
F.or(
F.and(
F.in(A, leftIdsC),
F.notIn(DS, nonPP)
),
F.equals(A, literal(AId))
)
} else
F.in(A, leftIdsC)

// in the case we also want to filter B set
val expressionLeftRight = if (BIDs.nonEmpty) {
val rightIdsC = F.set(BIDs.map(literal).toSeq)

F.and(expressionLeft, F.in(B, rightIdsC))
} else {
expressionLeft
}

val expressionLeftRightWithBFilter =
BFilterQ.map(f => F.and(f, expressionLeftRight)).getOrElse(expressionLeftRight)

val DTAny = F.any(DT).as(Some(DT.rep))

val withDT = With(DSScore :: DTAny :: DSW :: Nil)
val selectDSScores = Select(B :: DSW.name :: DTAny.name :: DS :: DSScore.name :: Nil)
val fromT = From(T, Some("l"))
val joinWeights =
Join(q.toColumn(None), Some("LEFT"), Some("OUTER"), false, Some("r"), DS :: Nil)
val preWhereQ = PreWhere(expressionLeftRightWithBFilter)
val preWhereQ = PreWhere(filterExpression)
val groupByQ = GroupBy(B :: DS :: Nil)

Q(
Expand All @@ -140,40 +150,10 @@ case class QAOTF(
}

def simpleQuery(offset: Int, size: Int): Query = {
val leftIdsC = F.set((AIDs + AId).map(literal).toSeq)

val nonPP = F.set(nonPropagatedDatasources.map(literal).toSeq)
val expressionLeft = if (nonPropagatedDatasources.nonEmpty) {
F.or(
F.and(
F.in(A, leftIdsC),
F.notIn(DS, nonPP)
),
F.equals(A, literal(AId))
)
} else
F.in(A, leftIdsC)

// in the case we also want to filter B set
val expressionLeftRight = if (BIDs.nonEmpty) {
val rightIdsC = F.set(BIDs.map(literal).toSeq)
F.and(
expressionLeft,
F.in(B, rightIdsC)
)
} else {
expressionLeft
}

val expressionLeftRightWithBFilter =
BFilterQ.map(f => F.and(f, expressionLeftRight)).getOrElse(expressionLeftRight)

val DTAny = F.any(DT).as(Some(DT.rep))

val withDT = With(DTAny :: Nil)
val selectDSScores = Select(B :: DTAny.name :: DS :: Nil)
val fromT = From(T, Some("l"))
val preWhereQ = PreWhere(expressionLeftRightWithBFilter)
val preWhereQ = PreWhere(filterExpression)
val groupByQ = GroupBy(B :: DS :: Nil)

val aggDSQ = Q(
Expand Down
6 changes: 5 additions & 1 deletion app/models/entities/Configuration.scala
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,11 @@ object Configuration {

case class DiseaseSettings(associations: DbTableSettings)

case class DatasourceSettings(id: String, weight: Double, propagate: Boolean)
/** Settings for a single datasource entry, as listed in [[HarmonicSettings]].
  *
  * @param id        datasource identifier
  * @param weight    weight paired with `id` when the per-datasource weights of a query are built
  * @param propagate when `false`, callers collect this datasource's id into the set of
  *                  non-propagated datasources
  * @param required  when `true`, callers collect this datasource's id into the set of
  *                  must-include datasources; defaults to `false`, so three-argument
  *                  construction keeps working
  */
case class DatasourceSettings(
    id: String,
    weight: Double,
    propagate: Boolean,
    required: Boolean = false
)

case class HarmonicSettings(pExponent: Int, datasources: Seq[DatasourceSettings])

Expand Down
18 changes: 9 additions & 9 deletions conf/application.conf
Original file line number Diff line number Diff line change
Expand Up @@ -52,15 +52,15 @@ ot {
harmonic {
pExponent = 2
datasources = [
{id: "europepmc", weight: 0.2, data-type = "literature", propagate = true},
{id: "expression_atlas", weight: 0.2, data-type = "rna_expression", propagate = false},
{id: "impc", weight: 0.2, data-type = "animal_model", propagate = true},
{id: "progeny", weight: 0.5, data-type = "affected_pathway", propagate = true},
{id: "slapenrich", weight: 0.5, data-type = "affected_pathway", propagate = true},
{id: "sysbio", weight: 0.5, data-type = "affected_pathway", propagate = true},
{id: "cancer_biomarkers", weight: 0.5, data-type = "affected_pathway", propagate = true},
{id: "ot_crispr", weight: 0.5, data-type = "ot_partner", propagate = true},
{id: "encore", weight: 0.5, data-type = "ot_partner", propagate = true},
{id: "europepmc", weight: 0.2, data-type = "literature", propagate = true, required = false},
{id: "expression_atlas", weight: 0.2, data-type = "rna_expression", propagate = false, required = false},
{id: "impc", weight: 0.2, data-type = "animal_model", propagate = true, required = false},
{id: "progeny", weight: 0.5, data-type = "affected_pathway", propagate = true, required = false},
{id: "slapenrich", weight: 0.5, data-type = "affected_pathway", propagate = true, required = false},
{id: "sysbio", weight: 0.5, data-type = "affected_pathway", propagate = true, required = false},
{id: "cancer_biomarkers", weight: 0.5, data-type = "affected_pathway", propagate = true, required = false},
{id: "ot_crispr", weight: 0.5, data-type = "ot_partner", propagate = true, required = false},
{id: "encore", weight: 0.5, data-type = "ot_partner", propagate = true, required = false},
]
}
}
Expand Down

0 comments on commit 980b851

Please sign in to comment.