Skip to content
This repository has been archived by the owner on Apr 26, 2024. It is now read-only.

N + 3: Read from column full_user_id rather than user_id of tables profiles and user_filters #15649

Merged
merged 18 commits into from
Jun 3, 2023
Merged
Show file tree
Hide file tree
Changes from 14 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions changelog.d/15649.misc
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Read from column `full_user_id` rather than `user_id` of tables `profiles` and `user_filters`.
4 changes: 2 additions & 2 deletions synapse/api/filtering.py
Original file line number Diff line number Diff line change
Expand Up @@ -152,9 +152,9 @@ def __init__(self, hs: "HomeServer"):
self.DEFAULT_FILTER_COLLECTION = FilterCollection(hs, {})

async def get_user_filter(
self, user_localpart: str, filter_id: Union[int, str]
self, user_id: UserID, filter_id: Union[int, str]
) -> "FilterCollection":
result = await self.store.get_user_filter(user_localpart, filter_id)
result = await self.store.get_user_filter(user_id, filter_id)
return FilterCollection(self._hs, result)

def add_user_filter(self, user_id: UserID, user_filter: JsonDict) -> Awaitable[int]:
Expand Down
2 changes: 1 addition & 1 deletion synapse/handlers/account_validity.py
Original file line number Diff line number Diff line change
Expand Up @@ -164,7 +164,7 @@ async def _send_renewal_email(self, user_id: str, expiration_ts: int) -> None:

try:
user_display_name = await self.store.get_profile_displayname(
UserID.from_string(user_id).localpart
UserID.from_string(user_id)
)
if user_display_name is None:
user_display_name = user_id
Expand Down
2 changes: 1 addition & 1 deletion synapse/handlers/admin.py
Original file line number Diff line number Diff line change
Expand Up @@ -89,7 +89,7 @@ async def get_user(self, user: UserID) -> Optional[JsonDict]:
}

# Add additional user metadata
profile = await self._store.get_profileinfo(user.localpart)
profile = await self._store.get_profileinfo(user)
threepids = await self._store.user_get_threepids(user.to_string())
external_ids = [
({"auth_provider": auth_provider, "external_id": external_id})
Expand Down
2 changes: 1 addition & 1 deletion synapse/handlers/auth.py
Original file line number Diff line number Diff line change
Expand Up @@ -1759,7 +1759,7 @@ async def complete_sso_login(
return

user_profile_data = await self.store.get_profileinfo(
UserID.from_string(registered_user_id).localpart
UserID.from_string(registered_user_id)
)

# Store any extra attributes which will be passed in the login response.
Expand Down
2 changes: 1 addition & 1 deletion synapse/handlers/deactivate_account.py
Original file line number Diff line number Diff line change
Expand Up @@ -297,5 +297,5 @@ async def activate_account(self, user_id: str) -> None:
# Add the user to the directory, if necessary. Note that
# this must be done after the user is re-activated, because
# deactivated users are excluded from the user directory.
profile = await self.store.get_profileinfo(user.localpart)
profile = await self.store.get_profileinfo(user)
await self.user_directory_handler.handle_local_profile_change(user_id, profile)
26 changes: 9 additions & 17 deletions synapse/handlers/profile.py
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,7 @@ async def get_profile(self, user_id: str, ignore_backoff: bool = True) -> JsonDi
target_user = UserID.from_string(user_id)

if self.hs.is_mine(target_user):
profileinfo = await self.store.get_profileinfo(target_user.localpart)
profileinfo = await self.store.get_profileinfo(target_user)
if profileinfo.display_name is None:
raise SynapseError(404, "Profile was not found", Codes.NOT_FOUND)

Expand Down Expand Up @@ -99,9 +99,7 @@ async def get_profile(self, user_id: str, ignore_backoff: bool = True) -> JsonDi
async def get_displayname(self, target_user: UserID) -> Optional[str]:
if self.hs.is_mine(target_user):
try:
displayname = await self.store.get_profile_displayname(
target_user.localpart
)
displayname = await self.store.get_profile_displayname(target_user)
except StoreError as e:
if e.code == 404:
raise SynapseError(404, "Profile was not found", Codes.NOT_FOUND)
Expand Down Expand Up @@ -147,7 +145,7 @@ async def set_displayname(
raise AuthError(400, "Cannot set another user's displayname")

if not by_admin and not self.hs.config.registration.enable_set_displayname:
profile = await self.store.get_profileinfo(target_user.localpart)
profile = await self.store.get_profileinfo(target_user)
if profile.display_name:
raise SynapseError(
400,
Expand Down Expand Up @@ -180,7 +178,7 @@ async def set_displayname(

await self.store.set_profile_displayname(target_user, displayname_to_set)

profile = await self.store.get_profileinfo(target_user.localpart)
profile = await self.store.get_profileinfo(target_user)
await self.user_directory_handler.handle_local_profile_change(
target_user.to_string(), profile
)
Expand All @@ -194,9 +192,7 @@ async def set_displayname(
async def get_avatar_url(self, target_user: UserID) -> Optional[str]:
if self.hs.is_mine(target_user):
try:
avatar_url = await self.store.get_profile_avatar_url(
target_user.localpart
)
avatar_url = await self.store.get_profile_avatar_url(target_user)
except StoreError as e:
if e.code == 404:
raise SynapseError(404, "Profile was not found", Codes.NOT_FOUND)
Expand Down Expand Up @@ -241,7 +237,7 @@ async def set_avatar_url(
raise AuthError(400, "Cannot set another user's avatar_url")

if not by_admin and not self.hs.config.registration.enable_set_avatar_url:
profile = await self.store.get_profileinfo(target_user.localpart)
profile = await self.store.get_profileinfo(target_user)
if profile.avatar_url:
raise SynapseError(
400, "Changing avatar is disabled on this server", Codes.FORBIDDEN
Expand Down Expand Up @@ -272,7 +268,7 @@ async def set_avatar_url(

await self.store.set_profile_avatar_url(target_user, avatar_url_to_set)

profile = await self.store.get_profileinfo(target_user.localpart)
profile = await self.store.get_profileinfo(target_user)
await self.user_directory_handler.handle_local_profile_change(
target_user.to_string(), profile
)
Expand Down Expand Up @@ -369,14 +365,10 @@ async def on_profile_query(self, args: JsonDict) -> JsonDict:
response = {}
try:
if just_field is None or just_field == "displayname":
response["displayname"] = await self.store.get_profile_displayname(
user.localpart
)
response["displayname"] = await self.store.get_profile_displayname(user)

if just_field is None or just_field == "avatar_url":
response["avatar_url"] = await self.store.get_profile_avatar_url(
user.localpart
)
response["avatar_url"] = await self.store.get_profile_avatar_url(user)
except StoreError as e:
if e.code == 404:
raise SynapseError(404, "Profile was not found", Codes.NOT_FOUND)
Expand Down
2 changes: 1 addition & 1 deletion synapse/handlers/register.py
Original file line number Diff line number Diff line change
Expand Up @@ -315,7 +315,7 @@ async def register_user(
approved=approved,
)

profile = await self.store.get_profileinfo(localpart)
profile = await self.store.get_profileinfo(user)
await self.user_directory_handler.handle_local_profile_change(
user_id, profile
)
Expand Down
4 changes: 3 additions & 1 deletion synapse/module_api/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -655,7 +655,9 @@ async def get_profile_for_user(self, localpart: str) -> ProfileInfo:
Returns:
The profile information (i.e. display name and avatar URL).
"""
return await self._store.get_profileinfo(localpart)
server_name = self._hs.hostname
user_id = UserID.from_string(f"@{localpart}:{server_name}")
return await self._store.get_profileinfo(user_id)

async def get_threepids_for_user(self, user_id: str) -> List[Dict[str, str]]:
"""Look up the threepids (email addresses and phone numbers) associated with the
Expand Down
2 changes: 1 addition & 1 deletion synapse/push/mailer.py
Original file line number Diff line number Diff line change
Expand Up @@ -247,7 +247,7 @@ async def send_notification_mail(

try:
user_display_name = await self.store.get_profile_displayname(
UserID.from_string(user_id).localpart
UserID.from_string(user_id)
)
if user_display_name is None:
user_display_name = user_id
Expand Down
2 changes: 1 addition & 1 deletion synapse/rest/client/filter.py
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@ async def on_GET(

try:
filter_collection = await self.filtering.get_user_filter(
user_localpart=target_user.localpart, filter_id=filter_id_int
user_id=target_user, filter_id=filter_id_int
)
except StoreError as e:
if e.code != 404:
Expand Down
2 changes: 1 addition & 1 deletion synapse/rest/client/sync.py
Original file line number Diff line number Diff line change
Expand Up @@ -178,7 +178,7 @@ async def on_GET(self, request: SynapseRequest) -> Tuple[int, JsonDict]:
else:
try:
filter_collection = await self.filtering.get_user_filter(
user.localpart, filter_id
user, filter_id
)
except StoreError as err:
if err.code != 404:
Expand Down
12 changes: 6 additions & 6 deletions synapse/storage/databases/main/filtering.py
Original file line number Diff line number Diff line change
Expand Up @@ -145,7 +145,7 @@ def _final_batch(txn: LoggingTransaction, lower_bound_id: str) -> None:

@cached(num_args=2)
async def get_user_filter(
self, user_localpart: str, filter_id: Union[int, str]
self, user_id: UserID, filter_id: Union[int, str]
) -> JsonDict:
# filter_id is BIGINT UNSIGNED, so if it isn't a number, fail
# with a coherent error message rather than 500 M_UNKNOWN.
Expand All @@ -156,7 +156,7 @@ async def get_user_filter(

def_json = await self.db_pool.simple_select_one_onecol(
table="user_filters",
keyvalues={"user_id": user_localpart, "filter_id": filter_id},
keyvalues={"full_user_id": user_id.to_string(), "filter_id": filter_id},
retcol="filter_json",
allow_none=False,
desc="get_user_filter",
Expand All @@ -172,15 +172,15 @@ async def add_user_filter(self, user_id: UserID, user_filter: JsonDict) -> int:
def _do_txn(txn: LoggingTransaction) -> int:
sql = (
"SELECT filter_id FROM user_filters "
"WHERE user_id = ? AND filter_json = ?"
"WHERE full_user_id = ? AND filter_json = ?"
)
txn.execute(sql, (user_id.localpart, bytearray(def_json)))
txn.execute(sql, (user_id.to_string(), bytearray(def_json)))
filter_id_response = txn.fetchone()
if filter_id_response is not None:
return filter_id_response[0]

sql = "SELECT MAX(filter_id) FROM user_filters WHERE user_id = ?"
txn.execute(sql, (user_id.localpart,))
sql = "SELECT MAX(filter_id) FROM user_filters WHERE full_user_id = ?"
txn.execute(sql, (user_id.to_string(),))
max_id = cast(Tuple[Optional[int]], txn.fetchone())[0]
if max_id is None:
filter_id = 0
Expand Down
12 changes: 6 additions & 6 deletions synapse/storage/databases/main/profile.py
Original file line number Diff line number Diff line change
Expand Up @@ -137,11 +137,11 @@ def _final_batch(txn: LoggingTransaction, lower_bound_id: str) -> None:

return 50

async def get_profileinfo(self, user_localpart: str) -> ProfileInfo:
async def get_profileinfo(self, user_id: UserID) -> ProfileInfo:
try:
profile = await self.db_pool.simple_select_one(
table="profiles",
keyvalues={"user_id": user_localpart},
keyvalues={"full_user_id": user_id.to_string()},
retcols=("displayname", "avatar_url"),
desc="get_profileinfo",
)
Expand All @@ -156,18 +156,18 @@ async def get_profileinfo(self, user_localpart: str) -> ProfileInfo:
avatar_url=profile["avatar_url"], display_name=profile["displayname"]
)

async def get_profile_displayname(self, user_localpart: str) -> Optional[str]:
async def get_profile_displayname(self, user_id: UserID) -> Optional[str]:
return await self.db_pool.simple_select_one_onecol(
table="profiles",
keyvalues={"user_id": user_localpart},
keyvalues={"full_user_id": user_id.to_string()},
retcol="displayname",
desc="get_profile_displayname",
)

async def get_profile_avatar_url(self, user_localpart: str) -> Optional[str]:
async def get_profile_avatar_url(self, user_id: UserID) -> Optional[str]:
return await self.db_pool.simple_select_one_onecol(
table="profiles",
keyvalues={"user_id": user_localpart},
keyvalues={"full_user_id": user_id.to_string()},
retcol="avatar_url",
desc="get_profile_avatar_url",
)
Expand Down
2 changes: 1 addition & 1 deletion synapse/storage/schema/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@
# See the License for the specific language governing permissions and
# limitations under the License.

SCHEMA_VERSION = 77 # remember to update the list below when updating
SCHEMA_VERSION = 78 # remember to update the list below when updating
Copy link
Contributor

@MadLittleMods MadLittleMods Jun 1, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The notes from #15691 are very helpful when trying to understand if this PR is the correct next step in the migration process 💡

And a breakdown of the PRs for this migration process so far:

  • #15458 (N + 1): Bump SCHEMA_VERSION to 76. Add full_user_id column, start writing to both the old and new column moving forward
  • #15537 (N + 2): Bump SCHEMA_VERSION to 77 and SCHEMA_COMPAT_VERSION to 76 because we added a NOT VALID constraint that only ensures that new inserts/updates always fill in full_user_id which isn't backwards compatible because older Synapse versions don't write to the new full_user_id column. Also add a background update to backfill full_user_id for all rows
  • #15649 (N + 3): Bump SCHEMA_VERSION to 78, add a foreground update to finish off the backfill and turn the NOT VALID constraint into a valid one -> full_user_id NOT NULL, and start reading from full_user_id
  • Future PRs needed for N + 4 and N + 5 where we stop writing to user_id and drop the column

Copy link
Contributor

@MadLittleMods MadLittleMods Jun 2, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

✔️ Looks good based on the notes linked above but I've never done this kind of thing before and I'm definitely not an authoritative source of knowledge on this kind of thing.

"""Represents the expectations made by the codebase about the database schema

This should be incremented whenever the codebase changes its requirements on the
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,86 @@
# Copyright 2023 The Matrix.org Foundation C.I.C
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

from synapse.config.homeserver import HomeServerConfig
from synapse.storage.database import LoggingTransaction
from synapse.storage.engines import BaseDatabaseEngine, PostgresEngine


def run_upgrade(
cur: LoggingTransaction,
database_engine: BaseDatabaseEngine,
config: HomeServerConfig,
) -> None:
MadLittleMods marked this conversation as resolved.
Show resolved Hide resolved
hostname = config.server.server_name

if isinstance(database_engine, PostgresEngine):
# check if the constraint can be validated
check_sql = """
SELECT user_id from profiles WHERE full_user_id IS NULL
"""
cur.execute(check_sql)
res = cur.fetchall()

if res:
# there are rows the background job missed, finish them here before we validate the constraint
process_rows_sql = """
UPDATE profiles
SET full_user_id = '@' || user_id || ?
WHERE user_id IN (
SELECT user_id FROM profiles WHERE full_user_id IS NULL
)
"""
cur.execute(process_rows_sql, (f":{hostname}",))

# Now we can validate
validate_sql = """
ALTER TABLE profiles VALIDATE CONSTRAINT full_user_id_not_null
"""
cur.execute(validate_sql)

else:
# in SQLite we need to rewrite the table to add the constraint
cur.execute("DROP TABLE IF EXISTS temp_profiles")
H-Shay marked this conversation as resolved.
Show resolved Hide resolved

create_sql = """
CREATE TABLE temp_profiles (
full_user_id text NOT NULL,
user_id text,
displayname text,
avatar_url text,
UNIQUE (full_user_id),
UNIQUE (user_id)
)
"""
cur.execute(create_sql)

copy_sql = """
INSERT INTO temp_profiles (
user_id,
displayname,
avatar_url,
full_user_id)
SELECT user_id, displayname, avatar_url, '@' || user_id || ':' || ? FROM profiles
"""
cur.execute(copy_sql, (f"{hostname}",))

drop_sql = """
DROP TABLE profiles
"""
cur.execute(drop_sql)

rename_sql = """
ALTER TABLE temp_profiles RENAME to profiles
"""
cur.execute(rename_sql)
Loading