Skip to content
This repository has been archived by the owner on Apr 26, 2024. It is now read-only.

Convert _insert_graph_receipts_txn to simple_upsert #16299

Merged
Merged
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
26 changes: 9 additions & 17 deletions synapse/storage/databases/main/receipts.py
Original file line number Diff line number Diff line change
Expand Up @@ -795,27 +795,21 @@ async def insert_receipt(
now - event_ts,
)

await self.db_pool.runInteraction(
"insert_graph_receipt",
self._insert_graph_receipt_txn,
await self._insert_graph_receipt(
room_id,
receipt_type,
user_id,
event_ids,
thread_id,
data,
# Use READ_COMMITTED to avoid 'could not serialize access due to concurrent
# update' Postgres errors which lead to rollbacks and re-dos.
isolation_level=IsolationLevel.READ_COMMITTED,
)

max_persisted_id = self._receipts_id_gen.get_current_token()

return stream_id, max_persisted_id

def _insert_graph_receipt_txn(
async def _insert_graph_receipt(
self,
txn: LoggingTransaction,
room_id: str,
receipt_type: str,
user_id: str,
Expand All @@ -825,13 +819,6 @@ def _insert_graph_receipt_txn(
) -> None:
assert self._can_write_to_receipts

txn.call_after(
self._get_receipts_for_user_with_orderings.invalidate,
(user_id, receipt_type),
)
# FIXME: This shouldn't invalidate the whole cache
txn.call_after(self._get_linearized_receipts_for_room.invalidate, (room_id,))

keyvalues = {
"room_id": room_id,
"receipt_type": receipt_type,
Expand All @@ -843,8 +830,8 @@ def _insert_graph_receipt_txn(
else:
keyvalues["thread_id"] = thread_id

self.db_pool.simple_upsert_txn(
txn,
await self.db_pool.simple_upsert(
clokep marked this conversation as resolved.
Show resolved Hide resolved
desc="insert_graph_receipt",
table="receipts_graph",
keyvalues=keyvalues,
values={
Expand All @@ -854,6 +841,11 @@ def _insert_graph_receipt_txn(
where_clause=where_clause,
)

self._get_receipts_for_user_with_orderings.invalidate((user_id, receipt_type))

# FIXME: This shouldn't invalidate the whole cache
self._get_linearized_receipts_for_room.invalidate((room_id,))


class ReceiptsBackgroundUpdateStore(SQLBaseStore):
POPULATE_RECEIPT_EVENT_STREAM_ORDERING = "populate_event_stream_ordering"
Expand Down
Loading