Skip to content
This repository has been archived by the owner on Apr 26, 2024. It is now read-only.

Commit

Permalink
Merge commit 'd58fda99f' into anoa/dinsic_release_1_21_x
Browse files Browse the repository at this point in the history
* commit 'd58fda99f':
  Convert `event_push_actions`, `registration`, and `roommember` datastores to async (#8197)
  Only return devices with keys from `/federation/v1/user/devices/` (#8198)
  • Loading branch information
anoadragon453 committed Oct 20, 2020
2 parents b7672ff + d58fda9 commit 5313899
Show file tree
Hide file tree
Showing 6 changed files with 171 additions and 163 deletions.
1 change: 1 addition & 0 deletions changelog.d/8197.misc
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Convert various parts of the codebase to async/await.
1 change: 1 addition & 0 deletions changelog.d/8198.feature
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Optimise `/federation/v1/user/devices/` API by only returning devices with encryption keys.
4 changes: 1 addition & 3 deletions synapse/storage/databases/main/devices.py
Original file line number Diff line number Diff line change
Expand Up @@ -498,9 +498,7 @@ def _get_devices_with_keys_by_user_txn(
) -> Tuple[int, List[JsonDict]]:
now_stream_id = self._device_list_id_gen.get_current_token()

devices = self._get_e2e_device_keys_txn(
txn, [(user_id, None)], include_all_devices=True
)
devices = self._get_e2e_device_keys_txn(txn, [(user_id, None)])

if devices:
user_devices = devices[user_id]
Expand Down
38 changes: 18 additions & 20 deletions synapse/storage/databases/main/event_push_actions.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
# limitations under the License.

import logging
from typing import List
from typing import Dict, List, Union

from synapse.metrics.background_process_metrics import run_as_background_process
from synapse.storage._base import LoggingTransaction, SQLBaseStore, db_to_json
Expand Down Expand Up @@ -383,19 +383,20 @@ def get_no_receipt(txn):
# Now return the first `limit`
return notifs[:limit]

def get_if_maybe_push_in_range_for_user(self, user_id, min_stream_ordering):
async def get_if_maybe_push_in_range_for_user(
self, user_id: str, min_stream_ordering: int
) -> bool:
"""A fast check to see if there might be something to push for the
user since the given stream ordering. May return false positives.
Useful to know whether to bother starting a pusher on start up or not.
Args:
user_id (str)
min_stream_ordering (int)
user_id
min_stream_ordering
Returns:
Deferred[bool]: True if there may be push to process, False if
there definitely isn't.
True if there may be push to process, False if there definitely isn't.
"""

def _get_if_maybe_push_in_range_for_user_txn(txn):
Expand All @@ -408,22 +409,20 @@ def _get_if_maybe_push_in_range_for_user_txn(txn):
txn.execute(sql, (user_id, min_stream_ordering))
return bool(txn.fetchone())

return self.db_pool.runInteraction(
return await self.db_pool.runInteraction(
"get_if_maybe_push_in_range_for_user",
_get_if_maybe_push_in_range_for_user_txn,
)

async def add_push_actions_to_staging(self, event_id, user_id_actions):
async def add_push_actions_to_staging(
self, event_id: str, user_id_actions: Dict[str, List[Union[dict, str]]]
) -> None:
"""Add the push actions for the event to the push action staging area.
Args:
event_id (str)
user_id_actions (dict[str, list[dict|str])]): A dictionary mapping
user_id to list of push actions, where an action can either be
a string or dict.
Returns:
Deferred
event_id
user_id_actions: A mapping of user_id to list of push actions, where
an action can either be a string or dict.
"""

if not user_id_actions:
Expand Down Expand Up @@ -507,7 +506,7 @@ def _find_stream_orderings_for_times_txn(self, txn):
"Found stream ordering 1 day ago: it's %d", self.stream_ordering_day_ago
)

def find_first_stream_ordering_after_ts(self, ts):
async def find_first_stream_ordering_after_ts(self, ts: int) -> int:
"""Gets the stream ordering corresponding to a given timestamp.
Specifically, finds the stream_ordering of the first event that was
Expand All @@ -516,13 +515,12 @@ def find_first_stream_ordering_after_ts(self, ts):
relatively slow.
Args:
ts (int): timestamp in millis
ts: timestamp in millis
Returns:
Deferred[int]: stream ordering of the first event received on/after
the timestamp
stream ordering of the first event received on/after the timestamp
"""
return self.db_pool.runInteraction(
return await self.db_pool.runInteraction(
"_find_first_stream_ordering_after_ts_txn",
self._find_first_stream_ordering_after_ts_txn,
ts,
Expand Down
Loading

0 comments on commit 5313899

Please sign in to comment.