Skip to content
This repository has been archived by the owner on Apr 26, 2024. It is now read-only.

Handle pusher being deleted during processing. #5809

Merged
merged 4 commits into from
Aug 13, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions changelog.d/5809.misc
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Handle pusher being deleted during processing rather than logging an exception.
18 changes: 12 additions & 6 deletions synapse/push/emailpusher.py
Original file line number Diff line number Diff line change
Expand Up @@ -234,13 +234,19 @@ def save_last_stream_ordering_and_success(self, last_stream_ordering):
return

self.last_stream_ordering = last_stream_ordering
yield self.store.update_pusher_last_stream_ordering_and_success(
self.app_id,
self.email,
self.user_id,
last_stream_ordering,
self.clock.time_msec(),
pusher_still_exists = (
yield self.store.update_pusher_last_stream_ordering_and_success(
self.app_id,
self.email,
self.user_id,
last_stream_ordering,
self.clock.time_msec(),
)
)
if not pusher_still_exists:
# The pusher has been deleted while we were processing, so
# lets just stop and return.
self.on_stop()
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

isn't this done when the pusher is deleted?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah, I mostly put this in for paranoia to make sure we don't go round the loop again.


def seconds_until(self, ts_msec):
secs = (ts_msec - self.clock.time_msec()) / 1000
Expand Down
27 changes: 20 additions & 7 deletions synapse/push/httppusher.py
Original file line number Diff line number Diff line change
Expand Up @@ -199,13 +199,21 @@ def _unsafe_process(self):
http_push_processed_counter.inc()
self.backoff_delay = HttpPusher.INITIAL_BACKOFF_SEC
self.last_stream_ordering = push_action["stream_ordering"]
yield self.store.update_pusher_last_stream_ordering_and_success(
self.app_id,
self.pushkey,
self.user_id,
self.last_stream_ordering,
self.clock.time_msec(),
pusher_still_exists = (
yield self.store.update_pusher_last_stream_ordering_and_success(
self.app_id,
self.pushkey,
self.user_id,
self.last_stream_ordering,
self.clock.time_msec(),
)
)
if not pusher_still_exists:
# The pusher has been deleted while we were processing, so
# lets just stop and return.
self.on_stop()
return

if self.failing_since:
self.failing_since = None
yield self.store.update_pusher_failing_since(
Expand Down Expand Up @@ -234,12 +242,17 @@ def _unsafe_process(self):
)
self.backoff_delay = HttpPusher.INITIAL_BACKOFF_SEC
self.last_stream_ordering = push_action["stream_ordering"]
yield self.store.update_pusher_last_stream_ordering(
pusher_still_exists = yield self.store.update_pusher_last_stream_ordering(
self.app_id,
self.pushkey,
self.user_id,
self.last_stream_ordering,
)
if not pusher_still_exists:
# The pusher has been deleted while we were processing, so
# lets just stop and return.
self.on_stop()
return

self.failing_since = None
yield self.store.update_pusher_failing_since(
Expand Down
30 changes: 22 additions & 8 deletions synapse/storage/pusher.py
Original file line number Diff line number Diff line change
Expand Up @@ -308,22 +308,36 @@ def update_pusher_last_stream_ordering(
def update_pusher_last_stream_ordering_and_success(
self, app_id, pushkey, user_id, last_stream_ordering, last_success
):
yield self._simple_update_one(
"pushers",
{"app_id": app_id, "pushkey": pushkey, "user_name": user_id},
{
"""Update the last stream ordering position we've processed up to for
the given pusher.

Args:
app_id (str)
pushkey (str)
last_stream_ordering (int)
last_success (int)

Returns:
Deferred[bool]: True if the pusher still exists; False if it has been deleted.
"""
updated = yield self._simple_update(
table="pushers",
keyvalues={"app_id": app_id, "pushkey": pushkey, "user_name": user_id},
updatevalues={
"last_stream_ordering": last_stream_ordering,
"last_success": last_success,
},
desc="update_pusher_last_stream_ordering_and_success",
)

return bool(updated)

@defer.inlineCallbacks
def update_pusher_failing_since(self, app_id, pushkey, user_id, failing_since):
yield self._simple_update_one(
"pushers",
{"app_id": app_id, "pushkey": pushkey, "user_name": user_id},
{"failing_since": failing_since},
yield self._simple_update(
table="pushers",
keyvalues={"app_id": app_id, "pushkey": pushkey, "user_name": user_id},
updatevalues={"failing_since": failing_since},
desc="update_pusher_failing_since",
)

Expand Down