Skip to content
This repository has been archived by the owner on Apr 26, 2024. It is now read-only.

make notification of signatures work with workers #6254

Merged
merged 6 commits into from
Nov 1, 2019

Conversation

uhoreg
Copy link
Member

@uhoreg uhoreg commented Oct 25, 2019

The UNION query is kind of ugly, but shows one way of fixing the issue. Basically, the function needs to return the rows from the user signature stream in addition to the device lists stream.

Alternatively, I could use a new stream ID generator instead of using the device stream ID generator, which would involve more code. Though I suspect that it would be the better option.

Copy link
Member

@richvdh richvdh left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

looks generally ok, though I think it should probably be a separate stream. I don't know if a separate stream id is required though. @erikjohnston do you have any thoughts on how this stuff is meant to work?

@erikjohnston
Copy link
Member

Yeah, you probably want it to be a separate stream, but can share stream ID generator. (current_state_deltas and events do this).

Adding a stream is a matter of creating a new stream, similar to https://github.com/matrix-org/synapse/blob/master/synapse/replication/tcp/streams/federation.py, and adding it to the stream map https://github.com/matrix-org/synapse/blob/master/synapse/replication/tcp/streams/__init__.py. Then in the worker stores that use the stream add code to stream_positions() and process_replication_rows() e.g. https://github.com/matrix-org/synapse/blob/master/synapse/replication/slave/storage/devices.py#L40-L43

result["device_lists"] = self._device_list_id_gen.get_current_token()
result["user_signature"] = result[
"device_lists"
] = self._device_list_id_gen.get_current_token()
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is kind of ugly, but it's what black came up with. 🤷‍♂️

Alternatively, I could set result["user_signature"] and result["device_lists"] as separate statements.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Alternatively, I could set result["user_signature"] and result["device_lists"] as separate statements.

this, please. use a local temp var for the token and copy it to both fields. and add a comment to say they share a stream id.

@uhoreg uhoreg requested a review from richvdh October 30, 2019 21:56
Copy link
Member

@richvdh richvdh left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

just a couple of nits

result["device_lists"] = self._device_list_id_gen.get_current_token()
result["user_signature"] = result[
"device_lists"
] = self._device_list_id_gen.get_current_token()
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Alternatively, I could set result["user_signature"] and result["device_lists"] as separate statements.

this, please. use a local temp var for the token and copy it to both fields. and add a comment to say they share a stream id.

@@ -42,14 +42,19 @@ def __init__(self, db_conn, hs):

def stream_positions(self):
result = super(SlavedDeviceStore, self).stream_positions()
result["device_lists"] = self._device_list_id_gen.get_current_token()
result["user_signature"] = result[
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I know the existing code isn't good for this, but I'd rather we used the symbolic constants (UserSignatureStream.NAME in this case) for the stream names, which makes it much easier to find the places the streams are used.

return result

def process_replication_rows(self, stream_name, token, rows):
if stream_name == "device_lists":
self._device_list_id_gen.advance(token)
for row in rows:
self._invalidate_caches_for_devices(token, row.user_id, row.destination)
elif stream_name == "user_signature":
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
elif stream_name == "user_signature":
elif stream_name == UserSignatureStream.NAME:

@uhoreg uhoreg requested a review from richvdh October 31, 2019 17:02
Copy link
Member

@richvdh richvdh left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

lgtm

@uhoreg uhoreg merged commit 3b4216f into develop Nov 1, 2019
babolivier pushed a commit that referenced this pull request Sep 1, 2021
…kers_notify

* commit '3b4216f96':
  clean up code a bit
  make user signatures a separate stream
  add changelog
  make notification of signatures work with workers
@DMRobertson DMRobertson deleted the uhoreg/cross_signing_fix_workers_notify branch June 28, 2022 11:19
Sign up for free to subscribe to this conversation on GitHub. Already have an account? Sign in.
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

3 participants