Move event stream handling out of slave store. (#7491)
This allows us to have the logic on both master and workers, which is necessary to move event persistence off master.

We also move the instantiation of ID generators out of DataStore and the slave stores and into the base worker stores. This allows us to select which process writes events independently of the master/worker split.
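
To make the split concrete, here is a minimal, self-contained sketch of the pattern (illustrative code, not taken from this commit): the single process that persists events owns a generator that mints new stream IDs, while every other process only tracks the highest ID it has seen arrive over replication.

class StreamIdGenerator:
    """Owned by the one process that writes events: mints new IDs."""

    def __init__(self, current: int = 0):
        self._current = current

    def get_next(self) -> int:
        # Only the writer ever allocates a new stream ID.
        self._current += 1
        return self._current

    def get_current_token(self) -> int:
        return self._current


class StreamIdTracker:
    """Used by non-writer processes: advances only via replication."""

    def __init__(self, current: int = 0):
        self._current = current

    def advance(self, token: int) -> None:
        # Replication may deliver rows out of order; never move backwards.
        self._current = max(self._current, token)

    def get_current_token(self) -> int:
        return self._current

The diff below applies the same idea in EventsWorkerStore: it instantiates a real StreamIdGenerator when hs.config.worker_app is None (i.e. this process writes events) and a SlavedIdTracker otherwise.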
erikjohnston authored May 15, 2020
1 parent 5355421 commit 1f36ff6
Showing 8 changed files with 161 additions and 116 deletions.
1 change: 1 addition & 0 deletions changelog.d/7491.misc
@@ -0,0 +1 @@
Move event stream handling out of slave store.
89 changes: 0 additions & 89 deletions synapse/replication/slave/storage/events.py
@@ -15,11 +15,6 @@
# limitations under the License.
import logging

from synapse.api.constants import EventTypes
from synapse.replication.tcp.streams.events import (
EventsStreamCurrentStateRow,
EventsStreamEventRow,
)
from synapse.storage.data_stores.main.event_federation import EventFederationWorkerStore
from synapse.storage.data_stores.main.event_push_actions import (
EventPushActionsWorkerStore,
@@ -35,7 +30,6 @@
from synapse.util.caches.stream_change_cache import StreamChangeCache

from ._base import BaseSlavedStore
from ._slaved_id_tracker import SlavedIdTracker

logger = logging.getLogger(__name__)

@@ -62,11 +56,6 @@ class SlavedEventStore(
BaseSlavedStore,
):
def __init__(self, database: Database, db_conn, hs):
self._stream_id_gen = SlavedIdTracker(db_conn, "events", "stream_ordering")
self._backfill_id_gen = SlavedIdTracker(
db_conn, "events", "stream_ordering", step=-1
)

super(SlavedEventStore, self).__init__(database, db_conn, hs)

events_max = self._stream_id_gen.get_current_token()
@@ -92,81 +81,3 @@ def get_room_max_stream_ordering(self):

def get_room_min_stream_ordering(self):
return self._backfill_id_gen.get_current_token()

def process_replication_rows(self, stream_name, instance_name, token, rows):
if stream_name == "events":
self._stream_id_gen.advance(token)
for row in rows:
self._process_event_stream_row(token, row)
elif stream_name == "backfill":
self._backfill_id_gen.advance(-token)
for row in rows:
self.invalidate_caches_for_event(
-token,
row.event_id,
row.room_id,
row.type,
row.state_key,
row.redacts,
row.relates_to,
backfilled=True,
)
return super().process_replication_rows(stream_name, instance_name, token, rows)

def _process_event_stream_row(self, token, row):
data = row.data

if row.type == EventsStreamEventRow.TypeId:
self.invalidate_caches_for_event(
token,
data.event_id,
data.room_id,
data.type,
data.state_key,
data.redacts,
data.relates_to,
backfilled=False,
)
elif row.type == EventsStreamCurrentStateRow.TypeId:
self._curr_state_delta_stream_cache.entity_has_changed(
row.data.room_id, token
)

if data.type == EventTypes.Member:
self.get_rooms_for_user_with_stream_ordering.invalidate(
(data.state_key,)
)
else:
raise Exception("Unknown events stream row type %s" % (row.type,))

def invalidate_caches_for_event(
self,
stream_ordering,
event_id,
room_id,
etype,
state_key,
redacts,
relates_to,
backfilled,
):
self._invalidate_get_event_cache(event_id)

self.get_latest_event_ids_in_room.invalidate((room_id,))

self.get_unread_event_push_actions_by_room_for_user.invalidate_many((room_id,))

if not backfilled:
self._events_stream_cache.entity_has_changed(room_id, stream_ordering)

if redacts:
self._invalidate_get_event_cache(redacts)

if etype == EventTypes.Member:
self._membership_stream_cache.entity_has_changed(state_key, stream_ordering)
self.get_invited_rooms_for_local_user.invalidate((state_key,))

if relates_to:
self.get_relations_for_event.invalidate_many((relates_to,))
self.get_aggregation_groups_for_event.invalidate_many((relates_to,))
self.get_applicable_edit.invalidate((relates_to,))
8 changes: 0 additions & 8 deletions synapse/replication/slave/storage/push_rule.py
@@ -15,19 +15,11 @@
# limitations under the License.

from synapse.storage.data_stores.main.push_rule import PushRulesWorkerStore
from synapse.storage.database import Database

from ._slaved_id_tracker import SlavedIdTracker
from .events import SlavedEventStore


class SlavedPushRuleStore(SlavedEventStore, PushRulesWorkerStore):
def __init__(self, database: Database, db_conn, hs):
self._push_rules_stream_id_gen = SlavedIdTracker(
db_conn, "push_rules_stream", "stream_id"
)
super(SlavedPushRuleStore, self).__init__(database, db_conn, hs)

def get_push_rules_stream_token(self):
return (
self._push_rules_stream_id_gen.get_current_token(),
17 changes: 0 additions & 17 deletions synapse/storage/data_stores/main/__init__.py
@@ -24,7 +24,6 @@
from synapse.storage.database import Database
from synapse.storage.engines import PostgresEngine
from synapse.storage.util.id_generators import (
ChainedIdGenerator,
IdGenerator,
MultiWriterIdGenerator,
StreamIdGenerator,
@@ -125,19 +124,6 @@ def __init__(self, database: Database, db_conn, hs):
self._clock = hs.get_clock()
self.database_engine = database.engine

self._stream_id_gen = StreamIdGenerator(
db_conn,
"events",
"stream_ordering",
extra_tables=[("local_invites", "stream_id")],
)
self._backfill_id_gen = StreamIdGenerator(
db_conn,
"events",
"stream_ordering",
step=-1,
extra_tables=[("ex_outlier_stream", "event_stream_ordering")],
)
self._presence_id_gen = StreamIdGenerator(
db_conn, "presence_stream", "stream_id"
)
@@ -164,9 +150,6 @@ def __init__(self, database: Database, db_conn, hs):
self._event_reports_id_gen = IdGenerator(db_conn, "event_reports", "id")
self._push_rule_id_gen = IdGenerator(db_conn, "push_rules", "id")
self._push_rules_enable_id_gen = IdGenerator(db_conn, "push_rules_enable", "id")
self._push_rules_stream_id_gen = ChainedIdGenerator(
self._stream_id_gen, db_conn, "push_rules_stream", "stream_id"
)
self._pushers_id_gen = StreamIdGenerator(
db_conn, "pushers", "id", extra_tables=[("deleted_pushers", "stream_id")]
)
102 changes: 100 additions & 2 deletions synapse/storage/data_stores/main/cache.py
@@ -16,8 +16,13 @@

import itertools
import logging
from typing import Any, Iterable, Optional
from typing import Any, Iterable, Optional, Tuple

from synapse.api.constants import EventTypes
from synapse.replication.tcp.streams.events import (
EventsStreamCurrentStateRow,
EventsStreamEventRow,
)
from synapse.storage._base import SQLBaseStore
from synapse.storage.database import Database
from synapse.storage.engines import PostgresEngine
@@ -66,7 +71,22 @@ def get_all_updated_caches_txn(txn):
)

def process_replication_rows(self, stream_name, instance_name, token, rows):
if stream_name == "caches":
if stream_name == "events":
for row in rows:
self._process_event_stream_row(token, row)
elif stream_name == "backfill":
for row in rows:
self._invalidate_caches_for_event(
-token,
row.event_id,
row.room_id,
row.type,
row.state_key,
row.redacts,
row.relates_to,
backfilled=True,
)
elif stream_name == "caches":
if self._cache_id_gen:
self._cache_id_gen.advance(instance_name, token)

@@ -85,6 +105,84 @@ def process_replication_rows(self, stream_name, instance_name, token, rows):

super().process_replication_rows(stream_name, instance_name, token, rows)

def _process_event_stream_row(self, token, row):
data = row.data

if row.type == EventsStreamEventRow.TypeId:
self._invalidate_caches_for_event(
token,
data.event_id,
data.room_id,
data.type,
data.state_key,
data.redacts,
data.relates_to,
backfilled=False,
)
elif row.type == EventsStreamCurrentStateRow.TypeId:
self._curr_state_delta_stream_cache.entity_has_changed(
row.data.room_id, token
)

if data.type == EventTypes.Member:
self.get_rooms_for_user_with_stream_ordering.invalidate(
(data.state_key,)
)
else:
raise Exception("Unknown events stream row type %s" % (row.type,))

def _invalidate_caches_for_event(
self,
stream_ordering,
event_id,
room_id,
etype,
state_key,
redacts,
relates_to,
backfilled,
):
self._invalidate_get_event_cache(event_id)

self.get_latest_event_ids_in_room.invalidate((room_id,))

self.get_unread_event_push_actions_by_room_for_user.invalidate_many((room_id,))

if not backfilled:
self._events_stream_cache.entity_has_changed(room_id, stream_ordering)

if redacts:
self._invalidate_get_event_cache(redacts)

if etype == EventTypes.Member:
self._membership_stream_cache.entity_has_changed(state_key, stream_ordering)
self.get_invited_rooms_for_local_user.invalidate((state_key,))

if relates_to:
self.get_relations_for_event.invalidate_many((relates_to,))
self.get_aggregation_groups_for_event.invalidate_many((relates_to,))
self.get_applicable_edit.invalidate((relates_to,))

async def invalidate_cache_and_stream(self, cache_name: str, keys: Tuple[Any, ...]):
"""Invalidates the cache and adds it to the cache stream so slaves
will know to invalidate their caches.
This should only be used to invalidate caches where slaves won't
otherwise know from other replication streams that the cache should
be invalidated.
"""
cache_func = getattr(self, cache_name, None)
if not cache_func:
return

cache_func.invalidate(keys)
await self.db.runInteraction(
"invalidate_cache_and_stream",
self._send_invalidation_to_replication,
cache_func.__name__,
keys,
)

def _invalidate_cache_and_stream(self, txn, cache_func, keys):
"""Invalidates the cache and adds it to the cache stream so slaves
will know to invalidate their caches.
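
A usage sketch for the new async invalidate_cache_and_stream helper above (the cache name and key below are hypothetical, chosen only to show the calling convention):

# The cache is looked up by name via getattr, so the caller passes a
# string; "get_user_by_id" and user_id here are illustrative only.
await store.invalidate_cache_and_stream("get_user_by_id", (user_id,))

Because the helper silently returns when the named cache does not exist, a typo in the cache name fails quietly rather than raising.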
35 changes: 35 additions & 0 deletions synapse/storage/data_stores/main/events_worker.py
@@ -37,8 +37,10 @@
from synapse.events.utils import prune_event
from synapse.logging.context import PreserveLoggingContext, current_context
from synapse.metrics.background_process_metrics import run_as_background_process
from synapse.replication.slave.storage._slaved_id_tracker import SlavedIdTracker
from synapse.storage._base import SQLBaseStore, make_in_list_sql_clause
from synapse.storage.database import Database
from synapse.storage.util.id_generators import StreamIdGenerator
from synapse.types import get_domain_from_id
from synapse.util.caches.descriptors import Cache, cached, cachedInlineCallbacks
from synapse.util.iterutils import batch_iter
@@ -74,6 +76,31 @@ class EventsWorkerStore(SQLBaseStore):
def __init__(self, database: Database, db_conn, hs):
super(EventsWorkerStore, self).__init__(database, db_conn, hs)

if hs.config.worker_app is None:
# We are the process in charge of generating stream ids for events,
# so instantiate ID generators based on the database
self._stream_id_gen = StreamIdGenerator(
db_conn,
"events",
"stream_ordering",
extra_tables=[("local_invites", "stream_id")],
)
self._backfill_id_gen = StreamIdGenerator(
db_conn,
"events",
"stream_ordering",
step=-1,
extra_tables=[("ex_outlier_stream", "event_stream_ordering")],
)
else:
# Another process is in charge of persisting events and generating
# stream IDs: rely on the replication streams to let us know which
# IDs we can process.
self._stream_id_gen = SlavedIdTracker(db_conn, "events", "stream_ordering")
self._backfill_id_gen = SlavedIdTracker(
db_conn, "events", "stream_ordering", step=-1
)

self._get_event_cache = Cache(
"*getEvent*",
keylen=3,
@@ -85,6 +112,14 @@ def __init__(self, database: Database, db_conn, hs):
self._event_fetch_list = []
self._event_fetch_ongoing = 0

def process_replication_rows(self, stream_name, instance_name, token, rows):
if stream_name == "events":
self._stream_id_gen.advance(token)
elif stream_name == "backfill":
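            # The backfill stream counts downwards (backfilled events get
            # negative stream orderings), so negate the token before
            # advancing the tracker.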
self._backfill_id_gen.advance(-token)

super().process_replication_rows(stream_name, instance_name, token, rows)

def get_received_ts(self, event_id):
"""Get received_ts (when it was persisted) for the event.