Skip to content
This repository has been archived by the owner on Apr 26, 2024. It is now read-only.

Add ephemeral messages support (MSC2228) #6409

Merged
merged 49 commits into from
Dec 3, 2019
Merged
Show file tree
Hide file tree
Changes from 16 commits
Commits
Show all changes
49 commits
Select commit Hold shift + click to select a range
c982380
Add database support for ephemeral messages
babolivier Nov 25, 2019
6ad5e92
Add helper functions to redact messages on expiry
babolivier Nov 25, 2019
2628e19
Plug redaction scheduling in the right places
babolivier Nov 25, 2019
8255b9e
Changelog
babolivier Nov 25, 2019
8563e2b
Lint
babolivier Nov 25, 2019
f2c6018
Actually delete events on expiry
babolivier Nov 26, 2019
01d1876
Rename functions + doc
babolivier Nov 26, 2019
274612a
Lint
babolivier Nov 26, 2019
d424549
Move the field name to a constant
babolivier Nov 26, 2019
052294e
Hide the feature behind a configuration flag
babolivier Nov 26, 2019
dee234f
Don't expire state events
babolivier Nov 26, 2019
344d287
Merge branch 'develop' into babolivier/ephemeral-messages
babolivier Nov 26, 2019
36230a5
Test case and various fixes
babolivier Nov 27, 2019
89765f4
Lint
babolivier Nov 27, 2019
ceab0a8
Add background update
babolivier Nov 27, 2019
26dec61
Lint
babolivier Nov 27, 2019
2ac7869
Update synapse/api/constants.py
babolivier Nov 27, 2019
a4307c6
Incorporate part of the review
babolivier Nov 27, 2019
a6461c0
Change the scheduling flow
babolivier Nov 27, 2019
c33d2b4
Use the same code path for censoring redactions and expired events
babolivier Nov 28, 2019
b766861
Merge branch 'develop' into babolivier/ephemeral-messages
babolivier Nov 28, 2019
9a531be
Lint and docstring
babolivier Nov 28, 2019
964c6da
Lint
babolivier Nov 28, 2019
370d089
Various fixes
babolivier Nov 28, 2019
385f47b
Move the database functions to the worker store
babolivier Nov 28, 2019
e092144
Lint
babolivier Nov 28, 2019
1d1f9f2
Lint again
babolivier Nov 28, 2019
6ca89ba
Revert "Move the database functions to the worker store"
babolivier Nov 29, 2019
0e57c5c
Make the expiry happen on the master process
babolivier Nov 29, 2019
0767898
Incorporate review
babolivier Dec 2, 2019
9e61740
Lint
babolivier Dec 2, 2019
d719bfe
Fix type hint
babolivier Dec 2, 2019
857fbd7
fetchone returns a tuple, not a dict
babolivier Dec 2, 2019
8cd5a67
Merge branch 'develop' into babolivier/ephemeral-messages
babolivier Dec 2, 2019
d9d2483
Update synapse/handlers/message.py
babolivier Dec 3, 2019
b3ae6cb
Incorporate review
babolivier Dec 3, 2019
bc00c73
Lint
babolivier Dec 3, 2019
a18f1f7
Lint
babolivier Dec 3, 2019
5ead39d
Incorporate review
babolivier Dec 3, 2019
442a192
Fix maybe logic
babolivier Dec 3, 2019
a0ec11a
Prevent two tasks from being scheduled at the same time
babolivier Dec 3, 2019
9583c60
Typo
babolivier Dec 3, 2019
d5640c3
Typos
babolivier Dec 3, 2019
1573a5e
Incorporate review
babolivier Dec 3, 2019
44741a0
Merge branch 'develop' into babolivier/ephemeral-messages
babolivier Dec 3, 2019
8d2ee04
Add flakey tests to the black list
babolivier Dec 3, 2019
c1102d6
Moar flakey tests
babolivier Dec 3, 2019
3e007c0
Revert "Moar flakey tests"
babolivier Dec 3, 2019
7648299
Revert "Add flakey tests to the black list"
babolivier Dec 3, 2019
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions changelog.d/6409.feature
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Add ephemeral messages support by partially implementing [MSC2228](https://github.com/matrix-org/matrix-doc/pull/2228).
3 changes: 3 additions & 0 deletions synapse/api/constants.py
Original file line number Diff line number Diff line change
Expand Up @@ -145,3 +145,6 @@ class EventContentFields(object):

# Labels for the event, cf https://github.com/matrix-org/matrix-doc/pull/2326
LABELS = "org.matrix.labels"
# Timestamp to delete the event after
babolivier marked this conversation as resolved.
Show resolved Hide resolved
# cf https://github.com/matrix-org/matrix-doc/pull/2228
SELF_DESTRUCT_AFTER = "m.self_destruct_after"
babolivier marked this conversation as resolved.
Show resolved Hide resolved
2 changes: 2 additions & 0 deletions synapse/config/server.py
Original file line number Diff line number Diff line change
Expand Up @@ -372,6 +372,8 @@ class LimitRemoteRoomsConfig(object):
"cleanup_extremities_with_dummy_events", True
)

self.enable_ephemeral_messages = config.get("enable_ephemeral_messages", False)

def has_tls_listener(self) -> bool:
return any(l["tls"] for l in self.listeners)

Expand Down
16 changes: 15 additions & 1 deletion synapse/handlers/federation.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,12 @@
from twisted.internet import defer

from synapse import event_auth
from synapse.api.constants import EventTypes, Membership, RejectedReason
from synapse.api.constants import (
EventContentFields,
EventTypes,
Membership,
RejectedReason,
)
from synapse.api.errors import (
AuthError,
CodeMessageException,
Expand Down Expand Up @@ -141,6 +146,8 @@ def __init__(self, hs):

self.third_party_event_rules = hs.get_third_party_event_rules()

self._message_handler = hs.get_message_handler()
babolivier marked this conversation as resolved.
Show resolved Hide resolved

@defer.inlineCallbacks
def on_receive_pdu(self, origin, pdu, sent_to_us_directly=False):
""" Process a PDU received via a federation /send/ transaction, or
Expand Down Expand Up @@ -1707,6 +1714,13 @@ def _handle_new_event(
self.store.remove_push_actions_from_staging, event.event_id
)

# If there's an expiry timestamp, schedule the redaction of the event.
expiry_ts = event.content.get(EventContentFields.SELF_DESTRUCT_AFTER)
babolivier marked this conversation as resolved.
Show resolved Hide resolved
if isinstance(expiry_ts, int) and not event.is_state():
yield self._message_handler.schedule_deletion_expired(
event.event_id, expiry_ts
)

return context

@defer.inlineCallbacks
Expand Down
93 changes: 92 additions & 1 deletion synapse/handlers/message.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,13 @@
from twisted.internet.defer import succeed

from synapse import event_auth
from synapse.api.constants import EventTypes, Membership, RelationTypes, UserTypes
from synapse.api.constants import (
EventContentFields,
EventTypes,
Membership,
RelationTypes,
UserTypes,
)
from synapse.api.errors import (
AuthError,
Codes,
Expand Down Expand Up @@ -62,6 +68,12 @@ def __init__(self, hs):
self.storage = hs.get_storage()
self.state_store = self.storage.state
self._event_serializer = hs.get_event_client_serializer()
self._ephemeral_events_enabled = hs.config.enable_ephemeral_messages

run_as_background_process(
"_schedule_deletions_expired_from_db",
self._schedule_deletions_expired_from_db,
)

@defer.inlineCallbacks
def get_room_data(
Expand Down Expand Up @@ -225,6 +237,76 @@ def get_joined_members(self, requester, room_id):
for user_id, profile in iteritems(users_with_profile)
}

@defer.inlineCallbacks
def schedule_deletion_expired(self, event_id, redaction_ts):
babolivier marked this conversation as resolved.
Show resolved Hide resolved
"""Schedule the deletion of an expired event, or if that event should have
expired then delete the event immediately.

Args:
event_id (str): The ID of the event to schedule the deletion of.
redaction_ts (int): The timestamp to delete the event at.
babolivier marked this conversation as resolved.
Show resolved Hide resolved
"""
if not self._ephemeral_events_enabled:
return
babolivier marked this conversation as resolved.
Show resolved Hide resolved

# Save the timestamp at which the event expires so that we can reschedule its
# deletion on startup if the server is stopped before the event is deleted.
yield self.store.insert_event_expiry(event_id, redaction_ts)

now_ms = self.clock.time_msec()
delay = (redaction_ts - now_ms) / 1000

if delay > 0:
babolivier marked this conversation as resolved.
Show resolved Hide resolved
# Figure out how many seconds we need to wait before redacting the event.
babolivier marked this conversation as resolved.
Show resolved Hide resolved
logger.info("Scheduling deletion of event %s in %.3fs", event_id, delay)
self.clock.call_later(delay, self._delete_expired_event, event_id)
babolivier marked this conversation as resolved.
Show resolved Hide resolved
else:
# If the event should have already been redacted, redact it now.
yield self._delete_expired_event(event_id)

@defer.inlineCallbacks
def _schedule_deletions_expired_from_db(self):
"""Load the IDs of the events that have an expiry date from the database (and
their expiry timestamp) and either delete them (if the expiry date is now or in
the past) or schedule their deletion on the timestamp.
"""
if not self._ephemeral_events_enabled:
return

events_to_expire = yield self.store.get_events_to_expire()

for event in events_to_expire:
yield self.schedule_deletion_expired(event["event_id"], event["expiry_ts"])
babolivier marked this conversation as resolved.
Show resolved Hide resolved

@defer.inlineCallbacks
def _delete_expired_event(self, event_id):
"""Retrieve and delete an expired event from the database.

If we don't have the event in the database, log it and delete the expiry date
from the database (so that we don't try to delete it again).

Args:
event_id (str): The ID of the event to retrieve and delete.
"""
if not self._ephemeral_events_enabled:
babolivier marked this conversation as resolved.
Show resolved Hide resolved
return

logger.info("Deleting expired event %s", event_id)

# Try to retrieve the event from the database.
event = yield self.store.get_event(event_id)

if not event:
# If we can't find the event, log a warning and delete the expiry date from
# the database so that we don't try to delete it again in the future.
logger.warning("Can't delete event %s because we don't have it." % event_id)
yield self.store.delete_event_expiry(event)
return
babolivier marked this conversation as resolved.
Show resolved Hide resolved

# Delete the event. This function also deletes the expiry date from the database
# in the same database transaction.
yield self.store.delete_expired_event(event)


# The duration (in ms) after which rooms should be removed
# `_rooms_to_exclude_from_dummy_event_insertion` (with the effect that we will try
Expand Down Expand Up @@ -295,6 +377,8 @@ def __init__(self, hs):
5 * 60 * 1000,
)

self._message_handler = hs.get_message_handler()

@defer.inlineCallbacks
def create_event(
self,
Expand Down Expand Up @@ -736,6 +820,13 @@ def handle_new_client_event(
self.store.remove_push_actions_from_staging, event.event_id
)

# If there's an expiry timestamp, schedule the redaction of the event.
expiry_ts = event.content.get(EventContentFields.SELF_DESTRUCT_AFTER)
if isinstance(expiry_ts, int) and not event.is_state():
yield self._message_handler.schedule_deletion_expired(
event.event_id, expiry_ts
)

@defer.inlineCallbacks
def persist_and_notify_client_event(
self, requester, event, context, ratelimit=True, extra_users=[]
Expand Down
72 changes: 72 additions & 0 deletions synapse/storage/data_stores/main/events.py
Original file line number Diff line number Diff line change
Expand Up @@ -1954,6 +1954,78 @@ def insert_labels_for_event_txn(
],
)

def insert_event_expiry(self, event_id, expiry_ts):
"""Save the expiry timestamp associated with a given event ID.

Args:
event_id (str): The event ID the expiry timestamp is associated with.
expiry_ts (int): The timestamp at which to expire (delete) the event.
"""
return self._simple_insert(
table="event_expiry",
values={"event_id": event_id, "expiry_ts": expiry_ts},
desc="insert_event_expiry",
)

def delete_expired_event(self, event):
"""Delete an event that has expired by replacing its entry in event_json with a
pruned version of its JSON representation, and delete its associated expiry
timestamp.

Args:
event (events.EventBase): The event to delete.
"""
# Prune the event's dict then convert it to JSON.
pruned_json = encode_json(prune_event_dict(event.get_dict()))
babolivier marked this conversation as resolved.
Show resolved Hide resolved

def delete_expired_event_txn(txn):
# Update the event_json table to replace the event's JSON with the pruned
# JSON.
self._simple_update_one_txn(
txn,
table="event_json",
keyvalues={"event_id": event.event_id},
updatevalues={"json": pruned_json},
)

# Delete the expiry timestamp associated with this event from the database.
self._simple_delete_txn(
txn, table="event_expiry", keyvalues={"event_id": event.event_id}
)

# We need to invalidate the event cache entry for this event because we
# changed its content in the database.
self._get_event_cache.invalidate((event.event_id,))

return self.runInteraction("delete_expired_event", delete_expired_event_txn)

def delete_event_expiry(self, event_id):
"""Delete the expiry timestamp associated with an event ID without deleting the
actual event.

Args:
event_id (str): The event ID to delete the associated expiry timestamp of.
"""
return self._simple_delete(
table="event_expiry",
keyvalues={"event_id": event_id},
desc="delete_event_expiry",
)

def get_events_to_expire(self):
babolivier marked this conversation as resolved.
Show resolved Hide resolved
"""Retrieve the IDs of the events we have an expiry timestamp for, along with
said timestamp.

Returns:
babolivier marked this conversation as resolved.
Show resolved Hide resolved
A list of dicts, each containing an event_id and an expiry_ts.
"""
return self._simple_select_list(
table="event_expiry",
keyvalues=None,
retcols=["event_id", "expiry_ts"],
desc="get_events_to_expire",
)


AllNewEventsResult = namedtuple(
"AllNewEventsResult",
Expand Down
64 changes: 64 additions & 0 deletions synapse/storage/data_stores/main/events_bg_updates.py
Original file line number Diff line number Diff line change
Expand Up @@ -90,6 +90,10 @@ def __init__(self, db_conn, hs):
"event_store_labels", self._event_store_labels
)

self.register_background_update_handler(
"event_store_expiry", self._event_store_expiry
)

@defer.inlineCallbacks
def _background_reindex_fields_sender(self, progress, batch_size):
target_min_stream_id = progress["target_min_stream_id_inclusive"]
Expand Down Expand Up @@ -573,3 +577,63 @@ def _event_store_labels_txn(txn):
yield self._end_background_update("event_store_labels")

return num_rows

@defer.inlineCallbacks
def _event_store_expiry(self, progress, batch_size):
babolivier marked this conversation as resolved.
Show resolved Hide resolved
"""Background update handler which will store expiry timestamps for existing
events.
"""
last_event_id = progress.get("last_event_id", "")

def _event_store_labels_txn(txn):
txn.execute(
"""
SELECT event_id, json FROM event_json
LEFT JOIN event_expiry USING (event_id)
WHERE event_id > ? AND expiry_ts IS NULL
ORDER BY event_id LIMIT ?
""",
(last_event_id, batch_size),
)

results = list(txn)

nbrows = 0
last_row_event_id = ""
for (event_id, event_json_raw) in results:
try:
event_json = json.loads(event_json_raw)

expiry_ts = event_json.get(EventContentFields.SELF_DESTRUCT_AFTER)

if isinstance(expiry_ts, int):
self._simple_insert_txn(
txn=txn,
table="event_expiry",
values={"event_id": event_id, "expiry_ts": expiry_ts},
)
except Exception as e:
logger.warning(
"Unable to load event %s (no expiry timestamp will be imported):"
" %s",
event_id,
e,
)

nbrows += 1
last_row_event_id = event_id

self._background_update_progress_txn(
txn, "event_store_expiry", {"last_event_id": last_row_event_id}
)

return nbrows

num_rows = yield self.runInteraction(
desc="event_store_expiry", func=_event_store_labels_txn
)

if not num_rows:
yield self._end_background_update("event_store_expiry")

return num_rows
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
/* Copyright 2019 The Matrix.org Foundation C.I.C.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

CREATE TABLE IF NOT EXISTS event_expiry (
event_id TEXT PRIMARY KEY,
expiry_ts BIGINT NOT NULL
babolivier marked this conversation as resolved.
Show resolved Hide resolved
);

INSERT INTO background_updates (update_name, progress_json) VALUES
('event_store_expiry', '{}');
Loading