This repository has been archived by the owner on Apr 26, 2024. It is now read-only.

Add DB bg update to cleanup extremities. #5278

Merged (8 commits) on May 30, 2019
changelog.d/5278.bugfix: 1 addition, 0 deletions
@@ -0,0 +1 @@
+Fix bug where we leaked extremities when we soft failed events, leading to performance degradation.
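
The background update that actually performs the cleanup is added in the new synapse/storage/events_bg_updates.py, which this diff only imports, so its query is not shown here. As a rough, hypothetical sketch of just the deletion step (the helper name, the selection logic and the use of the event_forward_extremities table are assumptions, not the real implementation), such an update could drop a batch of leaked extremity rows with the storage helper whose return value is changed below:

# Hypothetical sketch only, not the code in events_bg_updates.py: delete a
# batch of leaked forward extremities and return how many rows were removed,
# relying on _simple_delete_many_txn now returning txn.rowcount (see _base.py).
def _delete_leaked_extremities_txn(self, txn, stale_event_ids):
    return self._simple_delete_many_txn(
        txn,
        table="event_forward_extremities",
        column="event_id",
        iterable=stale_event_ids,
        keyvalues={},
    )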
synapse/storage/__init__.py: 2 additions, 0 deletions
@@ -36,6 +36,7 @@
 from .event_federation import EventFederationStore
 from .event_push_actions import EventPushActionsStore
 from .events import EventsStore
+from .events_bg_updates import EventsBackgroundUpdatesStore
 from .filtering import FilteringStore
 from .group_server import GroupServerStore
 from .keys import KeyStore
@@ -66,6 +67,7 @@
 
 
 class DataStore(
+    EventsBackgroundUpdatesStore,
     RoomMemberStore,
     RoomStore,
     RegistrationStore,
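
Adding EventsBackgroundUpdatesStore to DataStore's bases is enough to wire in whatever updates it registers, because the store classes are cooperative mixins: each __init__ chains to super().__init__(), so constructing DataStore runs every mixin's registration code. A self-contained toy version of that pattern (the class and update names below are made up, not Synapse code):

# Toy illustration of the mixin composition used by DataStore; none of these
# classes or update names are real Synapse code.
class BaseStoreSketch:
    def __init__(self):
        self.background_updates = []

class EventsBgUpdatesSketch(BaseStoreSketch):
    def __init__(self):
        super().__init__()
        self.background_updates.append("cleanup_extremities")  # hypothetical name

class RoomStoreSketch(BaseStoreSketch):
    def __init__(self):
        super().__init__()
        self.background_updates.append("room_stats")  # hypothetical name

class DataStoreSketch(EventsBgUpdatesSketch, RoomStoreSketch):
    pass

print(DataStoreSketch().background_updates)  # ['room_stats', 'cleanup_extremities']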
synapse/storage/_base.py: 9 additions, 3 deletions
@@ -1261,7 +1261,8 @@ def _simple_delete_txn(txn, table, keyvalues):
             " AND ".join("%s = ?" % (k,) for k in keyvalues),
         )
 
-        return txn.execute(sql, list(keyvalues.values()))
+        txn.execute(sql, list(keyvalues.values()))
+        return txn.rowcount
 
     def _simple_delete_many(self, table, column, iterable, keyvalues, desc):
         return self.runInteraction(
@@ -1280,9 +1281,12 @@ def _simple_delete_many_txn(txn, table, column, iterable, keyvalues):
             column : column name to test for inclusion against `iterable`
             iterable : list
             keyvalues : dict of column names and values to select the rows with
+
+        Returns:
+            int: Number rows deleted
         """
         if not iterable:
-            return
+            return 0
 
         sql = "DELETE FROM %s" % table
 
@@ -1297,7 +1301,9 @@ def _simple_delete_many_txn(txn, table, column, iterable, keyvalues):
 
         if clauses:
            sql = "%s WHERE %s" % (sql, " AND ".join(clauses))
-        return txn.execute(sql, values)
+        txn.execute(sql, values)
+
+        return txn.rowcount
 
     def _get_cache_dict(
         self, db_conn, table, entity_column, stream_column, max_value, limit=100000
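
The _base.py changes above stop returning the cursor from txn.execute() and instead return txn.rowcount, i.e. the number of rows the DELETE affected, which callers such as a cleanup background update can use to tell whether anything was actually removed. A standalone illustration of the same DB-API behaviour using sqlite3 (the table and data are invented for the example):

# Standalone demo of DB-API rowcount after a DELETE (invented table and data).
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE event_forward_extremities (event_id TEXT, room_id TEXT)")
conn.executemany(
    "INSERT INTO event_forward_extremities VALUES (?, ?)",
    [("$a", "!room:example.org"), ("$b", "!room:example.org"), ("$c", "!other:example.org")],
)

cur = conn.execute(
    "DELETE FROM event_forward_extremities WHERE room_id = ?", ("!room:example.org",)
)
print(cur.rowcount)  # 2: the number of deleted rows, like the new return value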
synapse/storage/events.py: 2 additions, 178 deletions
@@ -1,6 +1,7 @@
 # -*- coding: utf-8 -*-
 # Copyright 2014-2016 OpenMarket Ltd
-# Copyright 2018 New Vector Ltd
+# Copyright 2018-2019 New Vector Ltd
+# Copyright 2019 The Matrix.org Foundation C.I.C.
 #
 # Licensed under the Apache License, Version 2.0 (the "License");
 # you may not use this file except in compliance with the License.
@@ -219,41 +220,11 @@ class EventsStore(
     EventsWorkerStore,
     BackgroundUpdateStore,
 ):
-    EVENT_ORIGIN_SERVER_TS_NAME = "event_origin_server_ts"
-    EVENT_FIELDS_SENDER_URL_UPDATE_NAME = "event_fields_sender_url"
-
     def __init__(self, db_conn, hs):
         super(EventsStore, self).__init__(db_conn, hs)
-        self.register_background_update_handler(
-            self.EVENT_ORIGIN_SERVER_TS_NAME, self._background_reindex_origin_server_ts
-        )
-        self.register_background_update_handler(
-            self.EVENT_FIELDS_SENDER_URL_UPDATE_NAME,
-            self._background_reindex_fields_sender,
-        )
-
-        self.register_background_index_update(
-            "event_contains_url_index",
-            index_name="event_contains_url_index",
-            table="events",
-            columns=["room_id", "topological_ordering", "stream_ordering"],
-            where_clause="contains_url = true AND outlier = false",
-        )
-
-        # an event_id index on event_search is useful for the purge_history
-        # api. Plus it means we get to enforce some integrity with a UNIQUE
-        # clause
-        self.register_background_index_update(
-            "event_search_event_id_idx",
-            index_name="event_search_event_id_idx",
-            table="event_search",
-            columns=["event_id"],
-            unique=True,
-            psql_only=True,
-        )
 
         self._event_persist_queue = _EventPeristenceQueue()
 
         self._state_resolution_handler = hs.get_state_resolution_handler()
 
     @defer.inlineCallbacks
@@ -1579,153 +1550,6 @@ def _count(txn):
         ret = yield self.runInteraction("count_daily_active_rooms", _count)
         defer.returnValue(ret)
 
-    @defer.inlineCallbacks
-    def _background_reindex_fields_sender(self, progress, batch_size):
-        target_min_stream_id = progress["target_min_stream_id_inclusive"]
-        max_stream_id = progress["max_stream_id_exclusive"]
-        rows_inserted = progress.get("rows_inserted", 0)
-
-        INSERT_CLUMP_SIZE = 1000
-
-        def reindex_txn(txn):
-            sql = (
-                "SELECT stream_ordering, event_id, json FROM events"
-                " INNER JOIN event_json USING (event_id)"
-                " WHERE ? <= stream_ordering AND stream_ordering < ?"
-                " ORDER BY stream_ordering DESC"
-                " LIMIT ?"
-            )
-
-            txn.execute(sql, (target_min_stream_id, max_stream_id, batch_size))
-
-            rows = txn.fetchall()
-            if not rows:
-                return 0
-
-            min_stream_id = rows[-1][0]
-
-            update_rows = []
-            for row in rows:
-                try:
-                    event_id = row[1]
-                    event_json = json.loads(row[2])
-                    sender = event_json["sender"]
-                    content = event_json["content"]
-
-                    contains_url = "url" in content
-                    if contains_url:
-                        contains_url &= isinstance(content["url"], text_type)
-                except (KeyError, AttributeError):
-                    # If the event is missing a necessary field then
-                    # skip over it.
-                    continue
-
-                update_rows.append((sender, contains_url, event_id))
-
-            sql = "UPDATE events SET sender = ?, contains_url = ? WHERE event_id = ?"
-
-            for index in range(0, len(update_rows), INSERT_CLUMP_SIZE):
-                clump = update_rows[index : index + INSERT_CLUMP_SIZE]
-                txn.executemany(sql, clump)
-
-            progress = {
-                "target_min_stream_id_inclusive": target_min_stream_id,
-                "max_stream_id_exclusive": min_stream_id,
-                "rows_inserted": rows_inserted + len(rows),
-            }
-
-            self._background_update_progress_txn(
-                txn, self.EVENT_FIELDS_SENDER_URL_UPDATE_NAME, progress
-            )
-
-            return len(rows)
-
-        result = yield self.runInteraction(
-            self.EVENT_FIELDS_SENDER_URL_UPDATE_NAME, reindex_txn
-        )
-
-        if not result:
-            yield self._end_background_update(self.EVENT_FIELDS_SENDER_URL_UPDATE_NAME)
-
-        defer.returnValue(result)
-
-    @defer.inlineCallbacks
-    def _background_reindex_origin_server_ts(self, progress, batch_size):
-        target_min_stream_id = progress["target_min_stream_id_inclusive"]
-        max_stream_id = progress["max_stream_id_exclusive"]
-        rows_inserted = progress.get("rows_inserted", 0)
-
-        INSERT_CLUMP_SIZE = 1000
-
-        def reindex_search_txn(txn):
-            sql = (
-                "SELECT stream_ordering, event_id FROM events"
-                " WHERE ? <= stream_ordering AND stream_ordering < ?"
-                " ORDER BY stream_ordering DESC"
-                " LIMIT ?"
-            )
-
-            txn.execute(sql, (target_min_stream_id, max_stream_id, batch_size))
-
-            rows = txn.fetchall()
-            if not rows:
-                return 0
-
-            min_stream_id = rows[-1][0]
-            event_ids = [row[1] for row in rows]
-
-            rows_to_update = []
-
-            chunks = [event_ids[i : i + 100] for i in range(0, len(event_ids), 100)]
-            for chunk in chunks:
-                ev_rows = self._simple_select_many_txn(
-                    txn,
-                    table="event_json",
-                    column="event_id",
-                    iterable=chunk,
-                    retcols=["event_id", "json"],
-                    keyvalues={},
-                )
-
-                for row in ev_rows:
-                    event_id = row["event_id"]
-                    event_json = json.loads(row["json"])
-                    try:
-                        origin_server_ts = event_json["origin_server_ts"]
-                    except (KeyError, AttributeError):
-                        # If the event is missing a necessary field then
-                        # skip over it.
-                        continue
-
-                    rows_to_update.append((origin_server_ts, event_id))
-
-            sql = "UPDATE events SET origin_server_ts = ? WHERE event_id = ?"
-
-            for index in range(0, len(rows_to_update), INSERT_CLUMP_SIZE):
-                clump = rows_to_update[index : index + INSERT_CLUMP_SIZE]
-                txn.executemany(sql, clump)
-
-            progress = {
-                "target_min_stream_id_inclusive": target_min_stream_id,
-                "max_stream_id_exclusive": min_stream_id,
-                "rows_inserted": rows_inserted + len(rows_to_update),
-            }
-
-            self._background_update_progress_txn(
-                txn, self.EVENT_ORIGIN_SERVER_TS_NAME, progress
-            )
-
-            return len(rows_to_update)
-
-        result = yield self.runInteraction(
-            self.EVENT_ORIGIN_SERVER_TS_NAME, reindex_search_txn
-        )
-
-        if not result:
-            yield self._end_background_update(self.EVENT_ORIGIN_SERVER_TS_NAME)
-
-        defer.returnValue(result)
-
     def get_current_backfill_token(self):
         """The current minimum token that backfilled events have reached"""
         return -self._backfill_id_gen.get_current_token()
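
The two handlers removed above (they move to events_bg_updates.py along with the registration code) show the general shape of a Synapse background update: process at most batch_size rows in one transaction, record progress, return the number of rows handled, and end the update once a batch comes back empty. A self-contained sketch of that loop outside Twisted, using sqlite3 and an invented table; the names and query are illustrative only:

# Self-contained sketch of the batch-and-progress pattern used by the handlers
# above; the table, column and update names are invented for the example.
import sqlite3

def cleanup_batch(conn, progress, batch_size):
    """Handle one batch; return the number of rows processed (0 = done)."""
    max_id = progress.get("max_id_exclusive", 2 ** 63 - 1)  # start from the top
    rows = conn.execute(
        "SELECT id FROM stale_extremities WHERE id < ? ORDER BY id DESC LIMIT ?",
        (max_id, batch_size),
    ).fetchall()
    if not rows:
        return 0
    ids = [r[0] for r in rows]
    conn.executemany("DELETE FROM stale_extremities WHERE id = ?", [(i,) for i in ids])
    progress["max_id_exclusive"] = ids[-1]  # resume point, like _background_update_progress_txn
    return len(rows)

conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE stale_extremities (id INTEGER PRIMARY KEY)")
conn.executemany("INSERT INTO stale_extremities VALUES (?)", [(i,) for i in range(10)])

progress = {}
while cleanup_batch(conn, progress, batch_size=4):
    conn.commit()  # in Synapse each batch runs in its own runInteraction
print(conn.execute("SELECT count(*) FROM stale_extremities").fetchone()[0])  # 0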