Skip to content
This repository has been archived by the owner on Apr 26, 2024. It is now read-only.

Combine AbstractStreamIdTracker and AbstractStreamIdGenerator. #15192

Merged
merged 2 commits into from
Mar 3, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions changelog.d/15192.misc
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Combine `AbstractStreamIdTracker` and `AbstractStreamIdGenerator`.
7 changes: 2 additions & 5 deletions synapse/storage/databases/main/devices.py
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,6 @@
from synapse.storage.types import Cursor
from synapse.storage.util.id_generators import (
AbstractStreamIdGenerator,
AbstractStreamIdTracker,
StreamIdGenerator,
)
from synapse.types import JsonDict, StrCollection, get_verify_key_from_cross_signing_key
Expand Down Expand Up @@ -91,7 +90,7 @@ def __init__(

# In the worker store this is an ID tracker which we overwrite in the non-worker
# class below that is used on the main process.
self._device_list_id_gen: AbstractStreamIdTracker = StreamIdGenerator(
self._device_list_id_gen = StreamIdGenerator(
db_conn,
hs.get_replication_notifier(),
"device_lists_stream",
Expand Down Expand Up @@ -712,9 +711,7 @@ async def add_user_signature_change_to_streams(
The new stream ID.
"""

# TODO: this looks like it's _writing_. Should this be on DeviceStore rather
# than DeviceWorkerStore?
async with self._device_list_id_gen.get_next() as stream_id: # type: ignore[attr-defined]
async with self._device_list_id_gen.get_next() as stream_id:
await self.db_pool.runInteraction(
"add_user_sig_change_to_streams",
self._add_user_signature_change_txn,
Expand Down
5 changes: 2 additions & 3 deletions synapse/storage/databases/main/events_worker.py
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,6 @@
from synapse.storage.types import Cursor
from synapse.storage.util.id_generators import (
AbstractStreamIdGenerator,
AbstractStreamIdTracker,
MultiWriterIdGenerator,
StreamIdGenerator,
)
Expand Down Expand Up @@ -187,8 +186,8 @@ def __init__(
):
super().__init__(database, db_conn, hs)

self._stream_id_gen: AbstractStreamIdTracker
self._backfill_id_gen: AbstractStreamIdTracker
self._stream_id_gen: AbstractStreamIdGenerator
self._backfill_id_gen: AbstractStreamIdGenerator
if isinstance(database.engine, PostgresEngine):
# If we're using Postgres than we can use `MultiWriterIdGenerator`
# regardless of whether this process writes to the streams or not.
Expand Down
3 changes: 1 addition & 2 deletions synapse/storage/databases/main/push_rule.py
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,6 @@
from synapse.storage.push_rule import InconsistentRuleException, RuleNotFoundException
from synapse.storage.util.id_generators import (
AbstractStreamIdGenerator,
AbstractStreamIdTracker,
IdGenerator,
StreamIdGenerator,
)
Expand Down Expand Up @@ -118,7 +117,7 @@ def __init__(

# In the worker store this is an ID tracker which we overwrite in the non-worker
# class below that is used on the main process.
self._push_rules_stream_id_gen: AbstractStreamIdTracker = StreamIdGenerator(
self._push_rules_stream_id_gen = StreamIdGenerator(
db_conn,
hs.get_replication_notifier(),
"push_rules_stream",
Expand Down
3 changes: 1 addition & 2 deletions synapse/storage/databases/main/pusher.py
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,6 @@
)
from synapse.storage.util.id_generators import (
AbstractStreamIdGenerator,
AbstractStreamIdTracker,
StreamIdGenerator,
)
from synapse.types import JsonDict
Expand All @@ -60,7 +59,7 @@ def __init__(

# In the worker store this is an ID tracker which we overwrite in the non-worker
# class below that is used on the main process.
self._pushers_id_gen: AbstractStreamIdTracker = StreamIdGenerator(
self._pushers_id_gen = StreamIdGenerator(
db_conn,
hs.get_replication_notifier(),
"pushers",
Expand Down
6 changes: 3 additions & 3 deletions synapse/storage/databases/main/receipts.py
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@
from synapse.storage.engines import PostgresEngine
from synapse.storage.engines._base import IsolationLevel
from synapse.storage.util.id_generators import (
AbstractStreamIdTracker,
AbstractStreamIdGenerator,
MultiWriterIdGenerator,
StreamIdGenerator,
)
Expand All @@ -65,7 +65,7 @@ def __init__(

# In the worker store this is an ID tracker which we overwrite in the non-worker
# class below that is used on the main process.
self._receipts_id_gen: AbstractStreamIdTracker
self._receipts_id_gen: AbstractStreamIdGenerator

if isinstance(database.engine, PostgresEngine):
self._can_write_to_receipts = (
Expand Down Expand Up @@ -768,7 +768,7 @@ async def insert_receipt(
"insert_receipt_conv", self._graph_to_linear, room_id, event_ids
)

async with self._receipts_id_gen.get_next() as stream_id: # type: ignore[attr-defined]
async with self._receipts_id_gen.get_next() as stream_id:
event_ts = await self.db_pool.runInteraction(
"insert_linearized_receipt",
self._insert_linearized_receipt_txn,
Expand Down
17 changes: 5 additions & 12 deletions synapse/storage/util/id_generators.py
Original file line number Diff line number Diff line change
Expand Up @@ -93,8 +93,11 @@ def _load_current_id(
return res


class AbstractStreamIdTracker(metaclass=abc.ABCMeta):
"""Tracks the "current" stream ID of a stream that may have multiple writers.
class AbstractStreamIdGenerator(metaclass=abc.ABCMeta):
"""Generates or tracks stream IDs for a stream that may have multiple writers.

Each stream ID represents a write transaction, whose completion is tracked
so that the "current" stream ID of the stream can be determined.

Stream IDs are monotonically increasing or decreasing integers representing write
transactions. The "current" stream ID is the stream ID such that all transactions
Expand Down Expand Up @@ -130,16 +133,6 @@ def get_current_token_for_writer(self, instance_name: str) -> int:
"""
raise NotImplementedError()


class AbstractStreamIdGenerator(AbstractStreamIdTracker):
"""Generates stream IDs for a stream that may have multiple writers.

Each stream ID represents a write transaction, whose completion is tracked
so that the "current" stream ID of the stream can be determined.

See `AbstractStreamIdTracker` for more details.
"""

@abc.abstractmethod
def get_next(self) -> AsyncContextManager[int]:
"""
Expand Down