This repository has been archived by the owner on Apr 26, 2024. It is now read-only.
-
-
Notifications
You must be signed in to change notification settings - Fork 2.1k
Rewrite get push actions queries #13597
Merged
reivilibre
merged 6 commits into
matrix-org:develop
from
Fizzadar:optimise-push-actions-queries
Aug 24, 2022
Merged
Changes from 4 commits
Commits
Show all changes
6 commits
Select commit
Hold shift + click to select a range
763a65e
Add txn to get receipts and stream ordering by room for user
Fizzadar 520843b
Get push actions in single query using new receipts by room
Fizzadar 5c6dce6
Remove old get push actions queries
Fizzadar 2531eee
Add changelog file
Fizzadar 55f587c
Rename txn functions with `_txn` suffix
Fizzadar b7a66e3
Destructure output tuples into named variables
Fizzadar File filter
Filter by extension
Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1 @@ | ||
Optimise push action fetching queries. Contributed by Nick @ Beeper (@fizzadar). |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change | ||||
---|---|---|---|---|---|---|
|
@@ -459,6 +459,32 @@ def f(txn: LoggingTransaction) -> List[str]: | |||||
|
||||||
return await self.db_pool.runInteraction("get_push_action_users_in_range", f) | ||||||
|
||||||
def _get_receipts_by_room_txn( | ||||||
self, txn: LoggingTransaction, user_id: str | ||||||
) -> List[Tuple[str, int]]: | ||||||
receipt_types_clause, args = make_in_list_sql_clause( | ||||||
self.database_engine, | ||||||
"receipt_type", | ||||||
( | ||||||
ReceiptTypes.READ, | ||||||
ReceiptTypes.READ_PRIVATE, | ||||||
ReceiptTypes.UNSTABLE_READ_PRIVATE, | ||||||
), | ||||||
) | ||||||
|
||||||
sql = f""" | ||||||
SELECT room_id, MAX(stream_ordering) | ||||||
FROM receipts_linearized | ||||||
INNER JOIN events USING (room_id, event_id) | ||||||
WHERE {receipt_types_clause} | ||||||
AND user_id = ? | ||||||
GROUP BY room_id | ||||||
""" | ||||||
|
||||||
args.extend((user_id,)) | ||||||
txn.execute(sql, args) | ||||||
return cast(List[Tuple[str, int]], txn.fetchall()) | ||||||
|
||||||
async def get_unread_push_actions_for_user_in_range_for_http( | ||||||
self, | ||||||
user_id: str, | ||||||
|
@@ -482,96 +508,32 @@ async def get_unread_push_actions_for_user_in_range_for_http( | |||||
The list will have between 0~limit entries. | ||||||
""" | ||||||
|
||||||
# find rooms that have a read receipt in them and return the next | ||||||
# push actions | ||||||
def get_after_receipt( | ||||||
txn: LoggingTransaction, | ||||||
) -> List[Tuple[str, str, int, str, bool]]: | ||||||
# find rooms that have a read receipt in them and return the next | ||||||
# push actions | ||||||
|
||||||
receipt_types_clause, args = make_in_list_sql_clause( | ||||||
self.database_engine, | ||||||
"receipt_type", | ||||||
( | ||||||
ReceiptTypes.READ, | ||||||
ReceiptTypes.READ_PRIVATE, | ||||||
ReceiptTypes.UNSTABLE_READ_PRIVATE, | ||||||
), | ||||||
) | ||||||
|
||||||
sql = f""" | ||||||
SELECT ep.event_id, ep.room_id, ep.stream_ordering, ep.actions, | ||||||
ep.highlight | ||||||
FROM ( | ||||||
SELECT room_id, | ||||||
MAX(stream_ordering) as stream_ordering | ||||||
FROM events | ||||||
INNER JOIN receipts_linearized USING (room_id, event_id) | ||||||
WHERE {receipt_types_clause} AND user_id = ? | ||||||
GROUP BY room_id | ||||||
) AS rl, | ||||||
event_push_actions AS ep | ||||||
WHERE | ||||||
ep.room_id = rl.room_id | ||||||
AND ep.stream_ordering > rl.stream_ordering | ||||||
AND ep.user_id = ? | ||||||
AND ep.stream_ordering > ? | ||||||
AND ep.stream_ordering <= ? | ||||||
AND ep.notif = 1 | ||||||
ORDER BY ep.stream_ordering ASC LIMIT ? | ||||||
""" | ||||||
args.extend( | ||||||
(user_id, user_id, min_stream_ordering, max_stream_ordering, limit) | ||||||
) | ||||||
txn.execute(sql, args) | ||||||
return cast(List[Tuple[str, str, int, str, bool]], txn.fetchall()) | ||||||
|
||||||
after_read_receipt = await self.db_pool.runInteraction( | ||||||
"get_unread_push_actions_for_user_in_range_http_arr", get_after_receipt | ||||||
receipts_by_room = dict( | ||||||
await self.db_pool.runInteraction( | ||||||
"get_unread_push_actions_for_user_in_range_http_receipts", | ||||||
self._get_receipts_by_room_txn, | ||||||
user_id=user_id, | ||||||
), | ||||||
) | ||||||
|
||||||
# There are rooms with push actions in them but you don't have a read receipt in | ||||||
# them e.g. rooms you've been invited to, so get push actions for rooms which do | ||||||
# not have read receipts in them too. | ||||||
def get_no_receipt( | ||||||
def get_push_actions( | ||||||
txn: LoggingTransaction, | ||||||
) -> List[Tuple[str, str, int, str, bool]]: | ||||||
receipt_types_clause, args = make_in_list_sql_clause( | ||||||
self.database_engine, | ||||||
"receipt_type", | ||||||
( | ||||||
ReceiptTypes.READ, | ||||||
ReceiptTypes.READ_PRIVATE, | ||||||
ReceiptTypes.UNSTABLE_READ_PRIVATE, | ||||||
), | ||||||
) | ||||||
|
||||||
sql = f""" | ||||||
SELECT ep.event_id, ep.room_id, ep.stream_ordering, ep.actions, | ||||||
ep.highlight | ||||||
sql = """ | ||||||
SELECT ep.event_id, ep.room_id, ep.stream_ordering, ep.actions, ep.highlight | ||||||
FROM event_push_actions AS ep | ||||||
INNER JOIN events AS e USING (room_id, event_id) | ||||||
WHERE | ||||||
ep.room_id NOT IN ( | ||||||
SELECT room_id FROM receipts_linearized | ||||||
WHERE {receipt_types_clause} AND user_id = ? | ||||||
GROUP BY room_id | ||||||
) | ||||||
AND ep.user_id = ? | ||||||
ep.user_id = ? | ||||||
AND ep.stream_ordering > ? | ||||||
AND ep.stream_ordering <= ? | ||||||
AND ep.notif = 1 | ||||||
ORDER BY ep.stream_ordering ASC LIMIT ? | ||||||
""" | ||||||
args.extend( | ||||||
(user_id, user_id, min_stream_ordering, max_stream_ordering, limit) | ||||||
) | ||||||
txn.execute(sql, args) | ||||||
txn.execute(sql, (user_id, min_stream_ordering, max_stream_ordering, limit)) | ||||||
return cast(List[Tuple[str, str, int, str, bool]], txn.fetchall()) | ||||||
|
||||||
no_read_receipt = await self.db_pool.runInteraction( | ||||||
"get_unread_push_actions_for_user_in_range_http_nrr", get_no_receipt | ||||||
push_actions = await self.db_pool.runInteraction( | ||||||
"get_unread_push_actions_for_user_in_range_http", get_push_actions | ||||||
) | ||||||
|
||||||
notifs = [ | ||||||
|
@@ -581,7 +543,10 @@ def get_no_receipt( | |||||
stream_ordering=row[2], | ||||||
actions=_deserialize_action(row[3], row[4]), | ||||||
) | ||||||
for row in after_read_receipt + no_read_receipt | ||||||
for row in push_actions | ||||||
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. To help with readability, especially since you're adding a condition, I'd be tempted to destructure this tuple and then refer to everything by name:
Suggested change
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Much cleaner! b7a66e3 |
||||||
# Only include push actions with a stream ordering after any receipt, or without any | ||||||
# receipt present (invited to but never read rooms). | ||||||
if row[2] > receipts_by_room.get(row[1], 0) | ||||||
] | ||||||
|
||||||
# Now sort it so it's ordered correctly, since currently it will | ||||||
|
@@ -617,94 +582,34 @@ async def get_unread_push_actions_for_user_in_range_for_email( | |||||
The list will have between 0~limit entries. | ||||||
""" | ||||||
|
||||||
# find rooms that have a read receipt in them and return the most recent | ||||||
# push actions | ||||||
def get_after_receipt( | ||||||
txn: LoggingTransaction, | ||||||
) -> List[Tuple[str, str, int, str, bool, int]]: | ||||||
receipt_types_clause, args = make_in_list_sql_clause( | ||||||
self.database_engine, | ||||||
"receipt_type", | ||||||
( | ||||||
ReceiptTypes.READ, | ||||||
ReceiptTypes.READ_PRIVATE, | ||||||
ReceiptTypes.UNSTABLE_READ_PRIVATE, | ||||||
), | ||||||
) | ||||||
|
||||||
sql = f""" | ||||||
SELECT ep.event_id, ep.room_id, ep.stream_ordering, ep.actions, | ||||||
ep.highlight, e.received_ts | ||||||
FROM ( | ||||||
SELECT room_id, | ||||||
MAX(stream_ordering) as stream_ordering | ||||||
FROM events | ||||||
INNER JOIN receipts_linearized USING (room_id, event_id) | ||||||
WHERE {receipt_types_clause} AND user_id = ? | ||||||
GROUP BY room_id | ||||||
) AS rl, | ||||||
event_push_actions AS ep | ||||||
INNER JOIN events AS e USING (room_id, event_id) | ||||||
WHERE | ||||||
ep.room_id = rl.room_id | ||||||
AND ep.stream_ordering > rl.stream_ordering | ||||||
AND ep.user_id = ? | ||||||
AND ep.stream_ordering > ? | ||||||
AND ep.stream_ordering <= ? | ||||||
AND ep.notif = 1 | ||||||
ORDER BY ep.stream_ordering DESC LIMIT ? | ||||||
""" | ||||||
args.extend( | ||||||
(user_id, user_id, min_stream_ordering, max_stream_ordering, limit) | ||||||
) | ||||||
txn.execute(sql, args) | ||||||
return cast(List[Tuple[str, str, int, str, bool, int]], txn.fetchall()) | ||||||
|
||||||
after_read_receipt = await self.db_pool.runInteraction( | ||||||
"get_unread_push_actions_for_user_in_range_email_arr", get_after_receipt | ||||||
receipts_by_room = dict( | ||||||
await self.db_pool.runInteraction( | ||||||
"get_unread_push_actions_for_user_in_range_email_receipts", | ||||||
self._get_receipts_by_room_txn, | ||||||
user_id=user_id, | ||||||
), | ||||||
) | ||||||
|
||||||
# There are rooms with push actions in them but you don't have a read receipt in | ||||||
# them e.g. rooms you've been invited to, so get push actions for rooms which do | ||||||
# not have read receipts in them too. | ||||||
def get_no_receipt( | ||||||
def get_push_actions( | ||||||
txn: LoggingTransaction, | ||||||
) -> List[Tuple[str, str, int, str, bool, int]]: | ||||||
receipt_types_clause, args = make_in_list_sql_clause( | ||||||
self.database_engine, | ||||||
"receipt_type", | ||||||
( | ||||||
ReceiptTypes.READ, | ||||||
ReceiptTypes.READ_PRIVATE, | ||||||
ReceiptTypes.UNSTABLE_READ_PRIVATE, | ||||||
), | ||||||
) | ||||||
|
||||||
sql = f""" | ||||||
sql = """ | ||||||
SELECT ep.event_id, ep.room_id, ep.stream_ordering, ep.actions, | ||||||
ep.highlight, e.received_ts | ||||||
FROM event_push_actions AS ep | ||||||
INNER JOIN events AS e USING (room_id, event_id) | ||||||
WHERE | ||||||
ep.room_id NOT IN ( | ||||||
SELECT room_id FROM receipts_linearized | ||||||
WHERE {receipt_types_clause} AND user_id = ? | ||||||
GROUP BY room_id | ||||||
) | ||||||
AND ep.user_id = ? | ||||||
ep.user_id = ? | ||||||
AND ep.stream_ordering > ? | ||||||
AND ep.stream_ordering <= ? | ||||||
AND ep.notif = 1 | ||||||
ORDER BY ep.stream_ordering DESC LIMIT ? | ||||||
""" | ||||||
args.extend( | ||||||
(user_id, user_id, min_stream_ordering, max_stream_ordering, limit) | ||||||
) | ||||||
txn.execute(sql, args) | ||||||
txn.execute(sql, (user_id, min_stream_ordering, max_stream_ordering, limit)) | ||||||
return cast(List[Tuple[str, str, int, str, bool, int]], txn.fetchall()) | ||||||
|
||||||
no_read_receipt = await self.db_pool.runInteraction( | ||||||
"get_unread_push_actions_for_user_in_range_email_nrr", get_no_receipt | ||||||
push_actions = await self.db_pool.runInteraction( | ||||||
"get_unread_push_actions_for_user_in_range_email", get_push_actions | ||||||
) | ||||||
|
||||||
# Make a list of dicts from the two sets of results. | ||||||
|
@@ -716,7 +621,10 @@ def get_no_receipt( | |||||
actions=_deserialize_action(row[3], row[4]), | ||||||
received_ts=row[5], | ||||||
) | ||||||
for row in after_read_receipt + no_read_receipt | ||||||
for row in push_actions | ||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. ditto for destructuring here There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. |
||||||
# Only include push actions with a stream ordering after any receipt, or without any | ||||||
# receipt present (invited to but never read rooms). | ||||||
if row[2] > receipts_by_room.get(row[1], 0) | ||||||
] | ||||||
|
||||||
# Now sort it so it's ordered correctly, since currently it will | ||||||
|
Add this suggestion to a batch that can be applied as a single commit.
This suggestion is invalid because no changes were made to the code.
Suggestions cannot be applied while the pull request is closed.
Suggestions cannot be applied while viewing a subset of changes.
Only one suggestion per line can be applied in a batch.
Add this suggestion to a batch that can be applied as a single commit.
Applying suggestions on deleted lines is not supported.
You must change the existing code in this line in order to create a valid suggestion.
Outdated suggestions cannot be applied.
This suggestion has been applied or marked resolved.
Suggestions cannot be applied from pending reviews.
Suggestions cannot be applied on multi-line comments.
Suggestions cannot be applied while the pull request is queued to merge.
Suggestion cannot be applied right now. Please check back later.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Just to be customary, this name should include the `_txn` suffix because it takes a Transaction.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
55f587c