Skip to content
This repository has been archived by the owner on Apr 26, 2024. It is now read-only.

make notification of signatures work with workers #6254

Merged
merged 6 commits into from
Nov 1, 2019
Merged
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions changelog.d/6254.bugfix
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Make notification of cross-signing signatures work with workers.
7 changes: 6 additions & 1 deletion synapse/replication/slave/storage/devices.py
Original file line number Diff line number Diff line change
Expand Up @@ -42,14 +42,19 @@ def __init__(self, db_conn, hs):

def stream_positions(self):
result = super(SlavedDeviceStore, self).stream_positions()
result["device_lists"] = self._device_list_id_gen.get_current_token()
result["user_signature"] = result[
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I know the existing code isn't good for this, but I'd rather we used the symbolic constants (UserSignatureStream.NAME in this case) for the stream names, which makes it much easier to find the places the streams are used.

"device_lists"
] = self._device_list_id_gen.get_current_token()
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is kind of ugly, but it's what black came up with. 🤷‍♂️

Alternatively, I could set result["user_signature"] and result["device_lists"] as separate statements.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Alternatively, I could set result["user_signature"] and result["device_lists"] as separate statements.

this, please. use a local temp var for the token and copy it to both fields. and add a comment to say they share a stream id.

return result

def process_replication_rows(self, stream_name, token, rows):
if stream_name == "device_lists":
self._device_list_id_gen.advance(token)
for row in rows:
self._invalidate_caches_for_devices(token, row.user_id, row.destination)
elif stream_name == "user_signature":
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
elif stream_name == "user_signature":
elif stream_name == UserSignatureStream.NAME:

for row in rows:
self._user_signature_stream_cache.entity_has_changed(row.user_id, token)
return super(SlavedDeviceStore, self).process_replication_rows(
stream_name, token, rows
)
Expand Down
1 change: 1 addition & 0 deletions synapse/replication/tcp/streams/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -45,5 +45,6 @@
_base.TagAccountDataStream,
_base.AccountDataStream,
_base.GroupServerStream,
_base.UserSignatureStream,
)
}
18 changes: 18 additions & 0 deletions synapse/replication/tcp/streams/_base.py
Original file line number Diff line number Diff line change
Expand Up @@ -95,6 +95,7 @@
"GroupsStreamRow",
("group_id", "user_id", "type", "content"), # str # str # str # dict
)
UserSignatureStreamRow = namedtuple("UserSignatureStreamRow", ("user_id")) # str


class Stream(object):
Expand Down Expand Up @@ -438,3 +439,20 @@ def __init__(self, hs):
self.update_function = store.get_all_groups_changes

super(GroupServerStream, self).__init__(hs)


class UserSignatureStream(Stream):
"""A user has signed their own device with their user-signing key
"""

NAME = "user_signature"
_LIMITED = False
ROW_TYPE = UserSignatureStreamRow

def __init__(self, hs):
store = hs.get_datastore()

self.current_token = store.get_device_stream_token
self.update_function = store.get_all_user_signature_changes_for_remotes

super(UserSignatureStream, self).__init__(hs)
5 changes: 4 additions & 1 deletion synapse/storage/data_stores/main/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -139,7 +139,10 @@ def __init__(self, db_conn, hs):
db_conn, "public_room_list_stream", "stream_id"
)
self._device_list_id_gen = StreamIdGenerator(
db_conn, "device_lists_stream", "stream_id"
db_conn,
"device_lists_stream",
"stream_id",
extra_tables=[("user_signature_stream", "stream_id")],
)
self._cross_signing_id_gen = StreamIdGenerator(
db_conn, "e2e_cross_signing_keys", "stream_id"
Expand Down
24 changes: 24 additions & 0 deletions synapse/storage/data_stores/main/end_to_end_keys.py
Original file line number Diff line number Diff line change
Expand Up @@ -315,6 +315,30 @@ def get_e2e_cross_signing_key(self, user_id, key_type, from_user_id=None):
from_user_id,
)

def get_all_user_signature_changes_for_remotes(self, from_key, to_key):
"""Return a list of changes from the user signature stream to notify remotes.
Note that the user signature stream represents when a user signs their
device with their user-signing key, which is not published to other
users or servers, so no `destination` is needed in the returned
list. However, this is needed to poke workers.

Args:
from_key (int): the stream ID to start at (exclusive)
to_key (int): the stream ID to end at (inclusive)

Returns:
Deferred[list[(int,str)]] a list of `(stream_id, user_id)`
"""
sql = """
SELECT MAX(stream_id) AS stream_id, from_user_id AS user_id
FROM user_signature_stream
WHERE ? < stream_id AND stream_id <= ?
GROUP BY user_id
"""
return self._execute(
"get_all_user_signature_changes_for_remotes", None, sql, from_key, to_key
)


class EndToEndKeyStore(EndToEndKeyWorkerStore, SQLBaseStore):
def set_e2e_device_keys(self, user_id, device_id, time_now, device_keys):
Expand Down