Skip to content
This repository has been archived by the owner on Apr 26, 2024. It is now read-only.

Commit

Permalink
Move additional tasks to the background worker, part 4 (#8513)
Browse files Browse the repository at this point in the history
  • Loading branch information
clokep authored Oct 13, 2020
1 parent b2486f6 commit 629a951
Show file tree
Hide file tree
Showing 11 changed files with 196 additions and 221 deletions.
1 change: 1 addition & 0 deletions changelog.d/8513.feature
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Allow running background tasks in a separate worker process.
3 changes: 2 additions & 1 deletion synapse/handlers/account_validity.py
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,8 @@ def send_emails():
"send_renewals", self._send_renewal_emails
)

self.clock.looping_call(send_emails, 30 * 60 * 1000)
if hs.config.run_background_tasks:
self.clock.looping_call(send_emails, 30 * 60 * 1000)

async def _send_renewal_emails(self):
"""Gets the list of users whose account is expiring in the amount of time
Expand Down
2 changes: 1 addition & 1 deletion synapse/handlers/deactivate_account.py
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ def __init__(self, hs: "HomeServer"):

# Start the user parter loop so it can resume parting users from rooms where
# it left off (if it has work left to do).
if hs.config.worker_app is None:
if hs.config.run_background_tasks:
hs.get_reactor().callWhenRunning(self._start_user_parting)

self._account_validity_enabled = hs.config.account_validity.enabled
Expand Down
18 changes: 9 additions & 9 deletions synapse/handlers/message.py
Original file line number Diff line number Diff line change
Expand Up @@ -402,21 +402,23 @@ def __init__(self, hs: "HomeServer"):
self.config.block_events_without_consent_error
)

# we need to construct a ConsentURIBuilder here, as it checks that the necessary
# config options, but *only* if we have a configuration for which we are
# going to need it.
if self._block_events_without_consent_error:
self._consent_uri_builder = ConsentURIBuilder(self.config)

# Rooms which should be excluded from dummy insertion. (For instance,
# those without local users who can send events into the room).
#
# map from room id to time-of-last-attempt.
#
self._rooms_to_exclude_from_dummy_event_insertion = {} # type: Dict[str, int]

# we need to construct a ConsentURIBuilder here, as it checks that the necessary
# config options, but *only* if we have a configuration for which we are
# going to need it.
if self._block_events_without_consent_error:
self._consent_uri_builder = ConsentURIBuilder(self.config)
# The number of forward extremeities before a dummy event is sent.
self._dummy_events_threshold = hs.config.dummy_events_threshold

if (
not self.config.worker_app
self.config.run_background_tasks
and self.config.cleanup_extremities_with_dummy_events
):
self.clock.looping_call(
Expand All @@ -431,8 +433,6 @@ def __init__(self, hs: "HomeServer"):

self._ephemeral_events_enabled = hs.config.enable_ephemeral_messages

self._dummy_events_threshold = hs.config.dummy_events_threshold

async def create_event(
self,
requester: Requester,
Expand Down
2 changes: 1 addition & 1 deletion synapse/handlers/pagination.py
Original file line number Diff line number Diff line change
Expand Up @@ -92,7 +92,7 @@ def __init__(self, hs: "HomeServer"):
self._retention_allowed_lifetime_min = hs.config.retention_allowed_lifetime_min
self._retention_allowed_lifetime_max = hs.config.retention_allowed_lifetime_max

if hs.config.retention_enabled:
if hs.config.run_background_tasks and hs.config.retention_enabled:
# Run the purge jobs described in the configuration file.
for job in hs.config.retention_purge_jobs:
logger.info("Setting up purge job with config: %s", job)
Expand Down
29 changes: 11 additions & 18 deletions synapse/handlers/profile.py
Original file line number Diff line number Diff line change
Expand Up @@ -35,14 +35,16 @@
MAX_AVATAR_URL_LEN = 1000


class BaseProfileHandler(BaseHandler):
class ProfileHandler(BaseHandler):
"""Handles fetching and updating user profile information.
BaseProfileHandler can be instantiated directly on workers and will
delegate to master when necessary. The master process should use the
subclass MasterProfileHandler
ProfileHandler can be instantiated directly on workers and will
delegate to master when necessary.
"""

PROFILE_UPDATE_MS = 60 * 1000
PROFILE_UPDATE_EVERY_MS = 24 * 60 * 60 * 1000

def __init__(self, hs):
super().__init__(hs)

Expand All @@ -53,6 +55,11 @@ def __init__(self, hs):

self.user_directory_handler = hs.get_user_directory_handler()

if hs.config.run_background_tasks:
self.clock.looping_call(
self._start_update_remote_profile_cache, self.PROFILE_UPDATE_MS
)

async def get_profile(self, user_id):
target_user = UserID.from_string(user_id)

Expand Down Expand Up @@ -363,20 +370,6 @@ async def check_profile_query_allowed(self, target_user, requester=None):
raise SynapseError(403, "Profile isn't available", Codes.FORBIDDEN)
raise


class MasterProfileHandler(BaseProfileHandler):
PROFILE_UPDATE_MS = 60 * 1000
PROFILE_UPDATE_EVERY_MS = 24 * 60 * 60 * 1000

def __init__(self, hs):
super().__init__(hs)

assert hs.config.worker_app is None

self.clock.looping_call(
self._start_update_remote_profile_cache, self.PROFILE_UPDATE_MS
)

def _start_update_remote_profile_cache(self):
return run_as_background_process(
"Update remote profile", self._update_remote_profile_cache
Expand Down
12 changes: 7 additions & 5 deletions synapse/server.py
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,7 @@
from synapse.handlers.pagination import PaginationHandler
from synapse.handlers.password_policy import PasswordPolicyHandler
from synapse.handlers.presence import PresenceHandler
from synapse.handlers.profile import BaseProfileHandler, MasterProfileHandler
from synapse.handlers.profile import ProfileHandler
from synapse.handlers.read_marker import ReadMarkerHandler
from synapse.handlers.receipts import ReceiptsHandler
from synapse.handlers.register import RegistrationHandler
Expand Down Expand Up @@ -191,7 +191,12 @@ class HomeServer(metaclass=abc.ABCMeta):
"""

REQUIRED_ON_BACKGROUND_TASK_STARTUP = [
"account_validity",
"auth",
"deactivate_account",
"message",
"pagination",
"profile",
"stats",
]

Expand Down Expand Up @@ -462,10 +467,7 @@ def get_initial_sync_handler(self) -> InitialSyncHandler:

@cache_in_self
def get_profile_handler(self):
if self.config.worker_app:
return BaseProfileHandler(self)
else:
return MasterProfileHandler(self)
return ProfileHandler(self)

@cache_in_self
def get_event_creation_handler(self) -> EventCreationHandler:
Expand Down
82 changes: 41 additions & 41 deletions synapse/storage/databases/main/profile.py
Original file line number Diff line number Diff line change
Expand Up @@ -91,27 +91,6 @@ async def set_profile_avatar_url(
desc="set_profile_avatar_url",
)


class ProfileStore(ProfileWorkerStore):
async def add_remote_profile_cache(
self, user_id: str, displayname: str, avatar_url: str
) -> None:
"""Ensure we are caching the remote user's profiles.
This should only be called when `is_subscribed_remote_profile_for_user`
would return true for the user.
"""
await self.db_pool.simple_upsert(
table="remote_profile_cache",
keyvalues={"user_id": user_id},
values={
"displayname": displayname,
"avatar_url": avatar_url,
"last_check": self._clock.time_msec(),
},
desc="add_remote_profile_cache",
)

async def update_remote_profile_cache(
self, user_id: str, displayname: str, avatar_url: str
) -> int:
Expand All @@ -138,6 +117,31 @@ async def maybe_delete_remote_profile_cache(self, user_id):
desc="delete_remote_profile_cache",
)

async def is_subscribed_remote_profile_for_user(self, user_id):
"""Check whether we are interested in a remote user's profile.
"""
res = await self.db_pool.simple_select_one_onecol(
table="group_users",
keyvalues={"user_id": user_id},
retcol="user_id",
allow_none=True,
desc="should_update_remote_profile_cache_for_user",
)

if res:
return True

res = await self.db_pool.simple_select_one_onecol(
table="group_invites",
keyvalues={"user_id": user_id},
retcol="user_id",
allow_none=True,
desc="should_update_remote_profile_cache_for_user",
)

if res:
return True

async def get_remote_profile_cache_entries_that_expire(
self, last_checked: int
) -> Dict[str, str]:
Expand All @@ -160,27 +164,23 @@ def _get_remote_profile_cache_entries_that_expire_txn(txn):
_get_remote_profile_cache_entries_that_expire_txn,
)

async def is_subscribed_remote_profile_for_user(self, user_id):
"""Check whether we are interested in a remote user's profile.
"""
res = await self.db_pool.simple_select_one_onecol(
table="group_users",
keyvalues={"user_id": user_id},
retcol="user_id",
allow_none=True,
desc="should_update_remote_profile_cache_for_user",
)

if res:
return True
class ProfileStore(ProfileWorkerStore):
async def add_remote_profile_cache(
self, user_id: str, displayname: str, avatar_url: str
) -> None:
"""Ensure we are caching the remote user's profiles.
res = await self.db_pool.simple_select_one_onecol(
table="group_invites",
This should only be called when `is_subscribed_remote_profile_for_user`
would return true for the user.
"""
await self.db_pool.simple_upsert(
table="remote_profile_cache",
keyvalues={"user_id": user_id},
retcol="user_id",
allow_none=True,
desc="should_update_remote_profile_cache_for_user",
values={
"displayname": displayname,
"avatar_url": avatar_url,
"last_check": self._clock.time_msec(),
},
desc="add_remote_profile_cache",
)

if res:
return True
52 changes: 26 additions & 26 deletions synapse/storage/databases/main/registration.py
Original file line number Diff line number Diff line change
Expand Up @@ -862,6 +862,32 @@ def set_expiration_date_for_user_txn(self, txn, user_id, use_delta=False):
values={"expiration_ts_ms": expiration_ts, "email_sent": False},
)

async def get_user_pending_deactivation(self) -> Optional[str]:
"""
Gets one user from the table of users waiting to be parted from all the rooms
they're in.
"""
return await self.db_pool.simple_select_one_onecol(
"users_pending_deactivation",
keyvalues={},
retcol="user_id",
allow_none=True,
desc="get_users_pending_deactivation",
)

async def del_user_pending_deactivation(self, user_id: str) -> None:
"""
Removes the given user to the table of users who need to be parted from all the
rooms they're in, effectively marking that user as fully deactivated.
"""
# XXX: This should be simple_delete_one but we failed to put a unique index on
# the table, so somehow duplicate entries have ended up in it.
await self.db_pool.simple_delete(
"users_pending_deactivation",
keyvalues={"user_id": user_id},
desc="del_user_pending_deactivation",
)


class RegistrationBackgroundUpdateStore(RegistrationWorkerStore):
def __init__(self, database: DatabasePool, db_conn, hs):
Expand Down Expand Up @@ -1371,32 +1397,6 @@ async def add_user_pending_deactivation(self, user_id: str) -> None:
desc="add_user_pending_deactivation",
)

async def del_user_pending_deactivation(self, user_id: str) -> None:
"""
Removes the given user to the table of users who need to be parted from all the
rooms they're in, effectively marking that user as fully deactivated.
"""
# XXX: This should be simple_delete_one but we failed to put a unique index on
# the table, so somehow duplicate entries have ended up in it.
await self.db_pool.simple_delete(
"users_pending_deactivation",
keyvalues={"user_id": user_id},
desc="del_user_pending_deactivation",
)

async def get_user_pending_deactivation(self) -> Optional[str]:
"""
Gets one user from the table of users waiting to be parted from all the rooms
they're in.
"""
return await self.db_pool.simple_select_one_onecol(
"users_pending_deactivation",
keyvalues={},
retcol="user_id",
allow_none=True,
desc="get_users_pending_deactivation",
)

async def validate_threepid_session(
self, session_id: str, client_secret: str, token: str, current_ts: int
) -> Optional[str]:
Expand Down
Loading

0 comments on commit 629a951

Please sign in to comment.