This repository has been archived by the owner on Apr 26, 2024. It is now read-only.

Convert receipts and events databases to async/await #8076

Merged · 4 commits · Aug 14, 2020
1 change: 1 addition & 0 deletions changelog.d/8076.misc
@@ -0,0 +1 @@
+Convert various parts of the codebase to async/await.
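
Every hunk in this PR applies the same mechanical transformation: drop the @defer.inlineCallbacks decorator, declare the method async def, and replace each yield on a Deferred with await. A minimal, self-contained sketch of that pattern (illustrative names, not code from this PR):

from twisted.internet import defer, task

@defer.inlineCallbacks
def old_style(reactor):
    # Legacy Twisted coroutine: each yield unwraps a Deferred.
    yield task.deferLater(reactor, 0.0, lambda: None)
    return 42

async def new_style(reactor):
    # Native coroutine: await works directly on Deferreds.
    await task.deferLater(reactor, 0.0, lambda: None)
    return 42

def main(reactor):
    # Twisted call sites wrap a native coroutine back into a Deferred.
    return defer.ensureDeferred(new_style(reactor))

task.react(main)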
33 changes: 14 additions & 19 deletions synapse/storage/databases/main/events.py
@@ -17,13 +17,11 @@
import itertools
import logging
from collections import OrderedDict, namedtuple
-from typing import TYPE_CHECKING, Dict, Iterable, List, Tuple
+from typing import TYPE_CHECKING, Dict, Iterable, List, Set, Tuple

import attr
from prometheus_client import Counter

-from twisted.internet import defer

import synapse.metrics
from synapse.api.constants import EventContentFields, EventTypes, RelationTypes
from synapse.api.room_versions import RoomVersions
@@ -113,15 +111,14 @@ def __init__(
hs.config.worker.writers.events == hs.get_instance_name()
), "Can only instantiate EventsStore on master"

-@defer.inlineCallbacks
-def _persist_events_and_state_updates(
+async def _persist_events_and_state_updates(
self,
events_and_contexts: List[Tuple[EventBase, EventContext]],
current_state_for_room: Dict[str, StateMap[str]],
state_delta_for_room: Dict[str, DeltaState],
new_forward_extremeties: Dict[str, List[str]],
backfilled: bool = False,
-):
+) -> None:
"""Persist a set of events alongside updates to the current state and
forward extremities tables.

@@ -136,7 +133,7 @@ def _persist_events_and_state_updates(
backfilled

Returns:
-Deferred: resolves when the events have been persisted
+Resolves when the events have been persisted
"""

# We want to calculate the stream orderings as late as possible, as
@@ -168,7 +165,7 @@ def _persist_events_and_state_updates(
for (event, context), stream in zip(events_and_contexts, stream_orderings):
event.internal_metadata.stream_ordering = stream

-yield self.db_pool.runInteraction(
+await self.db_pool.runInteraction(
"persist_events",
self._persist_events_txn,
events_and_contexts=events_and_contexts,
@@ -206,16 +203,15 @@ def _persist_events_and_state_updates(
(room_id,), list(latest_event_ids)
)

-@defer.inlineCallbacks
-def _get_events_which_are_prevs(self, event_ids):
+async def _get_events_which_are_prevs(self, event_ids: Iterable[str]) -> List[str]:
"""Filter the supplied list of event_ids to get those which are prev_events of
existing (non-outlier/rejected) events.

Args:
-event_ids (Iterable[str]): event ids to filter
+event_ids: event ids to filter

Returns:
-Deferred[List[str]]: filtered event ids
+Filtered event ids
"""
results = []

@@ -240,14 +236,13 @@ def _get_events_which_are_prevs_txn(txn, batch):
results.extend(r[0] for r in txn if not db_to_json(r[1]).get("soft_failed"))

for chunk in batch_iter(event_ids, 100):
-yield self.db_pool.runInteraction(
+await self.db_pool.runInteraction(
"_get_events_which_are_prevs", _get_events_which_are_prevs_txn, chunk
)

return results

-@defer.inlineCallbacks
-def _get_prevs_before_rejected(self, event_ids):
+async def _get_prevs_before_rejected(self, event_ids: Iterable[str]) -> Set[str]:
"""Get soft-failed ancestors to remove from the extremities.

Given a set of events, find all those that have been soft-failed or
Expand All @@ -259,11 +254,11 @@ def _get_prevs_before_rejected(self, event_ids):
are separated by soft failed events.

Args:
-event_ids (Iterable[str]): Events to find prev events for. Note
-that these must have already been persisted.
+event_ids: Events to find prev events for. Note that these must have
+already been persisted.

Returns:
-Deferred[set[str]]
+The previous events.
"""

# The set of event_ids to return. This includes all soft-failed events
@@ -304,7 +299,7 @@ def _get_prevs_before_rejected_txn(txn, batch):
existing_prevs.add(prev_event_id)

for chunk in batch_iter(event_ids, 100):
-yield self.db_pool.runInteraction(
+await self.db_pool.runInteraction(
"_get_prevs_before_rejected", _get_prevs_before_rejected_txn, chunk
)

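Both _get_events_which_are_prevs and _get_prevs_before_rejected chunk their input through batch_iter(event_ids, 100) and run one database interaction per chunk, which keeps each SQL IN-clause bounded. Synapse's actual batch_iter helper lives elsewhere in the codebase; this is a functionally equivalent sketch, not the real implementation:

from itertools import islice
from typing import Iterable, Iterator, Tuple, TypeVar

T = TypeVar("T")

def batch_iter(iterable: Iterable[T], size: int) -> Iterator[Tuple[T, ...]]:
    # Yield successive tuples of at most `size` items from any iterable.
    it = iter(iterable)
    while True:
        batch = tuple(islice(it, size))
        if not batch:
            return
        yield batch

print(list(batch_iter(range(7), 3)))  # [(0, 1, 2), (3, 4, 5), (6,)]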
46 changes: 19 additions & 27 deletions synapse/storage/databases/main/events_bg_updates.py
@@ -15,8 +15,6 @@

import logging

-from twisted.internet import defer

from synapse.api.constants import EventContentFields
from synapse.storage._base import SQLBaseStore, db_to_json, make_in_list_sql_clause
from synapse.storage.database import DatabasePool
@@ -94,8 +92,7 @@ def __init__(self, database: DatabasePool, db_conn, hs):
where_clause="NOT have_censored",
)

-@defer.inlineCallbacks
-def _background_reindex_fields_sender(self, progress, batch_size):
+async def _background_reindex_fields_sender(self, progress, batch_size):
target_min_stream_id = progress["target_min_stream_id_inclusive"]
max_stream_id = progress["max_stream_id_exclusive"]
rows_inserted = progress.get("rows_inserted", 0)
@@ -155,19 +152,18 @@ def reindex_txn(txn):

return len(rows)

-result = yield self.db_pool.runInteraction(
+result = await self.db_pool.runInteraction(
self.EVENT_FIELDS_SENDER_URL_UPDATE_NAME, reindex_txn
)

if not result:
-yield self.db_pool.updates._end_background_update(
+await self.db_pool.updates._end_background_update(
self.EVENT_FIELDS_SENDER_URL_UPDATE_NAME
)

return result

-@defer.inlineCallbacks
-def _background_reindex_origin_server_ts(self, progress, batch_size):
+async def _background_reindex_origin_server_ts(self, progress, batch_size):
target_min_stream_id = progress["target_min_stream_id_inclusive"]
max_stream_id = progress["max_stream_id_exclusive"]
rows_inserted = progress.get("rows_inserted", 0)
@@ -234,19 +230,18 @@ def reindex_search_txn(txn):

return len(rows_to_update)

-result = yield self.db_pool.runInteraction(
+result = await self.db_pool.runInteraction(
self.EVENT_ORIGIN_SERVER_TS_NAME, reindex_search_txn
)

if not result:
-yield self.db_pool.updates._end_background_update(
+await self.db_pool.updates._end_background_update(
self.EVENT_ORIGIN_SERVER_TS_NAME
)

return result

-@defer.inlineCallbacks
-def _cleanup_extremities_bg_update(self, progress, batch_size):
+async def _cleanup_extremities_bg_update(self, progress, batch_size):
"""Background update to clean out extremities that should have been
deleted previously.

@@ -414,26 +409,25 @@ def _cleanup_extremities_bg_update_txn(txn):

return len(original_set)

-num_handled = yield self.db_pool.runInteraction(
+num_handled = await self.db_pool.runInteraction(
"_cleanup_extremities_bg_update", _cleanup_extremities_bg_update_txn
)

if not num_handled:
-yield self.db_pool.updates._end_background_update(
+await self.db_pool.updates._end_background_update(
self.DELETE_SOFT_FAILED_EXTREMITIES
)

def _drop_table_txn(txn):
txn.execute("DROP TABLE _extremities_to_check")

-yield self.db_pool.runInteraction(
+await self.db_pool.runInteraction(
"_cleanup_extremities_bg_update_drop_table", _drop_table_txn
)

return num_handled

-@defer.inlineCallbacks
-def _redactions_received_ts(self, progress, batch_size):
+async def _redactions_received_ts(self, progress, batch_size):
"""Handles filling out the `received_ts` column in redactions.
"""
last_event_id = progress.get("last_event_id", "")
@@ -480,17 +474,16 @@ def _redactions_received_ts_txn(txn):

return len(rows)

-count = yield self.db_pool.runInteraction(
+count = await self.db_pool.runInteraction(
"_redactions_received_ts", _redactions_received_ts_txn
)

if not count:
-yield self.db_pool.updates._end_background_update("redactions_received_ts")
+await self.db_pool.updates._end_background_update("redactions_received_ts")

return count

-@defer.inlineCallbacks
-def _event_fix_redactions_bytes(self, progress, batch_size):
+async def _event_fix_redactions_bytes(self, progress, batch_size):
"""Undoes hex encoded censored redacted event JSON.
"""

@@ -511,16 +504,15 @@ def _event_fix_redactions_bytes_txn(txn):

txn.execute("DROP INDEX redactions_censored_redacts")

-yield self.db_pool.runInteraction(
+await self.db_pool.runInteraction(
"_event_fix_redactions_bytes", _event_fix_redactions_bytes_txn
)

-yield self.db_pool.updates._end_background_update("event_fix_redactions_bytes")
+await self.db_pool.updates._end_background_update("event_fix_redactions_bytes")

return 1

-@defer.inlineCallbacks
-def _event_store_labels(self, progress, batch_size):
+async def _event_store_labels(self, progress, batch_size):
"""Background update handler which will store labels for existing events."""
last_event_id = progress.get("last_event_id", "")

@@ -575,11 +567,11 @@ def _event_store_labels_txn(txn):

return nbrows

-num_rows = yield self.db_pool.runInteraction(
+num_rows = await self.db_pool.runInteraction(
desc="event_store_labels", func=_event_store_labels_txn
)

if not num_rows:
-yield self.db_pool.updates._end_background_update("event_store_labels")
+await self.db_pool.updates._end_background_update("event_store_labels")

return num_rows
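
All of the handlers in this file share one contract with the background-update scheduler: take a progress dict and a batch_size, handle at most batch_size rows inside a single runInteraction, and return the number of rows handled; a falsy return value means the update is finished, at which point the handler calls _end_background_update to deregister itself. A toy driver showing that loop (simplified: real progress is persisted in the database via the BackgroundUpdater class, not kept in a local dict):

import asyncio

async def run_background_update(handler, batch_size: int) -> None:
    # Drive a handler until it reports that no rows remain.
    progress: dict = {}
    while True:
        num_rows = await handler(progress, batch_size)
        if not num_rows:
            break

async def example_handler(progress: dict, batch_size: int) -> int:
    # Pretend 250 rows need migrating, resuming from stored progress.
    done = progress.get("done", 0)
    handled = min(batch_size, 250 - done)
    progress["done"] = done + handled  # stand-in for persisting progress
    return handled

asyncio.run(run_background_update(example_handler, 100))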