Skip to content
This repository has been archived by the owner on Apr 26, 2024. It is now read-only.

Convert event_push_actions, registration, and roommember datastores to async #8197

Merged
merged 6 commits into from
Aug 28, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions changelog.d/8197.misc
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Convert various parts of the codebase to async/await.
38 changes: 18 additions & 20 deletions synapse/storage/databases/main/event_push_actions.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
# limitations under the License.

import logging
from typing import List
from typing import Dict, List, Union

from synapse.metrics.background_process_metrics import run_as_background_process
from synapse.storage._base import LoggingTransaction, SQLBaseStore, db_to_json
Expand Down Expand Up @@ -383,19 +383,20 @@ def get_no_receipt(txn):
# Now return the first `limit`
return notifs[:limit]

def get_if_maybe_push_in_range_for_user(self, user_id, min_stream_ordering):
async def get_if_maybe_push_in_range_for_user(
self, user_id: str, min_stream_ordering: int
) -> bool:
"""A fast check to see if there might be something to push for the
user since the given stream ordering. May return false positives.

Useful to know whether to bother starting a pusher on start up or not.

Args:
user_id (str)
min_stream_ordering (int)
user_id
min_stream_ordering

Returns:
Deferred[bool]: True if there may be push to process, False if
there definitely isn't.
True if there may be push to process, False if there definitely isn't.
"""

def _get_if_maybe_push_in_range_for_user_txn(txn):
Expand All @@ -408,22 +409,20 @@ def _get_if_maybe_push_in_range_for_user_txn(txn):
txn.execute(sql, (user_id, min_stream_ordering))
return bool(txn.fetchone())

return self.db_pool.runInteraction(
return await self.db_pool.runInteraction(
"get_if_maybe_push_in_range_for_user",
_get_if_maybe_push_in_range_for_user_txn,
)

async def add_push_actions_to_staging(self, event_id, user_id_actions):
async def add_push_actions_to_staging(
self, event_id: str, user_id_actions: Dict[str, List[Union[dict, str]]]
) -> None:
"""Add the push actions for the event to the push action staging area.

Args:
event_id (str)
user_id_actions (dict[str, list[dict|str])]): A dictionary mapping
user_id to list of push actions, where an action can either be
a string or dict.

Returns:
Deferred
event_id
user_id_actions: A mapping of user_id to list of push actions, where
an action can either be a string or dict.
"""

if not user_id_actions:
Expand Down Expand Up @@ -507,7 +506,7 @@ def _find_stream_orderings_for_times_txn(self, txn):
"Found stream ordering 1 day ago: it's %d", self.stream_ordering_day_ago
)

def find_first_stream_ordering_after_ts(self, ts):
async def find_first_stream_ordering_after_ts(self, ts: int) -> int:
"""Gets the stream ordering corresponding to a given timestamp.

Specifically, finds the stream_ordering of the first event that was
Expand All @@ -516,13 +515,12 @@ def find_first_stream_ordering_after_ts(self, ts):
relatively slow.

Args:
ts (int): timestamp in millis
ts: timestamp in millis

Returns:
Deferred[int]: stream ordering of the first event received on/after
the timestamp
stream ordering of the first event received on/after the timestamp
"""
return self.db_pool.runInteraction(
return await self.db_pool.runInteraction(
"_find_first_stream_ordering_after_ts_txn",
self._find_first_stream_ordering_after_ts_txn,
ts,
Expand Down
Loading