Skip to content

Commit

Permalink
refine aotf filtering, calculate size based on clickhouse prewhere
Browse files Browse the repository at this point in the history
  • Loading branch information
jdhayhurst committed Sep 2, 2024
1 parent 5a5a433 commit 5d6d298
Show file tree
Hide file tree
Showing 3 changed files with 17 additions and 42 deletions.
7 changes: 4 additions & 3 deletions app/models/Backend.scala
Original file line number Diff line number Diff line change
Expand Up @@ -485,15 +485,16 @@ class Backend @Inject() (implicit
filter,
orderBy,
weights,
_,
dontPropagate,
page.offset,
page.size
)
val simpleQ = aotfQ(indirectIds, bIds).simpleQuery(0, 100000)
val simpleQ = aotfQ(indirectIds, bIds, mustIncludeDatasources).simpleQuery(0, 100000)

(dbRetriever.executeQuery[String, Query](simpleQ)) flatMap { case assocIds =>
val assocIdSet = assocIds.toSet
val fullQ = aotfQ(indirectIds, assocIdSet).query
val fullQ = aotfQ(indirectIds, assocIdSet, Set.empty).query

if (assocIdSet.nonEmpty) {
dbRetriever.executeQuery[Association, Query](fullQ) map { case assocs =>
Expand All @@ -505,7 +506,7 @@ class Backend @Inject() (implicit
val filteredDS =
assoc.datasourceScores.filter(ds => mustIncludeDatasources.contains(ds.id))
if (filteredDS.isEmpty) None
else Some(assoc.copy(datasourceScores = filteredDS))
else Some(assoc)
}
}
}
Expand Down
38 changes: 0 additions & 38 deletions app/models/ClickhouseRetriever.scala
Original file line number Diff line number Diff line change
Expand Up @@ -62,42 +62,4 @@ class ClickhouseRetriever(dbConfig: DatabaseConfig[ClickHouseProfile], config: O
Vector.empty
}
}

def getAssociationsOTF(
tableName: String,
AId: String,
AIDs: Set[String],
BIDs: Set[String],
BFilter: Option[String],
datasourceSettings: Seq[DatasourceSettings],
pagination: Pagination
): Future[Vector[Association]] = {
val weights = datasourceSettings.map(s => (s.id, s.weight))
val dontPropagate = datasourceSettings.withFilter(!_.propagate).map(_.id).toSet
val aotfQ = QAOTF(
tableName,
AId,
AIDs,
BIDs,
BFilter,
None,
weights,
dontPropagate,
pagination.offset,
pagination.size
).query.as[Association]

logger.debug(aotfQ.statements.mkString(" "))

db.run(aotfQ.asTry).map {
case Success(v) => v
case Failure(ex) =>
logger.error(ex.toString)
logger.error(
"harmonic associations query failed " +
s"with query: ${aotfQ.statements.mkString(" ")}"
)
Vector.empty
}
}
}
14 changes: 13 additions & 1 deletion app/models/db/QAOTF.scala
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ case class QAOTF(
BFilter: Option[String],
orderScoreBy: Option[(String, String)],
datasourceWeights: Seq[(String, Double)],
mustIncludeDatasources: Set[String],
nonPropagatedDatasources: Set[String],
offset: Int,
size: Int
Expand Down Expand Up @@ -89,7 +90,18 @@ case class QAOTF(
} else {
expressionLeft
}
BFilterQ.map(f => F.and(f, expressionLeftRight)).getOrElse(expressionLeftRight)
val expressionLeftRighWithFilters = {
val expressionLeftRightWithBFilter =
BFilterQ.map(f => F.and(f, expressionLeftRight)).getOrElse(expressionLeftRight)
if (mustIncludeDatasources.nonEmpty) {
F.and(expressionLeftRightWithBFilter,
F.in(DS, F.set(mustIncludeDatasources.map(literal).toSeq))
)
} else {
expressionLeftRightWithBFilter
}
}
expressionLeftRighWithFilters
}

val DSScore: Column = F
Expand Down

0 comments on commit 5d6d298

Please sign in to comment.