Skip to content
This repository has been archived by the owner on Apr 26, 2024. It is now read-only.

Fixup pusher pool notifications #8287

Merged
merged 2 commits into from
Sep 9, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions changelog.d/8287.bugfix
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Fix edge case where push could get delayed for a user until a later event was pushed.
2 changes: 1 addition & 1 deletion synapse/handlers/federation.py
Original file line number Diff line number Diff line change
Expand Up @@ -2970,7 +2970,7 @@ async def _notify_persisted_event(
event, event_stream_id, max_stream_id, extra_users=extra_users
)

await self.pusher_pool.on_new_notifications(event_stream_id, max_stream_id)
await self.pusher_pool.on_new_notifications(max_stream_id)

async def _clean_room_for_join(self, room_id: str) -> None:
"""Called to clean up any data in DB for a given room, ready for the
Expand Down
2 changes: 1 addition & 1 deletion synapse/handlers/message.py
Original file line number Diff line number Diff line change
Expand Up @@ -1145,7 +1145,7 @@ def is_inviter_member_event(e):
# If there's an expiry timestamp on the event, schedule its expiry.
self._message_handler.maybe_schedule_expiry(event)

await self.pusher_pool.on_new_notifications(event_stream_id, max_stream_id)
await self.pusher_pool.on_new_notifications(max_stream_id)

def _notify():
try:
Expand Down
2 changes: 1 addition & 1 deletion synapse/push/emailpusher.py
Original file line number Diff line number Diff line change
Expand Up @@ -91,7 +91,7 @@ def on_stop(self):
pass
self.timed_call = None

def on_new_notifications(self, min_stream_ordering, max_stream_ordering):
def on_new_notifications(self, max_stream_ordering):
if self.max_stream_ordering:
self.max_stream_ordering = max(
max_stream_ordering, self.max_stream_ordering
Expand Down
2 changes: 1 addition & 1 deletion synapse/push/httppusher.py
Original file line number Diff line number Diff line change
Expand Up @@ -114,7 +114,7 @@ def on_started(self, should_check_for_notifs):
if should_check_for_notifs:
self._start_processing()

def on_new_notifications(self, min_stream_ordering, max_stream_ordering):
def on_new_notifications(self, max_stream_ordering):
self.max_stream_ordering = max(
max_stream_ordering, self.max_stream_ordering or 0
)
Expand Down
19 changes: 16 additions & 3 deletions synapse/push/pusherpool.py
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,12 @@ def __init__(self, hs: "HomeServer"):
self._pusher_shard_config = hs.config.push.pusher_shard_config
self._instance_name = hs.get_instance_name()

# Record the last stream ID that we were poked about so we can get
# changes since then. We set this to the current max stream ID on
# startup as every individual pusher will have checked for changes on
# startup.
self._last_room_stream_id_seen = self.store.get_room_max_stream_ordering()

# map from user id to app_id:pushkey to pusher
self.pushers = {} # type: Dict[str, Dict[str, Union[HttpPusher, EmailPusher]]]

Expand Down Expand Up @@ -178,20 +184,27 @@ async def remove_pushers_by_access_token(self, user_id, access_tokens):
)
await self.remove_pusher(p["app_id"], p["pushkey"], p["user_name"])

async def on_new_notifications(self, min_stream_id, max_stream_id):
async def on_new_notifications(self, max_stream_id):
if not self.pushers:
# nothing to do here.
return

if max_stream_id < self._last_room_stream_id_seen:
# Nothing to do
return

prev_stream_id = self._last_room_stream_id_seen
self._last_room_stream_id_seen = max_stream_id

try:
users_affected = await self.store.get_push_action_users_in_range(
min_stream_id, max_stream_id
prev_stream_id, max_stream_id
)

for u in users_affected:
if u in self.pushers:
for p in self.pushers[u].values():
p.on_new_notifications(min_stream_id, max_stream_id)
p.on_new_notifications(max_stream_id)

except Exception:
logger.exception("Exception in pusher on_new_notifications")
Expand Down
3 changes: 2 additions & 1 deletion synapse/replication/tcp/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -154,7 +154,8 @@ async def on_rdata(
max_token = self.store.get_room_max_stream_ordering()
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is calculating the same thing it looks like, can we avoid doing it twice?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, but that line is a) cheap and b) about to die in my next PR :)

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

For some reason I keep thinking that this causes database access. 🤦

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

That is not an unreasonable assumption given its in self.store....

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Anyway, I think it looks fine!

self.notifier.on_new_room_event(event, token, max_token, extra_users)

await self.pusher_pool.on_new_notifications(token, token)
max_token = self.store.get_room_max_stream_ordering()
await self.pusher_pool.on_new_notifications(max_token)

# Notify any waiting deferreds. The list is ordered by position so we
# just iterate through the list until we reach a position that is
Expand Down
1 change: 1 addition & 0 deletions tests/handlers/test_typing.py
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,7 @@ def make_homeserver(self, reactor, clock):
"get_user_directory_stream_pos",
"get_current_state_deltas",
"get_device_updates_by_remote",
"get_room_max_stream_ordering",
]
)

Expand Down