Skip to content
This repository has been archived by the owner on Apr 26, 2024. It is now read-only.

Add a get_next_txn method to StreamIdGenerator to match MultiWriterIdGenerator (#15191 #15191

Merged
merged 3 commits into from
Mar 2, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions changelog.d/15191.misc
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Add a `get_next_txn` method to `StreamIdGenerator` to match `MultiWriterIdGenerator`.
11 changes: 2 additions & 9 deletions synapse/storage/databases/main/account_data.py
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,6 @@
from synapse.storage.engines import PostgresEngine
from synapse.storage.util.id_generators import (
AbstractStreamIdGenerator,
AbstractStreamIdTracker,
MultiWriterIdGenerator,
StreamIdGenerator,
)
Expand All @@ -64,14 +63,12 @@ def __init__(
):
super().__init__(database, db_conn, hs)

# `_can_write_to_account_data` indicates whether the current worker is allowed
# to write account data. A value of `True` implies that `_account_data_id_gen`
# is an `AbstractStreamIdGenerator` and not just a tracker.
self._account_data_id_gen: AbstractStreamIdTracker
Comment on lines -67 to -70
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I guess this is no longer true and we're always a writer?

Copy link
Member Author

@anoadragon453 anoadragon453 Mar 2, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Mm, no, not all workers are account_data stream writers. But self._account_data_id_gen is always a AbstractStreamIdGenerator, even on non-writers.

The methods that rely on self._account_data_id_gen being a AbstractStreamIdGenerator should not be called unless we are a writer though. Hence the asserts in those methods, such as:

assert self._can_write_to_account_data

Defining self._account_data_id_gen as a AbstractStreamIdTracker confused mypy though, as AbstractStreamIdTracker doesn't have the methods that a writer need (get_next, get_next_txn, etc.).

mypy didn't yell about get_next not existing on AbstractStreamIdTracker though, possibly because we used it in async with ... blocks instead of calling it directly? Does mypy check those properly?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It looks like #14468 essentially removed AbstractStreamIdTracker's concrete classes without removing the abstract class, which seems unfortunate.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It seems there's some confusion about making sequence generators AbstractStreamIdTracker in *WorkerStore classes, where it's assumed that the same sequence generator would be an AbstractStreamIdGenerator in the non-worker variant storage class. See ReceiptsWorkerStore for instance:

# In the worker store this is an ID tracker which we overwrite in the non-worker
# class below that is used on the main process.
self._receipts_id_gen: AbstractStreamIdTracker

But *WorkerStore doesn't mean that that storage class can't write to the database. It just means that workers should use that class vs. the main process... right?

Copy link
Member Author

@anoadragon453 anoadragon453 Mar 2, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Also that comment is wrong, ReceiptsStore is just an empty class 🤦

class ReceiptsStore(ReceiptsWorkerStore, ReceiptsBackgroundUpdateStore):
pass

Looks like there may be a few cases where AbstractStreamIdTrackers should actually be typed asAbstractStreamIdGenerators.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It seems there's some confusion about making sequence generators AbstractStreamIdTracker in *WorkerStore classes, where it's assumed that the same sequence generator would be an AbstractStreamIdGenerator in the non-worker variant storage class. See ReceiptsWorkerStore for instance:

#14468 made this obsolete as far as I can tell. So they just never got updated. 🤷

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

See #15192.

self._can_write_to_account_data = (
self._instance_name in hs.config.worker.writers.account_data
)

self._account_data_id_gen: AbstractStreamIdGenerator

if isinstance(database.engine, PostgresEngine):
self._account_data_id_gen = MultiWriterIdGenerator(
db_conn=db_conn,
Expand Down Expand Up @@ -558,7 +555,6 @@ async def add_account_data_to_room(
The maximum stream ID.
"""
assert self._can_write_to_account_data
assert isinstance(self._account_data_id_gen, AbstractStreamIdGenerator)

content_json = json_encoder.encode(content)

Expand Down Expand Up @@ -598,7 +594,6 @@ async def remove_account_data_for_room(
data to delete.
"""
assert self._can_write_to_account_data
assert isinstance(self._account_data_id_gen, AbstractStreamIdGenerator)

def _remove_account_data_for_room_txn(
txn: LoggingTransaction, next_id: int
Expand Down Expand Up @@ -663,7 +658,6 @@ async def add_account_data_for_user(
The maximum stream ID.
"""
assert self._can_write_to_account_data
assert isinstance(self._account_data_id_gen, AbstractStreamIdGenerator)

async with self._account_data_id_gen.get_next() as next_id:
await self.db_pool.runInteraction(
Expand Down Expand Up @@ -770,7 +764,6 @@ async def remove_account_data_for_user(
to delete.
"""
assert self._can_write_to_account_data
assert isinstance(self._account_data_id_gen, AbstractStreamIdGenerator)

def _remove_account_data_for_user_txn(
txn: LoggingTransaction, next_id: int
Expand Down
45 changes: 44 additions & 1 deletion synapse/storage/util/id_generators.py
Original file line number Diff line number Diff line change
Expand Up @@ -158,6 +158,15 @@ def get_next_mult(self, n: int) -> AsyncContextManager[Sequence[int]]:
"""
raise NotImplementedError()

@abc.abstractmethod
def get_next_txn(self, txn: LoggingTransaction) -> int:
"""
Usage:
stream_id_gen.get_next_txn(txn)
# ... persist events ...
"""
raise NotImplementedError()


class StreamIdGenerator(AbstractStreamIdGenerator):
"""Generates and tracks stream IDs for a stream with a single writer.
Expand Down Expand Up @@ -263,6 +272,40 @@ def manager() -> Generator[Sequence[int], None, None]:

return _AsyncCtxManagerWrapper(manager())

def get_next_txn(self, txn: LoggingTransaction) -> int:
"""
Retrieve the next stream ID from within a database transaction.

Clean-up functions will be called when the transaction finishes.

Args:
txn: The database transaction object.

Returns:
The next stream ID.
"""
if not self._is_writer:
raise Exception("Tried to allocate stream ID on non-writer")

# Get the next stream ID.
with self._lock:
self._current += self._step
next_id = self._current

self._unfinished_ids[next_id] = next_id

def clear_unfinished_id(id_to_clear: int) -> None:
"""A function to mark processing this ID as finished"""
with self._lock:
self._unfinished_ids.pop(id_to_clear)

# Mark this ID as finished once the database transaction itself finishes.
txn.call_after(clear_unfinished_id, next_id)
txn.call_on_exception(clear_unfinished_id, next_id)

# Return the new ID.
return next_id

def get_current_token(self) -> int:
if not self._is_writer:
return self._current
Expand Down Expand Up @@ -568,7 +611,7 @@ def get_next_txn(self, txn: LoggingTransaction) -> int:
"""
Usage:

stream_id = stream_id_gen.get_next(txn)
stream_id = stream_id_gen.get_next_txn(txn)
# ... persist event ...
"""

Expand Down
2 changes: 1 addition & 1 deletion synapse/storage/util/sequence.py
Original file line number Diff line number Diff line change
Expand Up @@ -205,7 +205,7 @@ def __init__(self, get_first_callback: GetFirstCallbackType):
"""
Args:
get_first_callback: a callback which is called on the first call to
get_next_id_txn; should return the curreent maximum id
get_next_id_txn; should return the current maximum id
"""
# the callback. this is cleared after it is called, so that it can be GCed.
self._callback: Optional[GetFirstCallbackType] = get_first_callback
Expand Down