Skip to content
This repository has been archived by the owner on Apr 26, 2024. It is now read-only.

Commit

Permalink
[WIP] debug for device lists updates
Browse files Browse the repository at this point in the history
Debug for #8631.

I'm having a hard time tracking down what's going wrong in that issue.
In the reported example, I could see server A sending federation traffic
to server B and all was well. Yet B reports out-of-sync device updates
from A.

I couldn't see what was _in_ the events being sent from A to B. So I
have added some crude logging to track

- when we have updates to send to a remote HS
- the edus we actually accumulate to send
- when a federation transaction includes a device list update edu
- when such an EDU is received

This is a bit of a sledgehammer.

Is this a good idea?
  • Loading branch information
David Robertson committed Jan 17, 2022
1 parent d8be992 commit 662dccd
Show file tree
Hide file tree
Showing 3 changed files with 46 additions and 2 deletions.
12 changes: 12 additions & 0 deletions synapse/federation/sender/transaction_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@
import synapse.server

logger = logging.getLogger(__name__)
issue_8631_logger = logging.getLogger("synapse.8631_debug")

last_pdu_ts_metric = Gauge(
"synapse_federation_last_sent_pdu_time",
Expand Down Expand Up @@ -124,6 +125,17 @@ async def send_new_transaction(
len(pdus),
len(edus),
)
if issue_8631_logger.isEnabledFor(logging.DEBUG):
DEVICE_UPDATE_EDUS = {"m.device_list_update", "m.signing_key_update"}
device_list_updates = [
edu.content for edu in edus if edu.edu_type in DEVICE_UPDATE_EDUS
]
if device_list_updates:
issue_8631_logger.debug(
"transaction [%s] includes device list updates: %s",
transaction.transaction_id,
device_list_updates,
)

# Actually send the transaction

Expand Down
15 changes: 15 additions & 0 deletions synapse/federation/transport/server/federation.py
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@
from synapse.util.versionstring import get_version_string

logger = logging.getLogger(__name__)
issue_8631_logger = logging.getLogger("synapse.8631_debug")


class BaseFederationServerServlet(BaseFederationServlet):
Expand Down Expand Up @@ -95,6 +96,20 @@ async def on_PUT(
len(transaction_data.get("edus", [])),
)

if issue_8631_logger.isEnabledFor(logging.DEBUG):
DEVICE_UPDATE_EDUS = {"m.device_list_update", "m.signing_key_update"}
device_list_updates = [
edu.content
for edu in transaction_data.get("edus", [])
if edu.edu_type in DEVICE_UPDATE_EDUS
]
if device_list_updates:
issue_8631_logger.debug(
"transaction [%s] includes device list updates: %s",
transaction_id,
device_list_updates,
)

except Exception as e:
logger.exception(e)
return 400, {"error": "Invalid transaction"}
Expand Down
21 changes: 19 additions & 2 deletions synapse/storage/databases/main/devices.py
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,7 @@
from synapse.server import HomeServer

logger = logging.getLogger(__name__)
issue_8631_logger = logging.getLogger("synapse.8631_debug")

DROP_DEVICE_LIST_STREAMS_NON_UNIQUE_INDEXES = (
"drop_device_list_streams_non_unique_indexes"
Expand Down Expand Up @@ -222,13 +223,18 @@ async def get_device_updates_by_remote(
limit,
)

# We need to ensure `updates` doesn't grow too big.
# Currently: `len(updates) <= limit`.
# Note for later that `len(updates) <= limit`.

# Return an empty list if there are no updates
if not updates:
return now_stream_id, []

if issue_8631_logger.isEnabledFor(logging.DEBUG):
data = {(user, device): stream_id for user, device, stream_id, _ in updates}
issue_8631_logger.debug(
"device updates need to be sent to %s: %s", destination, data
)

# get the cross-signing keys of the users in the list, so that we can
# determine which of the device changes were cross-signing keys
users = {r[0] for r in updates}
Expand Down Expand Up @@ -365,6 +371,17 @@ async def get_device_updates_by_remote(
# and remove the length budgeting above.
results.append(("org.matrix.signing_key_update", result))

if issue_8631_logger.isEnabledFor(logging.DEBUG):
for (user_id, edu) in results:
issue_8631_logger.debug(
"device update to %s for %s from %s to %s: %s",
destination,
user_id,
from_stream_id,
last_processed_stream_id,
edu,
)

return last_processed_stream_id, results

def _get_device_updates_by_remote_txn(
Expand Down

0 comments on commit 662dccd

Please sign in to comment.