
Feature: Purge local room history. #911

Merged: 9 commits, Jul 7, 2016
2 changes: 1 addition & 1 deletion synapse/handlers/federation.py
@@ -1413,7 +1413,7 @@ def do_auth(self, origin, event, context, auth_events):
local_view = dict(auth_events)
remote_view = dict(auth_events)
remote_view.update({
(d.type, d.state_key): d for d in different_events
(d.type, d.state_key): d for d in different_events if d
})

new_state, prev_state = self.state_handler.resolve_events(
77 changes: 46 additions & 31 deletions synapse/handlers/message.py
@@ -26,7 +26,7 @@
UserID, RoomAlias, RoomStreamToken, StreamToken, get_domain_from_id
)
from synapse.util import unwrapFirstError
from synapse.util.async import concurrently_execute, run_on_reactor
from synapse.util.async import concurrently_execute, run_on_reactor, ReadWriteLock
from synapse.util.caches.snapshot_cache import SnapshotCache
from synapse.util.logcontext import preserve_fn
from synapse.visibility import filter_events_for_client
@@ -50,6 +50,20 @@ def __init__(self, hs):
self.validator = EventValidator()
self.snapshot_cache = SnapshotCache()

self.pagination_lock = ReadWriteLock()

@defer.inlineCallbacks
def purge_history(self, room_id, event_id):
event = yield self.store.get_event(event_id)

if event.room_id != room_id:
raise SynapseError(400, "Event is for wrong room.")

depth = event.depth

with (yield self.pagination_lock.write(room_id)):
yield self.store.delete_old_state(room_id, depth)

@defer.inlineCallbacks
def get_messages(self, requester, room_id=None, pagin_config=None,
as_client_event=True):
@@ -85,42 +99,43 @@ def get_messages(self, requester, room_id=None, pagin_config=None,

source_config = pagin_config.get_source_config("room")

membership, member_event_id = yield self._check_in_room_or_world_readable(
room_id, user_id
)
with (yield self.pagination_lock.read(room_id)):
membership, member_event_id = yield self._check_in_room_or_world_readable(
room_id, user_id
)

if source_config.direction == 'b':
# if we're going backwards, we might need to backfill. This
# requires that we have a topo token.
if room_token.topological:
max_topo = room_token.topological
else:
max_topo = yield self.store.get_max_topological_token_for_stream_and_room(
room_id, room_token.stream
)
if source_config.direction == 'b':
# if we're going backwards, we might need to backfill. This
# requires that we have a topo token.
if room_token.topological:
max_topo = room_token.topological
else:
max_topo = yield self.store.get_max_topological_token(
room_id, room_token.stream
)

if membership == Membership.LEAVE:
# If they have left the room then clamp the token to be before
# they left the room, to save the effort of loading from the
# database.
leave_token = yield self.store.get_topological_token_for_event(
member_event_id
)
leave_token = RoomStreamToken.parse(leave_token)
if leave_token.topological < max_topo:
source_config.from_key = str(leave_token)

if membership == Membership.LEAVE:
# If they have left the room then clamp the token to be before
# they left the room, to save the effort of loading from the
# database.
leave_token = yield self.store.get_topological_token_for_event(
member_event_id
yield self.hs.get_handlers().federation_handler.maybe_backfill(
room_id, max_topo
)
leave_token = RoomStreamToken.parse(leave_token)
if leave_token.topological < max_topo:
source_config.from_key = str(leave_token)

yield self.hs.get_handlers().federation_handler.maybe_backfill(
room_id, max_topo
events, next_key = yield data_source.get_pagination_rows(
requester.user, source_config, room_id
)

events, next_key = yield data_source.get_pagination_rows(
requester.user, source_config, room_id
)

next_token = pagin_config.from_token.copy_and_replace(
"room_key", next_key
)
next_token = pagin_config.from_token.copy_and_replace(
"room_key", next_key
)

if not events:
defer.returnValue({
19 changes: 19 additions & 0 deletions synapse/rest/client/v1/admin.py
@@ -77,6 +77,24 @@ def on_POST(self, request):
defer.returnValue((200, ret))


class PurgeHistoryRestServlet(ClientV1RestServlet):
PATTERNS = client_path_patterns(
"/admin/purge_history/(?P<room_id>[^/]*)/(?P<event_id>[^/]*)"
)

@defer.inlineCallbacks
def on_POST(self, request, room_id, event_id):
requester = yield self.auth.get_user_by_req(request)
is_admin = yield self.auth.is_server_admin(requester.user)

if not is_admin:
raise AuthError(403, "You are not a server admin")

yield self.handlers.message_handler.purge_history(room_id, event_id)

defer.returnValue((200, {}))
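
A usage sketch for the new admin API, illustrative rather than part of the change: it assumes the usual /_matrix/client/api/v1 prefix added by client_path_patterns, an access token belonging to a server admin, and the third-party requests library; the homeserver URL and token values are placeholders.

import requests

HOMESERVER = "https://localhost:8008"  # placeholder homeserver URL
ADMIN_TOKEN = "ADMIN_ACCESS_TOKEN"     # placeholder admin access token

def purge_room_history(room_id, event_id):
    # Ask the server to purge everything in room_id older than event_id.
    # Room and event IDs may need URL-encoding depending on their contents.
    url = "%s/_matrix/client/api/v1/admin/purge_history/%s/%s" % (
        HOMESERVER, room_id, event_id,
    )
    resp = requests.post(url, params={"access_token": ADMIN_TOKEN}, json={})
    resp.raise_for_status()
    return resp.json()  # the servlet returns an empty dict on success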


class DeactivateAccountRestServlet(ClientV1RestServlet):
PATTERNS = client_path_patterns("/admin/deactivate/(?P<target_user_id>[^/]*)")

@@ -106,3 +124,4 @@ def register_servlets(hs, http_server):
WhoisRestServlet(hs).register(http_server)
PurgeMediaCacheRestServlet(hs).register(http_server)
DeactivateAccountRestServlet(hs).register(http_server)
PurgeHistoryRestServlet(hs).register(http_server)
145 changes: 145 additions & 0 deletions synapse/storage/events.py
@@ -1281,6 +1281,151 @@ def get_all_new_events_txn(txn):
)
return self.runInteraction("get_all_new_events", get_all_new_events_txn)

def delete_old_state(self, room_id, topological_ordering):
return self.runInteraction(
"delete_old_state",
self._delete_old_state_txn, room_id, topological_ordering
)

def _delete_old_state_txn(self, txn, room_id, topological_ordering):
"""Deletes old room state
"""

# Tables that should be pruned:
# event_auth
# event_backward_extremities
# event_content_hashes
# event_destinations
# event_edge_hashes
# event_edges
# event_forward_extremities
# event_json
# event_push_actions
# event_reference_hashes
# event_search
# event_signatures
# event_to_state_groups
# events
# rejections
# room_depth
# state_groups
# state_groups_state

# First ensure that we're not about to delete all the forward extremities
txn.execute(
"SELECT e.event_id, e.depth FROM events as e "
"INNER JOIN event_forward_extremities as f "
"ON e.event_id = f.event_id "
"AND e.room_id = f.room_id "
"WHERE f.room_id = ?",
(room_id,)
)
rows = txn.fetchall()
# depth is the second selected column; max(row[0]) would compare event IDs
max_depth = max(row[1] for row in rows)

if max_depth <= topological_ordering:
raise Exception("topological_ordering is greater than forward extremities")
Review comment (Contributor):
Maybe 400 here rather than 500?
Maybe comment on why this is bad.
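
A minimal sketch of what that suggestion might look like, assuming SynapseError from synapse.api.errors were imported into this module; illustrative only, not the merged code.

from synapse.api.errors import SynapseError

if max_depth <= topological_ordering:
    # Purging up to or past the most recent forward extremity would strip the
    # room of its latest events, so refuse the request. SynapseError surfaces
    # as a 400 to the caller instead of the generic 500 a bare Exception gives.
    raise SynapseError(
        400,
        "topological_ordering is greater than forward extremities",
    )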


txn.execute(
"SELECT event_id, state_key FROM events"
" LEFT JOIN state_events USING (room_id, event_id)"
" WHERE room_id = ? AND topological_ordering < ?",
(room_id, topological_ordering,)
)
event_rows = txn.fetchall()

# We calculate the new entries for the backward extremities by finding
# all events that point to events that are to be purged
txn.execute(
"SELECT e.event_id FROM events as e"
" INNER JOIN event_edges as ed ON e.event_id = ed.prev_event_id"
" INNER JOIN events as e2 ON e2.event_id = ed.event_id"
" WHERE e.room_id = ? AND e.topological_ordering < ?"
" AND e2.topological_ordering >= ?",
(room_id, topological_ordering, topological_ordering)
)
new_backwards_extrems = txn.fetchall()

# Get all state groups that are only referenced by events that are
# to be deleted.
txn.execute(
"SELECT state_group FROM event_to_state_groups"
" INNER JOIN events USING (event_id)"
" WHERE state_group IN ("
" SELECT DISTINCT state_group FROM events"
" INNER JOIN event_to_state_groups USING (event_id)"
" WHERE room_id = ? AND topological_ordering < ?"
" )"
" GROUP BY state_group HAVING MAX(topological_ordering) < ?",
(room_id, topological_ordering, topological_ordering)
)
state_rows = txn.fetchall()
txn.executemany(
"DELETE FROM state_groups_state WHERE state_group = ?",
state_rows
)
txn.executemany(
"DELETE FROM state_groups WHERE id = ?",
state_rows
)
# Unlink all purged events from their state groups
txn.executemany(
"DELETE FROM event_to_state_groups WHERE event_id = ?",
[(event_id,) for event_id, _ in event_rows]
)

txn.execute(
"UPDATE room_depth SET min_depth = ? WHERE room_id = ?",
(topological_ordering, room_id,)
)

# Delete all remote non-state events
to_delete = [
(event_id,) for event_id, state_key in event_rows
if state_key is None and not self.hs.is_mine_id(event_id)
]
for table in (
"events",
"event_json",
"event_auth",
"event_content_hashes",
"event_destinations",
"event_edge_hashes",
"event_edges",
"event_forward_extremities",
"event_push_actions",
"event_reference_hashes",
"event_search",
"event_signatures",
"rejections",
"event_backward_extremities",
):
txn.executemany(
"DELETE FROM %s WHERE event_id = ?" % (table,),
to_delete
)

# Update backward extremities
txn.executemany(
"INSERT INTO event_backward_extremities (room_id, event_id)"
" VALUES (?, ?)",
[(room_id, event_id) for event_id, in new_backwards_extrems]
)

txn.executemany(
"DELETE FROM events WHERE event_id = ?",
to_delete
)
# Mark all state and own events as outliers
txn.executemany(
"UPDATE events SET outlier = ?"
" WHERE event_id = ?",
[
(True, event_id,) for event_id, state_key in event_rows
if state_key is not None or self.hs.is_mine_id(event_id)
]
)


AllNewEventsResult = namedtuple("AllNewEventsResult", [
"new_forward_events", "new_backfill_events",
4 changes: 2 additions & 2 deletions synapse/storage/stream.py
@@ -510,13 +510,13 @@ def get_topological_token_for_event(self, event_id):
row["topological_ordering"], row["stream_ordering"],)
)

def get_max_topological_token_for_stream_and_room(self, room_id, stream_key):
def get_max_topological_token(self, room_id, stream_key):
sql = (
"SELECT max(topological_ordering) FROM events"
" WHERE room_id = ? AND stream_ordering < ?"
)
return self._execute(
"get_max_topological_token_for_stream_and_room", None,
"get_max_topological_token", None,
sql, room_id, stream_key,
).addCallback(
lambda r: r[0][0] if r else 0
82 changes: 82 additions & 0 deletions synapse/util/async.py
@@ -194,3 +194,85 @@ def _ctx_manager():
self.key_to_defer.pop(key, None)

defer.returnValue(_ctx_manager())


class ReadWriteLock(object):
"""A deferred style read write lock.

Example:

with (yield read_write_lock.read("test_key")):
# do some work
"""

# IMPLEMENTATION NOTES
#
# We track the most recent queued reader and writer deferreds (which get
# resolved when they release the lock).
#
# Read: We know it's safe to acquire a read lock when the latest writer has
# been resolved. The new reader is appended to the set of latest readers.
#
# Write: We know it's safe to acquire the write lock when both the latest
# writer and readers have been resolved. The new writer replaces the latest
# writer.

def __init__(self):
# Latest readers queued
self.key_to_current_readers = {}

# Latest writer queued
self.key_to_current_writer = {}

@defer.inlineCallbacks
def read(self, key):
new_defer = defer.Deferred()

curr_readers = self.key_to_current_readers.setdefault(key, set())
curr_writer = self.key_to_current_writer.get(key, None)

curr_readers.add(new_defer)

# We wait for the latest writer to finish writing. We can safely ignore
# any existing readers... as they're readers.
yield curr_writer

@contextmanager
def _ctx_manager():
try:
yield
finally:
new_defer.callback(None)
self.key_to_current_readers.get(key, set()).discard(new_defer)

defer.returnValue(_ctx_manager())

@defer.inlineCallbacks
def write(self, key):
new_defer = defer.Deferred()

curr_readers = self.key_to_current_readers.get(key, set())
curr_writer = self.key_to_current_writer.get(key, None)

# We wait on all latest readers and writer.
to_wait_on = list(curr_readers)
if curr_writer:
to_wait_on.append(curr_writer)

# We can clear the list of current readers since the new writer waits
# for them to finish.
curr_readers.clear()
self.key_to_current_writer[key] = new_defer

yield defer.gatherResults(to_wait_on)

@contextmanager
def _ctx_manager():
try:
yield
finally:
new_defer.callback(None)
if self.key_to_current_writer[key] == new_defer:
self.key_to_current_writer.pop(key)

defer.returnValue(_ctx_manager())
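
A short usage sketch for both sides of the lock; the function names, key, and results list are placeholders, and the real callers are get_messages and purge_history above. Readers for a key may run concurrently, while a writer waits for existing readers and blocks later ones until it releases.

from twisted.internet import defer

lock = ReadWriteLock()

@defer.inlineCallbacks
def read_path(key, results):
    with (yield lock.read(key)):
        # Several readers may hold the lock for `key` at the same time.
        results.append("read")

@defer.inlineCallbacks
def write_path(key, results):
    with (yield lock.write(key)):
        # Runs only once earlier readers and writers for `key` have released;
        # readers that arrive later wait until this block exits.
        results.append("write")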