Skip to content
This repository has been archived by the owner on Apr 26, 2024. It is now read-only.

Fix censoring redactions performance #6141

Merged
merged 4 commits into from
Oct 2, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions changelog.d/6141.bugfix
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Fix bad performance of censoring redactions background task.
47 changes: 29 additions & 18 deletions synapse/storage/events.py
Original file line number Diff line number Diff line change
Expand Up @@ -1389,6 +1389,18 @@ def event_dict(event):
],
)

for event, _ in events_and_contexts:
if not event.internal_metadata.is_redacted():
# If we're persisting an unredacted event we go and ensure
# that we mark any redactions that reference this event as
# requiring censoring.
self._simple_update_txn(
txn,
table="redactions",
keyvalues={"redacts": event.event_id},
updatevalues={"have_censored": False},
)

def _store_rejected_events_txn(self, txn, events_and_contexts):
"""Add rows to the 'rejections' table for received events which were
rejected
Expand Down Expand Up @@ -1552,9 +1564,15 @@ def prefill():
def _store_redaction(self, txn, event):
# invalidate the cache for the redacted event
txn.call_after(self._invalidate_get_event_cache, event.redacts)
txn.execute(
"INSERT INTO redactions (event_id, redacts) VALUES (?,?)",
(event.event_id, event.redacts),

self._simple_insert_txn(
txn,
table="redactions",
values={
"event_id": event.event_id,
"redacts": event.redacts,
"received_ts": self._clock.time_msec(),
},
)

@defer.inlineCallbacks
Expand All @@ -1571,36 +1589,29 @@ def _censor_redactions(self):
if self.hs.config.redaction_retention_period is None:
return

max_pos = yield self.find_first_stream_ordering_after_ts(
self._clock.time_msec() - self.hs.config.redaction_retention_period
)
before_ts = self._clock.time_msec() - self.hs.config.redaction_retention_period

# We fetch all redactions that:
# 1. point to an event we have,
# 2. has a stream ordering from before the cut off, and
# 2. has a received_ts from before the cut off, and
# 3. we haven't yet censored.
#
# This is limited to 100 events to ensure that we don't try and do too
# much at once. We'll get called again so this should eventually catch
# up.
#
# We use the range [-max_pos, max_pos] to handle backfilled events,
# which are given negative stream ordering.
sql = """
SELECT redact_event.event_id, redacts FROM redactions
INNER JOIN events AS redact_event USING (event_id)
INNER JOIN events AS original_event ON (
redact_event.room_id = original_event.room_id
AND redacts = original_event.event_id
SELECT redactions.event_id, redacts FROM redactions
LEFT JOIN events AS original_event ON (
redacts = original_event.event_id
)
WHERE NOT have_censored
AND ? <= redact_event.stream_ordering AND redact_event.stream_ordering <= ?
ORDER BY redact_event.stream_ordering ASC
AND redactions.received_ts <= ?
ORDER BY redactions.received_ts ASC
LIMIT ?
"""

rows = yield self._execute(
"_censor_redactions_fetch", None, sql, -max_pos, max_pos, 100
"_censor_redactions_fetch", None, sql, before_ts, 100
)

updates = []
Expand Down
61 changes: 61 additions & 0 deletions synapse/storage/events_bg_updates.py
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,10 @@ def __init__(self, db_conn, hs):
self.DELETE_SOFT_FAILED_EXTREMITIES, self._cleanup_extremities_bg_update
)

self.register_background_update_handler(
"redactions_received_ts", self._redactions_received_ts
)

@defer.inlineCallbacks
def _background_reindex_fields_sender(self, progress, batch_size):
target_min_stream_id = progress["target_min_stream_id_inclusive"]
Expand Down Expand Up @@ -397,3 +401,60 @@ def _drop_table_txn(txn):
)

return num_handled

@defer.inlineCallbacks
def _redactions_received_ts(self, progress, batch_size):
"""Handles filling out the `received_ts` column in redactions.
"""
last_event_id = progress.get("last_event_id", "")

def _redactions_received_ts_txn(txn):
# Fetch the set of event IDs that we want to update
sql = """
SELECT event_id FROM redactions
WHERE event_id > ?
ORDER BY event_id ASC
LIMIT ?
Comment on lines +414 to +417
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You might want to also exclude the redactions for which received_ts is not NULL, so that you don't process redactions that we received between two calls to this function.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah, could do! I don't think it makes much of a difference (it'll do the right thing anyway), so I'm probably just going to leave it as is.

"""

txn.execute(sql, (last_event_id, batch_size))

rows = txn.fetchall()
if not rows:
return 0

upper_event_id, = rows[-1]

# Update the redactions with the received_ts.
#
# Note: Not all events have an associated received_ts, so we
# fallback to using origin_server_ts. If we for some reason don't
# have an origin_server_ts, lets just use the current timestamp.
#
# We don't want to leave it null, as then we'll never try and
# censor those redactions.
sql = """
UPDATE redactions
SET received_ts = (
SELECT COALESCE(received_ts, origin_server_ts, ?) FROM events
WHERE events.event_id = redactions.event_id
)
WHERE ? <= event_id AND event_id <= ?
"""

txn.execute(sql, (self._clock.time_msec(), last_event_id, upper_event_id))

self._background_update_progress_txn(
txn, "redactions_received_ts", {"last_event_id": upper_event_id}
)

return len(rows)

count = yield self.runInteraction(
"_redactions_received_ts", _redactions_received_ts_txn
)

if not count:
yield self._end_background_update("redactions_received_ts")

return count
20 changes: 20 additions & 0 deletions synapse/storage/schema/delta/56/redaction_censor2.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
/* Copyright 2019 The Matrix.org Foundation C.I.C.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

ALTER TABLE redactions ADD COLUMN received_ts BIGINT;
CREATE INDEX redactions_have_censored_ts ON redactions(received_ts) WHERE not have_censored;

INSERT INTO background_updates (update_name, progress_json) VALUES
('redactions_received_ts', '{}');