-
-
Notifications
You must be signed in to change notification settings - Fork 2.1k
Move some replication processing out of generic_worker #9796
Conversation
This is to reduce the difference between master and other workers.
43c3457
to
eb54f66
Compare
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
generally seems fine. A few nits. Convince me that this is safe to do on the master?
synapse/replication/tcp/client.py
Outdated
@@ -197,9 +260,16 @@ async def on_position(self, stream_name: str, instance_name: str, token: int): | |||
# may be streaming. | |||
self.notifier.notify_replication() | |||
|
|||
await self.on_rdata(stream_name, instance_name, token, []) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
can we get rid of the call to self.store.process_replication_rows
above, given that is also called by on_rdata
?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Good point
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Done. I've moved the self.notifier.notify_replication()
after the on_rdata(..)
call in case anything listening was requiring the store to have processed the replication.
elif stream_name == PushRulesStream.NAME: | ||
self.notifier.on_new_event( | ||
"push_rules_key", token, users=[row.user_id for row in rows] | ||
) | ||
elif stream_name in (AccountDataStream.NAME, TagAccountDataStream.NAME): | ||
self.notifier.on_new_event( | ||
"account_data_key", token, users=[row.user_id for row in rows] | ||
) | ||
elif stream_name == ReceiptsStream.NAME: | ||
self.notifier.on_new_event( | ||
"receipt_key", token, rooms=[row.room_id for row in rows] | ||
) | ||
await self._pusher_pool.on_new_receipts( | ||
token, token, {row.room_id for row in rows} | ||
) | ||
elif stream_name == ToDeviceStream.NAME: | ||
entities = [row.entity for row in rows if row.entity.startswith("@")] | ||
if entities: | ||
self.notifier.on_new_event("to_device_key", token, users=entities) | ||
elif stream_name == DeviceListsStream.NAME: | ||
all_room_ids = set() # type: Set[str] | ||
for row in rows: | ||
if row.entity.startswith("@"): | ||
room_ids = await self.store.get_rooms_for_user(row.entity) | ||
all_room_ids.update(room_ids) | ||
self.notifier.on_new_event("device_list_key", token, rooms=all_room_ids) | ||
elif stream_name == GroupServerStream.NAME: | ||
self.notifier.on_new_event( | ||
"groups_key", token, users=[row.user_id for row in rows] | ||
) | ||
elif stream_name == PushersStream.NAME: | ||
for row in rows: | ||
if row.deleted: | ||
self.stop_pusher(row.user_id, row.app_id, row.pushkey) | ||
else: | ||
await self.start_pusher(row.user_id, row.app_id, row.pushkey) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
this means that all this stuff is going to happen on the master, where it previously did not (iirc each replication client receives its own replication data back). Is that not going to be a problem for any of these?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
iirc each replication client receives its own replication data back
We filter out the echoes before we get here, as RDATA
includes the sending instance name, so that shouldn't be a problem. Really, I think its a bug that this hasn't been happening on master, e.g. if we split out receipts etc but keep /sync
on master then /sync
requests won't get told about new receipts.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We filter out the echoes before we get here,
I went looking for that code, but couldn't find it. Can you point me to it?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
pass | ||
|
||
|
||
class WorkerPresenceHandler(BasePresenceHandler): |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
at some point in this project (not necessarily during this particular PR), could you add some docstrings to WorkerPresenceHandler
and PresenceHandler
to explain how they differ?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nods And type hints.
async def update_external_syncs_row( | ||
self, process_id, user_id, is_syncing, sync_time_msec | ||
): | ||
"""Update the syncing users for an external process as a delta. | ||
|
||
This is a no-op when presence is handled by a different worker. | ||
|
||
Args: | ||
process_id (str): An identifier for the process the users are | ||
syncing against. This allows synapse to process updates | ||
as user start and stop syncing against a given process. | ||
user_id (str): The user who has started or stopped syncing | ||
is_syncing (bool): Whether or not the user is now syncing | ||
sync_time_msec(int): Time in ms when the user was last syncing | ||
""" | ||
pass | ||
|
||
async def update_external_syncs_clear(self, process_id): | ||
"""Marks all users that had been marked as syncing by a given process | ||
as offline. | ||
|
||
Used when the process has stopped/disappeared. | ||
|
||
This is a no-op when presence is handled by a different worker. | ||
""" | ||
pass |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I'm a bit confused about why these are being added. They don't seem to be called or defined anywhere in this diff?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Err, sorry. A commit in a different PR needed them to be defined on both to appease mypy, I'll back that out.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Oh actually, they are needed now. This is because hs.get_presence_handler()
returns BasePresenceHandler
now instead of PresenceHandler
. So either we a) add a no-op impl here, or b) assert the type in the calling code. I vaguely prefer the former to the latter.
FTR without these stub impls:
synapse/replication/tcp/client.py:196: error: "BasePresenceHandler" has no attribute "process_replication_rows" [attr-defined]
synapse/replication/tcp/handler.py:355: error: "BasePresenceHandler" has no attribute "update_external_syncs_row" [attr-defined]
synapse/replication/tcp/handler.py:365: error: "BasePresenceHandler" has no attribute "update_external_syncs_clear" [attr-defined]
pass | ||
|
||
|
||
class WorkerPresenceHandler(BasePresenceHandler): |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I'm assuming that this is a complete lift-and-shift of GenericWorkerPresence
. Is there anything else I should know?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Nope, this is just a straight copy and paste.
Co-authored-by: Richard van der Hoff <1389908+richvdh@users.noreply.github.com>
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
lgtm
Synapse 1.33.0rc1 (2021-04-28) ============================== Features -------- - Update experimental support for [MSC3083](matrix-org/matrix-spec-proposals#3083): restricting room access via group membership. ([\#9800](#9800), [\#9814](#9814)) - Add experimental support for handling presence on a worker. ([\#9819](#9819), [\#9820](#9820), [\#9828](#9828), [\#9850](#9850)) - Return a new template when an user attempts to renew their account multiple times with the same token, stating that their account is set to expire. This replaces the invalid token template that would previously be shown in this case. This change concerns the optional account validity feature. ([\#9832](#9832)) Bugfixes -------- - Fixes the OIDC SSO flow when using a `public_baseurl` value including a non-root URL path. ([\#9726](#9726)) - Fix thumbnail generation for some sites with non-standard content types. Contributed by @rkfg. ([\#9788](#9788)) - Add some sanity checks to identity server passed to 3PID bind/unbind endpoints. ([\#9802](#9802)) - Limit the size of HTTP responses read over federation. ([\#9833](#9833)) - Fix a bug which could cause Synapse to get stuck in a loop of resyncing device lists. ([\#9867](#9867)) - Fix a long-standing bug where errors from federation did not propagate to the client. ([\#9868](#9868)) Improved Documentation ---------------------- - Add a note to the docker docs mentioning that we mirror upstream's supported Docker platforms. ([\#9801](#9801)) Internal Changes ---------------- - Add a dockerfile for running Synapse in worker-mode under Complement. ([\#9162](#9162)) - Apply `pyupgrade` across the codebase. ([\#9786](#9786)) - Move some replication processing out of `generic_worker`. ([\#9796](#9796)) - Replace `HomeServer.get_config()` with inline references. ([\#9815](#9815)) - Rename some handlers and config modules to not duplicate the top-level module. ([\#9816](#9816)) - Fix a long-standing bug which caused `max_upload_size` to not be correctly enforced. ([\#9817](#9817)) - Reduce CPU usage of the user directory by reusing existing calculated room membership. ([\#9821](#9821)) - Small speed up for joining large remote rooms. ([\#9825](#9825)) - Introduce flake8-bugbear to the test suite and fix some of its lint violations. ([\#9838](#9838)) - Only store the raw data in the in-memory caches, rather than objects that include references to e.g. the data stores. ([\#9845](#9845)) - Limit length of accepted email addresses. ([\#9855](#9855)) - Remove redundant `synapse.types.Collection` type definition. ([\#9856](#9856)) - Handle recently added rate limits correctly when using `--no-rate-limit` with the demo scripts. ([\#9858](#9858)) - Disable invite rate-limiting by default when running the unit tests. ([\#9871](#9871)) - Pass a reactor into `SynapseSite` to make testing easier. ([\#9874](#9874)) - Make `DomainSpecificString` an `attrs` class. ([\#9875](#9875)) - Add type hints to `synapse.api.auth` and `synapse.api.auth_blocking` modules. ([\#9876](#9876)) - Remove redundant `_PushHTTPChannel` test class. ([\#9878](#9878)) - Remove backwards-compatibility code for Python versions < 3.6. ([\#9879](#9879)) - Small performance improvement around handling new local presence updates. ([\#9887](#9887))
Synapse 1.33.0 (2021-05-05) =========================== Features -------- - Build Debian packages for Ubuntu 21.04 (Hirsute Hippo). ([\#9909](matrix-org/synapse#9909)) Synapse 1.33.0rc2 (2021-04-29) ============================== Bugfixes -------- - Fix tight loop when handling presence replication when using workers. Introduced in v1.33.0rc1. ([\#9900](matrix-org/synapse#9900)) Synapse 1.33.0rc1 (2021-04-28) ============================== Features -------- - Update experimental support for [MSC3083](matrix-org/matrix-spec-proposals#3083): restricting room access via group membership. ([\#9800](matrix-org/synapse#9800), [\#9814](matrix-org/synapse#9814)) - Add experimental support for handling presence on a worker. ([\#9819](matrix-org/synapse#9819), [\#9820](matrix-org/synapse#9820), [\#9828](matrix-org/synapse#9828), [\#9850](matrix-org/synapse#9850)) - Return a new template when an user attempts to renew their account multiple times with the same token, stating that their account is set to expire. This replaces the invalid token template that would previously be shown in this case. This change concerns the optional account validity feature. ([\#9832](matrix-org/synapse#9832)) Bugfixes -------- - Fixes the OIDC SSO flow when using a `public_baseurl` value including a non-root URL path. ([\#9726](matrix-org/synapse#9726)) - Fix thumbnail generation for some sites with non-standard content types. Contributed by @rkfg. ([\#9788](matrix-org/synapse#9788)) - Add some sanity checks to identity server passed to 3PID bind/unbind endpoints. ([\#9802](matrix-org/synapse#9802)) - Limit the size of HTTP responses read over federation. ([\#9833](matrix-org/synapse#9833)) - Fix a bug which could cause Synapse to get stuck in a loop of resyncing device lists. ([\#9867](matrix-org/synapse#9867)) - Fix a long-standing bug where errors from federation did not propagate to the client. ([\#9868](matrix-org/synapse#9868)) Improved Documentation ---------------------- - Add a note to the docker docs mentioning that we mirror upstream's supported Docker platforms. ([\#9801](matrix-org/synapse#9801)) Internal Changes ---------------- - Add a dockerfile for running Synapse in worker-mode under Complement. ([\#9162](matrix-org/synapse#9162)) - Apply `pyupgrade` across the codebase. ([\#9786](matrix-org/synapse#9786)) - Move some replication processing out of `generic_worker`. ([\#9796](matrix-org/synapse#9796)) - Replace `HomeServer.get_config()` with inline references. ([\#9815](matrix-org/synapse#9815)) - Rename some handlers and config modules to not duplicate the top-level module. ([\#9816](matrix-org/synapse#9816)) - Fix a long-standing bug which caused `max_upload_size` to not be correctly enforced. ([\#9817](matrix-org/synapse#9817)) - Reduce CPU usage of the user directory by reusing existing calculated room membership. ([\#9821](matrix-org/synapse#9821)) - Small speed up for joining large remote rooms. ([\#9825](matrix-org/synapse#9825)) - Introduce flake8-bugbear to the test suite and fix some of its lint violations. ([\#9838](matrix-org/synapse#9838)) - Only store the raw data in the in-memory caches, rather than objects that include references to e.g. the data stores. ([\#9845](matrix-org/synapse#9845)) - Limit length of accepted email addresses. ([\#9855](matrix-org/synapse#9855)) - Remove redundant `synapse.types.Collection` type definition. ([\#9856](matrix-org/synapse#9856)) - Handle recently added rate limits correctly when using `--no-rate-limit` with the demo scripts. ([\#9858](matrix-org/synapse#9858)) - Disable invite rate-limiting by default when running the unit tests. ([\#9871](matrix-org/synapse#9871)) - Pass a reactor into `SynapseSite` to make testing easier. ([\#9874](matrix-org/synapse#9874)) - Make `DomainSpecificString` an `attrs` class. ([\#9875](matrix-org/synapse#9875)) - Add type hints to `synapse.api.auth` and `synapse.api.auth_blocking` modules. ([\#9876](matrix-org/synapse#9876)) - Remove redundant `_PushHTTPChannel` test class. ([\#9878](matrix-org/synapse#9878)) - Remove backwards-compatibility code for Python versions < 3.6. ([\#9879](matrix-org/synapse#9879)) - Small performance improvement around handling new local presence updates. ([\#9887](matrix-org/synapse#9887))
This is in preparation for moving presence off master.
Should be reviewable commit by commit.