Skip to content
This repository has been archived by the owner on Apr 26, 2024. It is now read-only.

Fix processing of groups stream, and use symbolic names for streams #7117

Merged
merged 3 commits into from
Mar 23, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions changelog.d/7117.bugfix
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Fix a bug which meant that groups updates were not correctly replicated between workers.
35 changes: 23 additions & 12 deletions synapse/app/generic_worker.py
Original file line number Diff line number Diff line change
Expand Up @@ -65,12 +65,23 @@
from synapse.replication.slave.storage.room import RoomStore
from synapse.replication.slave.storage.transactions import SlavedTransactionStore
from synapse.replication.tcp.client import ReplicationClientHandler
from synapse.replication.tcp.streams._base import (
from synapse.replication.tcp.streams import (
AccountDataStream,
DeviceListsStream,
GroupServerStream,
PresenceStream,
PushersStream,
PushRulesStream,
ReceiptsStream,
TagAccountDataStream,
ToDeviceStream,
TypingStream,
)
from synapse.replication.tcp.streams.events import (
EventsStream,
EventsStreamEventRow,
EventsStreamRow,
)
from synapse.replication.tcp.streams.events import EventsStreamEventRow, EventsStreamRow
from synapse.rest.admin import register_servlets_for_media_repo
from synapse.rest.client.v1 import events
from synapse.rest.client.v1.initial_sync import InitialSyncRestServlet
Expand Down Expand Up @@ -626,7 +637,7 @@ async def process_and_notify(self, stream_name, token, rows):
if self.send_handler:
self.send_handler.process_replication_rows(stream_name, token, rows)

if stream_name == "events":
if stream_name == EventsStream.NAME:
# We shouldn't get multiple rows per token for events stream, so
# we don't need to optimise this for multiple rows.
for row in rows:
Expand All @@ -649,44 +660,44 @@ async def process_and_notify(self, stream_name, token, rows):
)

await self.pusher_pool.on_new_notifications(token, token)
elif stream_name == "push_rules":
elif stream_name == PushRulesStream.NAME:
self.notifier.on_new_event(
"push_rules_key", token, users=[row.user_id for row in rows]
)
elif stream_name in ("account_data", "tag_account_data"):
elif stream_name in (AccountDataStream.NAME, TagAccountDataStream.NAME):
self.notifier.on_new_event(
"account_data_key", token, users=[row.user_id for row in rows]
)
elif stream_name == "receipts":
elif stream_name == ReceiptsStream.NAME:
self.notifier.on_new_event(
"receipt_key", token, rooms=[row.room_id for row in rows]
)
await self.pusher_pool.on_new_receipts(
token, token, {row.room_id for row in rows}
)
elif stream_name == "typing":
elif stream_name == TypingStream.NAME:
self.typing_handler.process_replication_rows(token, rows)
self.notifier.on_new_event(
"typing_key", token, rooms=[row.room_id for row in rows]
)
elif stream_name == "to_device":
elif stream_name == ToDeviceStream.NAME:
entities = [row.entity for row in rows if row.entity.startswith("@")]
if entities:
self.notifier.on_new_event("to_device_key", token, users=entities)
elif stream_name == "device_lists":
elif stream_name == DeviceListsStream.NAME:
all_room_ids = set()
for row in rows:
if row.entity.startswith("@"):
room_ids = await self.store.get_rooms_for_user(row.entity)
all_room_ids.update(room_ids)
self.notifier.on_new_event("device_list_key", token, rooms=all_room_ids)
elif stream_name == "presence":
elif stream_name == PresenceStream.NAME:
await self.presence_handler.process_replication_rows(token, rows)
elif stream_name == "receipts":
elif stream_name == GroupServerStream.NAME:
self.notifier.on_new_event(
"groups_key", token, users=[row.user_id for row in rows]
)
elif stream_name == "pushers":
elif stream_name == PushersStream.NAME:
for row in rows:
if row.deleted:
self.stop_pusher(row.user_id, row.app_id, row.pushkey)
Expand Down
70 changes: 52 additions & 18 deletions synapse/replication/tcp/streams/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,27 +24,61 @@
current_token: The function that returns the current token for the stream
update_function: The function that returns a list of updates between two tokens
"""

from . import _base, events, federation
from synapse.replication.tcp.streams._base import (
AccountDataStream,
BackfillStream,
CachesStream,
DeviceListsStream,
GroupServerStream,
PresenceStream,
PublicRoomsStream,
PushersStream,
PushRulesStream,
ReceiptsStream,
TagAccountDataStream,
ToDeviceStream,
TypingStream,
UserSignatureStream,
)
from synapse.replication.tcp.streams.events import EventsStream
from synapse.replication.tcp.streams.federation import FederationStream

STREAMS_MAP = {
stream.NAME: stream
for stream in (
events.EventsStream,
_base.BackfillStream,
_base.PresenceStream,
_base.TypingStream,
_base.ReceiptsStream,
_base.PushRulesStream,
_base.PushersStream,
_base.CachesStream,
_base.PublicRoomsStream,
_base.DeviceListsStream,
_base.ToDeviceStream,
federation.FederationStream,
_base.TagAccountDataStream,
_base.AccountDataStream,
_base.GroupServerStream,
_base.UserSignatureStream,
EventsStream,
BackfillStream,
PresenceStream,
TypingStream,
ReceiptsStream,
PushRulesStream,
PushersStream,
CachesStream,
PublicRoomsStream,
DeviceListsStream,
ToDeviceStream,
FederationStream,
TagAccountDataStream,
AccountDataStream,
GroupServerStream,
UserSignatureStream,
)
}

__all__ = [
"STREAMS_MAP",
"BackfillStream",
"PresenceStream",
"TypingStream",
"ReceiptsStream",
"PushRulesStream",
"PushersStream",
"CachesStream",
"PublicRoomsStream",
"DeviceListsStream",
"ToDeviceStream",
"TagAccountDataStream",
"AccountDataStream",
"GroupServerStream",
"UserSignatureStream",
]