Skip to content

Commit

Permalink
fix: smaller cohort resource usage
Browse files Browse the repository at this point in the history
  • Loading branch information
pauldambra committed Oct 19, 2024
1 parent 3653de9 commit e50be86
Show file tree
Hide file tree
Showing 2 changed files with 126 additions and 36 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,14 @@ def is_group_property(p: Property) -> bool:
return p.type == "group"


def is_dynamic_cohort_property(p: Property) -> bool:
return p.type == "cohort"


def is_static_cohort_property(p: Property) -> bool:
return p.type == "precalculated-cohort"


class SessionRecordingQueryResult(NamedTuple):
results: list
has_more_recording: bool
Expand Down Expand Up @@ -224,7 +232,17 @@ def _where_predicates(self) -> Union[ast.And, ast.Or]:
)
)

remaining_properties = self._strip_person_and_event_properties(self._filter.property_groups)
cohort_subquery = CohortPropertyGroupsSubQuery(self._team, self._filter, self.ttl_days).get_queries()
if cohort_subquery:
optional_exprs.append(
ast.CompareOperation(
op=ast.CompareOperationOp.In,
left=ast.Field(chain=["s", "distinct_id"]),
right=cohort_subquery,
)
)

remaining_properties = self._strip_person_and_event_and_cohort_properties(self._filter.property_groups)
if remaining_properties:
logger.info(
"session_replay_query_builder has unhandled properties", unhandled_properties=remaining_properties
Expand Down Expand Up @@ -267,11 +285,15 @@ def _where_predicates(self) -> Union[ast.And, ast.Or]:
def _having_predicates(self) -> ast.Expr:
return property_to_expr(self._filter.having_predicates, team=self._team, scope="replay")

def _strip_person_and_event_properties(self, property_group: PropertyGroup) -> PropertyGroup | None:
def _strip_person_and_event_and_cohort_properties(self, property_group: PropertyGroup) -> PropertyGroup | None:
property_groups_to_keep = [
g
for g in property_group.flat
if not is_event_property(g) and not is_person_property(g) and not is_group_property(g)
if not is_event_property(g)
and not is_person_property(g)
and not is_group_property(g)
and not is_dynamic_cohort_property(g)
and not is_static_cohort_property(g)
]

return (
Expand Down Expand Up @@ -334,6 +356,53 @@ def _where_predicates(self) -> ast.Expr:
)


class CohortPropertyGroupsSubQuery:
_team: Team
_filter: SessionRecordingsFilter
_ttl_days: int

raw_cohort_to_distinct_id = """
select distinct_id
from person_distinct_ids
where person_id IN (
SELECT person_id
FROM cohort_people
WHERE
{where_predicates}
)
"""

def __init__(self, team: Team, filter: SessionRecordingsFilter, ttl_days: int):
self._team = team
self._filter = filter
self._ttl_days = ttl_days

def get_queries(self) -> ast.SelectQuery | ast.SelectUnionQuery | None:
if self.cohort_properties:
return parse_select(
self.raw_cohort_to_distinct_id,
{"where_predicates": property_to_expr(self.cohort_properties, team=self._team, scope="replay")},
)

return None

@cached_property
def cohort_properties(self) -> PropertyGroup | None:
cohort_property_groups = [
g
for g in self._filter.property_groups.flat
if is_dynamic_cohort_property(g) or is_static_cohort_property(g)
]
return (
PropertyGroup(
type=self._filter.property_operand,
values=cohort_property_groups,
)
if cohort_property_groups
else None
)


class PersonsIdCompareOperation:
_team: Team
_filter: SessionRecordingsFilter
Expand Down
Loading

0 comments on commit e50be86

Please sign in to comment.