This repository has been archived by the owner on Apr 26, 2024. It is now read-only.

Commit 0b68a54
Merge branch 'babolivier/port_db_background_updates' into babolivier/port_db_ci
babolivier committed Oct 7, 2019
2 parents 9fb4288 + 1eb0600
Showing 26 changed files with 956 additions and 680 deletions.
1 change: 1 addition & 0 deletions changelog.d/1172.misc
@@ -0,0 +1 @@
+Update `user_filters` table to have a unique index, and non-null columns. Thanks to @pik for contributing this.

1 change: 1 addition & 0 deletions changelog.d/2142.feature
@@ -0,0 +1 @@
+Improve quality of thumbnails for 1-bit/8-bit color palette images.

1 change: 1 addition & 0 deletions changelog.d/6147.bugfix
@@ -0,0 +1 @@
+Don't 500 when trying to exchange a revoked 3PID invite.

1 change: 1 addition & 0 deletions changelog.d/6159.misc
@@ -0,0 +1 @@
+Add more caching to `_get_joined_users_from_context` DB query.

1 change: 1 addition & 0 deletions changelog.d/6160.misc
@@ -0,0 +1 @@
+Add some metrics on the federation sender.

1 change: 1 addition & 0 deletions changelog.d/6167.misc
@@ -0,0 +1 @@
+Add some logging to the rooms stats updates, to try to track down a flaky test.

1 change: 1 addition & 0 deletions changelog.d/6175.misc
@@ -0,0 +1 @@
+Update `user_filters` table to have a unique index, and non-null columns. Thanks to @pik for contributing this

1 change: 1 addition & 0 deletions changelog.d/6178.bugfix
@@ -0,0 +1 @@
+Make the `synapse_port_db` script create the right indexes on a new PostgreSQL database.
11 changes: 6 additions & 5 deletions synapse/federation/sender/__init__.py
@@ -38,7 +38,7 @@
     events_processed_counter,
 )
 from synapse.metrics.background_process_metrics import run_as_background_process
-from synapse.util.metrics import measure_func
+from synapse.util.metrics import Measure, measure_func

 logger = logging.getLogger(__name__)

@@ -183,8 +183,8 @@ def handle_event(event):
                 # Otherwise if the last member on a server in a room is
                 # banned then it won't receive the event because it won't
                 # be in the room after the ban.
-                destinations = yield self.state.get_current_hosts_in_room(
-                    event.room_id, latest_event_ids=event.prev_event_ids()
+                destinations = yield self.state.get_hosts_in_room_at_events(
+                    event.room_id, event_ids=event.prev_event_ids()
                 )
             except Exception:
                 logger.exception(
@@ -207,8 +207,9 @@ def handle_event(event):

         @defer.inlineCallbacks
         def handle_room_events(events):
-            for event in events:
-                yield handle_event(event)
+            with Measure(self.clock, "handle_room_events"):
+                for event in events:
+                    yield handle_event(event)

         events_by_room = {}
         for event in events:
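
The third hunk wraps the per-room loop in `Measure` so the time spent handling outbound events is recorded under the name "handle_room_events"; this is the "Add some metrics on the federation sender" changelog entry. As a rough sketch of the pattern only (Synapse's real `Measure` reports to Prometheus via the homeserver's clock, and the names below are stand-ins), a minimal timing context manager looks like this:

    import logging
    import time
    from contextlib import contextmanager

    logger = logging.getLogger(__name__)

    # Hypothetical stand-in for synapse.util.metrics.Measure: it only logs
    # the elapsed time, where the real class feeds Prometheus metrics.
    @contextmanager
    def measure(name):
        start = time.monotonic()
        try:
            yield
        finally:
            logger.info("%s took %.3fs", name, time.monotonic() - start)

    def handle_event(event):
        pass  # placeholder for the real per-event federation logic

    def handle_room_events(events):
        # As in the diff: the whole loop is timed as one unit, so the
        # metric captures total time per room batch, not per event.
        with measure("handle_room_events"):
            for event in events:
                handle_event(event)
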
9 changes: 7 additions & 2 deletions synapse/handlers/federation.py
@@ -2570,7 +2570,7 @@ def on_exchange_third_party_invite_request(self, room_id, event_dict):
         )

         try:
-            self.auth.check_from_context(room_version, event, context)
+            yield self.auth.check_from_context(room_version, event, context)
         except AuthError as e:
             logger.warn("Denying third party invite %r because %s", event, e)
             raise e
@@ -2599,7 +2599,12 @@ def add_display_name_to_third_party_invite(
             original_invite_id, allow_none=True
         )
         if original_invite:
-            display_name = original_invite.content["display_name"]
+            # If the m.room.third_party_invite event's content is empty, it means the
+            # invite has been revoked. In this case, we don't have to raise an error here
+            # because the auth check will fail on the invite (because it's not able to
+            # fetch public keys from the m.room.third_party_invite event's content, which
+            # is empty).
+            display_name = original_invite.content.get("display_name")
             event_dict["content"]["third_party_invite"]["display_name"] = display_name
         else:
            logger.info(
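
Two fixes land here: `check_from_context` returns a Deferred, so it now gets a `yield` (previously its result was never waited on, so a failing auth check could not be caught by the `except AuthError` branch), and `add_display_name_to_third_party_invite` switches from `content["display_name"]` to `content.get("display_name")`. The latter is the "Don't 500 when trying to exchange a revoked 3PID invite" fix: a revoked invite has empty content, so the indexing raised `KeyError` that surfaced as a 500. A simplified, self-contained illustration (plain dicts stand in for Synapse event content):

    revoked_invite_content = {}  # a revoked invite has had its content emptied

    # Before the fix: indexing an empty dict raises KeyError, which the
    # request handler turned into an HTTP 500.
    try:
        display_name = revoked_invite_content["display_name"]
    except KeyError:
        print("KeyError -> previously became a 500 response")

    # After the fix: .get() returns None and processing continues; the auth
    # check then rejects the invite cleanly, since it cannot fetch public
    # keys from the empty m.room.third_party_invite content.
    display_name = revoked_invite_content.get("display_name")
    assert display_name is None
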
1 change: 1 addition & 0 deletions synapse/handlers/stats.py
@@ -293,6 +293,7 @@ def _handle_deltas(self, deltas):
                 room_state["guest_access"] = event_content.get("guest_access")

         for room_id, state in room_to_state_updates.items():
+            logger.info("Updating room_stats_state for %s: %s", room_id, state)
             yield self.store.update_room_state(room_id, state)

         return room_to_stats_deltas, user_to_stats_deltas
14 changes: 11 additions & 3 deletions synapse/rest/media/v1/thumbnailer.py
@@ -82,13 +82,21 @@ def aspect(self, max_width, max_height):
         else:
             return (max_height * self.width) // self.height, max_height

+    def _resize(self, width, height):
+        # 1-bit or 8-bit color palette images need converting to RGB
+        # otherwise they will be scaled using nearest neighbour which
+        # looks awful
+        if self.image.mode in ["1", "P"]:
+            self.image = self.image.convert("RGB")
+        return self.image.resize((width, height), Image.ANTIALIAS)
+
     def scale(self, width, height, output_type):
         """Rescales the image to the given dimensions.

         Returns:
             BytesIO: the bytes of the encoded image ready to be written to disk
         """
-        scaled = self.image.resize((width, height), Image.ANTIALIAS)
+        scaled = self._resize(width, height)
         return self._encode_image(scaled, output_type)

     def crop(self, width, height, output_type):
@@ -107,13 +115,13 @@ def crop(self, width, height, output_type):
         """
         if width * self.height > height * self.width:
             scaled_height = (width * self.height) // self.width
-            scaled_image = self.image.resize((width, scaled_height), Image.ANTIALIAS)
+            scaled_image = self._resize(width, scaled_height)
             crop_top = (scaled_height - height) // 2
             crop_bottom = height + crop_top
             cropped = scaled_image.crop((0, crop_top, width, crop_bottom))
         else:
             scaled_width = (height * self.width) // self.height
-            scaled_image = self.image.resize((scaled_width, height), Image.ANTIALIAS)
+            scaled_image = self._resize(scaled_width, height)
             crop_left = (scaled_width - width) // 2
             crop_right = width + crop_left
             cropped = scaled_image.crop((crop_left, 0, crop_right, height))
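
This is the thumbnail-quality improvement from `changelog.d/2142.feature`: per the comment in the diff, Pillow resamples 1-bit (`"1"`) and palette (`"P"`) mode images with nearest-neighbour regardless of the filter requested, so `_resize` converts them to RGB first. A standalone sketch of the same conversion, using Pillow directly with a hypothetical input file:

    from PIL import Image

    # Hypothetical input: a PNG or GIF with an 8-bit colour palette.
    im = Image.open("palette_image.png")

    if im.mode in ("1", "P"):
        # Without this, palette images come out blocky because the resize
        # falls back to nearest-neighbour sampling.
        im = im.convert("RGB")

    # Image.ANTIALIAS matches the code of this era; newer Pillow releases
    # spell the same filter Image.LANCZOS.
    thumbnail = im.resize((320, 240), Image.ANTIALIAS)
    thumbnail.save("thumbnail.jpg", "JPEG")
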
24 changes: 18 additions & 6 deletions synapse/state/__init__.py
@@ -33,7 +33,7 @@
 from synapse.util.async_helpers import Linearizer
 from synapse.util.caches import get_cache_factor_for
 from synapse.util.caches.expiringcache import ExpiringCache
-from synapse.util.metrics import Measure
+from synapse.util.metrics import Measure, measure_func

 logger = logging.getLogger(__name__)

@@ -191,11 +191,22 @@ def get_current_users_in_room(self, room_id, latest_event_ids=None):
         return joined_users

     @defer.inlineCallbacks
-    def get_current_hosts_in_room(self, room_id, latest_event_ids=None):
-        if not latest_event_ids:
-            latest_event_ids = yield self.store.get_latest_event_ids_in_room(room_id)
-        logger.debug("calling resolve_state_groups from get_current_hosts_in_room")
-        entry = yield self.resolve_state_groups_for_events(room_id, latest_event_ids)
+    def get_current_hosts_in_room(self, room_id):
+        event_ids = yield self.store.get_latest_event_ids_in_room(room_id)
+        return (yield self.get_hosts_in_room_at_events(room_id, event_ids))
+
+    @defer.inlineCallbacks
+    def get_hosts_in_room_at_events(self, room_id, event_ids):
+        """Get the hosts that were in a room at the given event ids
+
+        Args:
+            room_id (str):
+            event_ids (list[str]):
+
+        Returns:
+            Deferred[list[str]]: the hosts in the room at the given events
+        """
+        entry = yield self.resolve_state_groups_for_events(room_id, event_ids)
         joined_hosts = yield self.store.get_joined_hosts(room_id, entry)
         return joined_hosts

@@ -344,6 +355,7 @@ def compute_event_context(self, event, old_state=None):

         return context

+    @measure_func()
     @defer.inlineCallbacks
     def resolve_state_groups_for_events(self, room_id, event_ids):
         """ Given a list of event_ids this method fetches the state at each
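
`get_current_hosts_in_room` is split so callers that already know which events they care about (like the federation sender above, which passes an event's `prev_event_ids()`) can call `get_hosts_in_room_at_events` directly. `resolve_state_groups_for_events` also gains `@measure_func()`, the decorator counterpart of the `Measure` context manager. A hedged sketch of that decorator pattern (the real one handles Deferred-returning methods and reports to metrics; this stand-in just logs):

    import functools
    import logging
    import time

    logger = logging.getLogger(__name__)

    def measure_func(name=None):
        # Decorator form of the timing pattern: times every call,
        # defaulting the metric name to the function's own name.
        def decorator(func):
            metric_name = name or func.__name__

            @functools.wraps(func)
            def wrapper(*args, **kwargs):
                start = time.monotonic()
                try:
                    return func(*args, **kwargs)
                finally:
                    logger.info(
                        "%s took %.3fs", metric_name, time.monotonic() - start
                    )
            return wrapper
        return decorator

    @measure_func()
    def resolve_state_groups_for_events(room_id, event_ids):
        return []  # placeholder for the real state resolution
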
200 changes: 106 additions & 94 deletions synapse/storage/client_ips.py
@@ -33,14 +33,14 @@
 LAST_SEEN_GRANULARITY = 120 * 1000


-class ClientIpStore(background_updates.BackgroundUpdateStore):
+class ClientIpBackgroundUpdateStore(background_updates.BackgroundUpdateStore):
     def __init__(self, db_conn, hs):

         self.client_ip_last_seen = Cache(
             name="client_ip_last_seen", keylen=4, max_entries=50000 * CACHE_SIZE_FACTOR
         )

-        super(ClientIpStore, self).__init__(db_conn, hs)
+        super(ClientIpBackgroundUpdateStore, self).__init__(db_conn, hs)

         self.user_ips_max_age = hs.config.user_ips_max_age

@@ -92,19 +92,6 @@ def __init__(self, db_conn, hs):
             "devices_last_seen", self._devices_last_seen_update
         )

-        # (user_id, access_token, ip,) -> (user_agent, device_id, last_seen)
-        self._batch_row_update = {}
-
-        self._client_ip_looper = self._clock.looping_call(
-            self._update_client_ips_batch, 5 * 1000
-        )
-        self.hs.get_reactor().addSystemEventTrigger(
-            "before", "shutdown", self._update_client_ips_batch
-        )
-
-        if self.user_ips_max_age:
-            self._clock.looping_call(self._prune_old_user_ips, 5 * 1000)
-
     @defer.inlineCallbacks
     def _remove_user_ip_nonunique(self, progress, batch_size):
         def f(conn):
@@ -303,6 +290,110 @@ def remove(txn):

         return batch_size

+    @defer.inlineCallbacks
+    def _devices_last_seen_update(self, progress, batch_size):
+        """Background update to insert last seen info into devices table
+        """
+
+        last_user_id = progress.get("last_user_id", "")
+        last_device_id = progress.get("last_device_id", "")
+
+        def _devices_last_seen_update_txn(txn):
+            # This consists of two queries:
+            #
+            # 1. The sub-query searches for the next N devices and joins
+            #    against user_ips to find the max last_seen associated with
+            #    that device.
+            # 2. The outer query then joins again against user_ips on
+            #    user/device/last_seen. This *should* hopefully only
+            #    return one row, but if it does return more than one then
+            #    we'll just end up updating the same device row multiple
+            #    times, which is fine.
+
+            if self.database_engine.supports_tuple_comparison:
+                where_clause = "(user_id, device_id) > (?, ?)"
+                where_args = [last_user_id, last_device_id]
+            else:
+                # We explicitly do a `user_id >= ? AND (...)` here to ensure
+                # that an index is used, as doing `user_id > ? OR (user_id = ? AND ...)`
+                # makes it hard for query optimiser to tell that it can use the
+                # index on user_id
+                where_clause = "user_id >= ? AND (user_id > ? OR device_id > ?)"
+                where_args = [last_user_id, last_user_id, last_device_id]
+
+            sql = """
+                SELECT
+                    last_seen, ip, user_agent, user_id, device_id
+                FROM (
+                    SELECT
+                        user_id, device_id, MAX(u.last_seen) AS last_seen
+                    FROM devices
+                    INNER JOIN user_ips AS u USING (user_id, device_id)
+                    WHERE %(where_clause)s
+                    GROUP BY user_id, device_id
+                    ORDER BY user_id ASC, device_id ASC
+                    LIMIT ?
+                ) c
+                INNER JOIN user_ips AS u USING (user_id, device_id, last_seen)
+            """ % {
+                "where_clause": where_clause
+            }
+            txn.execute(sql, where_args + [batch_size])
+
+            rows = txn.fetchall()
+            if not rows:
+                return 0
+
+            sql = """
+                UPDATE devices
+                SET last_seen = ?, ip = ?, user_agent = ?
+                WHERE user_id = ? AND device_id = ?
+            """
+            txn.execute_batch(sql, rows)
+
+            _, _, _, user_id, device_id = rows[-1]
+            self._background_update_progress_txn(
+                txn,
+                "devices_last_seen",
+                {"last_user_id": user_id, "last_device_id": device_id},
+            )
+
+            return len(rows)
+
+        updated = yield self.runInteraction(
+            "_devices_last_seen_update", _devices_last_seen_update_txn
+        )
+
+        if not updated:
+            yield self._end_background_update("devices_last_seen")
+
+        return updated
+
+
+class ClientIpStore(ClientIpBackgroundUpdateStore):
+    def __init__(self, db_conn, hs):
+
+        self.client_ip_last_seen = Cache(
+            name="client_ip_last_seen", keylen=4, max_entries=50000 * CACHE_SIZE_FACTOR
+        )
+
+        super(ClientIpStore, self).__init__(db_conn, hs)
+
+        self.user_ips_max_age = hs.config.user_ips_max_age
+
+        # (user_id, access_token, ip,) -> (user_agent, device_id, last_seen)
+        self._batch_row_update = {}
+
+        self._client_ip_looper = self._clock.looping_call(
+            self._update_client_ips_batch, 5 * 1000
+        )
+        self.hs.get_reactor().addSystemEventTrigger(
+            "before", "shutdown", self._update_client_ips_batch
+        )
+
+        if self.user_ips_max_age:
+            self._clock.looping_call(self._prune_old_user_ips, 5 * 1000)
+
     @defer.inlineCallbacks
     def insert_client_ip(
         self, user_id, access_token, ip, user_agent, device_id, now=None
@@ -454,85 +545,6 @@ def get_user_ip_and_agents(self, user):
             for (access_token, ip), (user_agent, last_seen) in iteritems(results)
         )

-    @defer.inlineCallbacks
-    def _devices_last_seen_update(self, progress, batch_size):
-        """Background update to insert last seen info into devices table
-        """
-
-        last_user_id = progress.get("last_user_id", "")
-        last_device_id = progress.get("last_device_id", "")
-
-        def _devices_last_seen_update_txn(txn):
-            # This consists of two queries:
-            #
-            # 1. The sub-query searches for the next N devices and joins
-            #    against user_ips to find the max last_seen associated with
-            #    that device.
-            # 2. The outer query then joins again against user_ips on
-            #    user/device/last_seen. This *should* hopefully only
-            #    return one row, but if it does return more than one then
-            #    we'll just end up updating the same device row multiple
-            #    times, which is fine.
-
-            if self.database_engine.supports_tuple_comparison:
-                where_clause = "(user_id, device_id) > (?, ?)"
-                where_args = [last_user_id, last_device_id]
-            else:
-                # We explicitly do a `user_id >= ? AND (...)` here to ensure
-                # that an index is used, as doing `user_id > ? OR (user_id = ? AND ...)`
-                # makes it hard for query optimiser to tell that it can use the
-                # index on user_id
-                where_clause = "user_id >= ? AND (user_id > ? OR device_id > ?)"
-                where_args = [last_user_id, last_user_id, last_device_id]
-
-            sql = """
-                SELECT
-                    last_seen, ip, user_agent, user_id, device_id
-                FROM (
-                    SELECT
-                        user_id, device_id, MAX(u.last_seen) AS last_seen
-                    FROM devices
-                    INNER JOIN user_ips AS u USING (user_id, device_id)
-                    WHERE %(where_clause)s
-                    GROUP BY user_id, device_id
-                    ORDER BY user_id ASC, device_id ASC
-                    LIMIT ?
-                ) c
-                INNER JOIN user_ips AS u USING (user_id, device_id, last_seen)
-            """ % {
-                "where_clause": where_clause
-            }
-            txn.execute(sql, where_args + [batch_size])
-
-            rows = txn.fetchall()
-            if not rows:
-                return 0
-
-            sql = """
-                UPDATE devices
-                SET last_seen = ?, ip = ?, user_agent = ?
-                WHERE user_id = ? AND device_id = ?
-            """
-            txn.execute_batch(sql, rows)
-
-            _, _, _, user_id, device_id = rows[-1]
-            self._background_update_progress_txn(
-                txn,
-                "devices_last_seen",
-                {"last_user_id": user_id, "last_device_id": device_id},
-            )
-
-            return len(rows)
-
-        updated = yield self.runInteraction(
-            "_devices_last_seen_update", _devices_last_seen_update_txn
-        )
-
-        if not updated:
-            yield self._end_background_update("devices_last_seen")
-
-        return updated
-
     @wrap_as_background_process("prune_old_user_ips")
     async def _prune_old_user_ips(self):
         """Removes entries in user IPs older than the configured period.
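
Beyond moving `_devices_last_seen_update` into a dedicated `ClientIpBackgroundUpdateStore` (so `synapse_port_db` can run background updates without the runtime batching machinery in `ClientIpStore`), the interesting detail is how the update paginates: each batch resumes from the stored `(last_user_id, last_device_id)` key, and the `supports_tuple_comparison` branch exists because older SQLite cannot compare row values like `(a, b) > (?, ?)`. A self-contained sketch of that keyset-pagination pattern, with a toy table standing in for `devices`:

    import sqlite3

    conn = sqlite3.connect(":memory:")
    conn.execute("CREATE TABLE devices (user_id TEXT, device_id TEXT)")
    conn.executemany(
        "INSERT INTO devices VALUES (?, ?)",
        [("@a:hs", "D1"), ("@a:hs", "D2"), ("@b:hs", "D1"), ("@c:hs", "D9")],
    )

    def next_batch(last_user_id, last_device_id, batch_size, tuple_comparison):
        if tuple_comparison:
            # Postgres (and modern SQLite) can compare row values directly.
            where = "(user_id, device_id) > (?, ?)"
            args = [last_user_id, last_device_id]
        else:
            # Expanded form, written as `user_id >= ? AND (...)` so the
            # planner can still use an index on user_id.
            where = "user_id >= ? AND (user_id > ? OR device_id > ?)"
            args = [last_user_id, last_user_id, last_device_id]
        sql = (
            "SELECT user_id, device_id FROM devices WHERE %s "
            "ORDER BY user_id, device_id LIMIT ?" % where
        )
        return conn.execute(sql, args + [batch_size]).fetchall()

    # Page through two rows at a time, resuming from the last row seen,
    # the same way the background update stores its progress between runs.
    batch = next_batch("", "", 2, tuple_comparison=False)
    while batch:
        print(batch)
        last_user, last_device = batch[-1]
        batch = next_batch(last_user, last_device, 2, tuple_comparison=False)
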
(Diffs for the remaining 12 changed files were not loaded in this view.)
