Skip to content
This repository has been archived by the owner on Apr 26, 2024. It is now read-only.

Enable unused-awaitable error for mypy #14519

Closed
wants to merge 19 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions changelog.d/14519.misc
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Have mypy report `Awaitable`s that are not `await`ed or otherwise consumed.
2 changes: 1 addition & 1 deletion docs/development/synapse_architecture/cancellation.md
Original file line number Diff line number Diff line change
Expand Up @@ -374,7 +374,7 @@ async def do_something() -> None:
# `do_something_else` will get its own independent
# logging context. `request-1` will not count any
# metrics from `do_something_else`.
run_as_background_process(
run_as_background_process( # type: ignore[unused-awaitable]
"do_something_else",
do_something_else,
to_resolve,
Expand Down
1 change: 1 addition & 0 deletions mypy.ini
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ local_partial_types = True
no_implicit_optional = True
disallow_untyped_defs = True
strict_equality = True
enable_error_code = unused-awaitable
warn_redundant_casts = True

files =
Expand Down
2 changes: 1 addition & 1 deletion synapse/_scripts/update_synapse_database.py
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@ async def run_background_updates() -> None:

def run() -> None:
# Apply all background updates on the database.
defer.ensureDeferred(
defer.ensureDeferred( # type: ignore[unused-awaitable]
run_as_background_process("background_updates", run_background_updates)
)
Comment on lines +62 to 64
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is the call to ensureDeferred actually doing anything here? run_as_background_process returns a Deferred and ensureDeferred is a no-op when its argument is a Deferred.

(I also find this script really confusing---the way it starts the reactor and gets the task being run to stop the reactor!)

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I suspect that we can remove the ensureDeferred call.


Expand Down
4 changes: 2 additions & 2 deletions synapse/app/phone_stats_home.py
Original file line number Diff line number Diff line change
Expand Up @@ -189,7 +189,7 @@ def performance_stats_init() -> None:
clock.looping_call(
hs.get_datastores().main.reap_monthly_active_users, 1000 * 60 * 60
)
hs.get_datastores().main.reap_monthly_active_users()
hs.get_datastores().main.reap_monthly_active_users() # type: ignore[unused-awaitable]

@wrap_as_background_process("generate_monthly_active_users")
async def generate_monthly_active_users() -> None:
Expand All @@ -212,7 +212,7 @@ async def generate_monthly_active_users() -> None:
max_mau_gauge.set(float(hs.config.server.max_mau_value))

if hs.config.server.limit_usage_by_mau or hs.config.server.mau_stats_only:
generate_monthly_active_users()
generate_monthly_active_users() # type: ignore[unused-awaitable]
clock.looping_call(generate_monthly_active_users, 5 * 60 * 1000)
# End of monthly active user settings

Expand Down
8 changes: 4 additions & 4 deletions synapse/appservice/scheduler.py
Original file line number Diff line number Diff line change
Expand Up @@ -200,7 +200,7 @@ def start_background_request(self, service: ApplicationService) -> None:
if service.id in self.requests_in_flight:
return

run_as_background_process(
run_as_background_process( # type: ignore[unused-awaitable]
"as-sender-%s" % (service.id,), self._send_request, service
)

Expand Down Expand Up @@ -407,10 +407,10 @@ async def send(
if sent:
await txn.complete(self.store)
else:
run_in_background(self._on_txn_fail, service)
run_in_background(self._on_txn_fail, service) # type: ignore[unused-awaitable]
except Exception:
logger.exception("Error creating appservice transaction")
run_in_background(self._on_txn_fail, service)
run_in_background(self._on_txn_fail, service) # type: ignore[unused-awaitable]

async def on_recovered(self, recoverer: "_Recoverer") -> None:
logger.info(
Expand Down Expand Up @@ -479,7 +479,7 @@ def __init__(

def recover(self) -> None:
def _retry() -> None:
run_as_background_process(
run_as_background_process( # type: ignore[unused-awaitable]
"as-recoverer-%s" % (self.service.id,), self.retry
)

Expand Down
6 changes: 3 additions & 3 deletions synapse/federation/federation_server.py
Original file line number Diff line number Diff line change
Expand Up @@ -200,7 +200,7 @@ async def _handle_old_staged_events(self) -> None:
)
if lock:
logger.info("Handling old staged inbound events in %s", room_id)
self._process_incoming_pdus_in_room_inner(
self._process_incoming_pdus_in_room_inner( # type: ignore[unused-awaitable]
room_id,
room_version,
lock,
Expand Down Expand Up @@ -277,7 +277,7 @@ async def on_incoming_transaction(
# any old events in the staging area.
if not self._started_handling_of_staged_events:
self._started_handling_of_staged_events = True
self._handle_old_staged_events()
self._handle_old_staged_events() # type: ignore[unused-awaitable]

# Start a periodic check for old staged events. This is to handle
# the case where locks time out, e.g. if another process gets killed
Expand Down Expand Up @@ -1144,7 +1144,7 @@ async def _handle_received_pdu(self, origin: str, pdu: EventBase) -> None:
_INBOUND_EVENT_HANDLING_LOCK_NAME, pdu.room_id
)
if lock:
self._process_incoming_pdus_in_room_inner(
self._process_incoming_pdus_in_room_inner( # type: ignore[unused-awaitable]
pdu.room_id, room_version, lock, origin, pdu
)

Expand Down
4 changes: 2 additions & 2 deletions synapse/federation/sender/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -187,7 +187,7 @@ def add_to_queue(self, destination: str) -> None:
self.queue[destination] = None

if not self.processing:
self._handle()
self._handle() # type: ignore[unused-awaitable]

@wrap_as_background_process("_DestinationWakeupQueue.handle")
async def _handle(self) -> None:
Expand Down Expand Up @@ -342,7 +342,7 @@ def notify_new_events(self, max_token: RoomStreamToken) -> None:
return

# fire off a processing loop in the background
run_as_background_process(
run_as_background_process( # type: ignore[unused-awaitable]
"process_event_queue_for_federation", self._process_event_queue_loop
)

Expand Down
2 changes: 1 addition & 1 deletion synapse/federation/sender/per_destination_queue.py
Original file line number Diff line number Diff line change
Expand Up @@ -301,7 +301,7 @@ def attempt_new_transaction(self) -> None:

logger.debug("TX [%s] Starting transaction loop", self._destination)

run_as_background_process(
run_as_background_process( # type: ignore[unused-awaitable]
"federation_transaction_transmission_loop",
self._transaction_transmission_loop,
)
Expand Down
2 changes: 1 addition & 1 deletion synapse/federation/transport/server/_base.py
Original file line number Diff line number Diff line change
Expand Up @@ -132,7 +132,7 @@ async def authenticate_request(
# alive
retry_timings = await self.store.get_destination_retry_timings(origin)
if retry_timings and retry_timings.retry_last_ts:
run_in_background(self.reset_retry_timings, origin)
run_in_background(self.reset_retry_timings, origin) # type: ignore[unused-awaitable]

return origin

Expand Down
6 changes: 3 additions & 3 deletions synapse/handlers/appservice.py
Original file line number Diff line number Diff line change
Expand Up @@ -97,7 +97,7 @@ def notify_interested_services(self, max_token: RoomStreamToken) -> None:

# We only start a new background process if necessary rather than
# optimistically (to cut down on overhead).
self._notify_interested_services(max_token)
self._notify_interested_services(max_token) # type: ignore[unused-awaitable]

@wrap_as_background_process("notify_interested_services")
async def _notify_interested_services(self, max_token: RoomStreamToken) -> None:
Expand Down Expand Up @@ -144,7 +144,7 @@ async def start_scheduler() -> None:
except Exception:
logger.error("Application Services Failure")

run_as_background_process("as_scheduler", start_scheduler)
run_as_background_process("as_scheduler", start_scheduler) # type: ignore[unused-awaitable]
self.started_scheduler = True

# Fork off pushes to these services
Expand Down Expand Up @@ -307,7 +307,7 @@ def notify_interested_services_ephemeral(

# We only start a new background process if necessary rather than
# optimistically (to cut down on overhead).
self._notify_interested_services_ephemeral(
self._notify_interested_services_ephemeral( # type: ignore[unused-awaitable]
services, stream_key, new_token, users
)

Expand Down
2 changes: 1 addition & 1 deletion synapse/handlers/deactivate_account.py
Original file line number Diff line number Diff line change
Expand Up @@ -223,7 +223,7 @@ def _start_user_parting(self) -> None:
pending deactivation, if it isn't already running.
"""
if not self._user_parter_running:
run_as_background_process("user_parter_loop", self._user_parter_loop)
run_as_background_process("user_parter_loop", self._user_parter_loop) # type: ignore[unused-awaitable]

async def _user_parter_loop(self) -> None:
"""Loop that parts deactivated users from rooms"""
Expand Down
2 changes: 1 addition & 1 deletion synapse/handlers/device.py
Original file line number Diff line number Diff line change
Expand Up @@ -589,7 +589,7 @@ async def notify_device_update(

# We may need to do some processing asynchronously for local user IDs.
if self.hs.is_mine_id(user_id):
self._handle_new_device_update_async()
self._handle_new_device_update_async() # type: ignore[unused-awaitable]

async def notify_user_signature_update(
self, from_user_id: str, user_ids: List[str]
Expand Down
2 changes: 1 addition & 1 deletion synapse/handlers/devicemessage.py
Original file line number Diff line number Diff line change
Expand Up @@ -198,7 +198,7 @@ async def _check_for_unknown_devices(
await self.store.mark_remote_users_device_caches_as_stale((sender_user_id,))

# Immediately attempt a resync in the background
run_in_background(self._user_device_resync, user_id=sender_user_id)
run_in_background(self._user_device_resync, user_id=sender_user_id) # type: ignore[unused-awaitable]

async def send_device_message(
self,
Expand Down
6 changes: 3 additions & 3 deletions synapse/handlers/federation.py
Original file line number Diff line number Diff line change
Expand Up @@ -192,7 +192,7 @@ def __init__(self, hs: "HomeServer"):
# any partial-state-resync operations which were in flight when we
# were shut down.
if not hs.config.worker.worker_app:
run_as_background_process(
run_as_background_process( # type: ignore[unused-awaitable]
"resume_sync_partial_state_room", self._resume_partial_state_room_sync
)

Expand Down Expand Up @@ -778,7 +778,7 @@ async def do_invite_join(
# lots of requests for missing prev_events which we do actually
# have. Hence we fire off the background task, but don't wait for it.

run_as_background_process(
run_as_background_process( # type: ignore[unused-awaitable]
"handle_queued_pdus", self._handle_queued_pdus, room_queue
)

Expand Down Expand Up @@ -1838,7 +1838,7 @@ async def _sync_partial_state_room_wrapper() -> None:
room_id=room_id,
)

run_as_background_process(
run_as_background_process( # type: ignore[unused-awaitable]
desc="sync_partial_state_room", func=_sync_partial_state_room_wrapper
)

Expand Down
2 changes: 1 addition & 1 deletion synapse/handlers/federation_event.py
Original file line number Diff line number Diff line change
Expand Up @@ -1461,7 +1461,7 @@ async def _process_received_pdu(
resync = True

if resync:
run_as_background_process(
run_as_background_process( # type: ignore[unused-awaitable]
"resync_device_due_to_pdu",
self._resync_device,
event.sender,
Expand Down
6 changes: 3 additions & 3 deletions synapse/handlers/message.py
Original file line number Diff line number Diff line change
Expand Up @@ -101,7 +101,7 @@ def __init__(self, hs: "HomeServer"):
self._scheduled_expiry: Optional[IDelayedCall] = None

if not hs.config.worker.worker_app:
run_as_background_process(
run_as_background_process( # type: ignore[unused-awaitable]
"_schedule_next_expiry", self._schedule_next_expiry
)

Expand Down Expand Up @@ -1954,7 +1954,7 @@ async def persist_and_notify_client_events(
if event.type == EventTypes.Message:
# We don't want to block sending messages on any presence code. This
# matters as sometimes presence code can take a while.
run_as_background_process(
run_as_background_process( # type: ignore[unused-awaitable]
"bump_presence_active_time", self._bump_active_time, requester.user
)

Expand All @@ -1966,7 +1966,7 @@ async def _notify() -> None:
except Exception:
logger.exception("Error notifying about new room events")

run_in_background(_notify)
run_in_background(_notify) # type: ignore[unused-awaitable]

return persisted_events[-1]

Expand Down
6 changes: 3 additions & 3 deletions synapse/handlers/pagination.py
Original file line number Diff line number Diff line change
Expand Up @@ -293,7 +293,7 @@ async def purge_history_for_rooms_in_range(
# We want to purge everything, including local events, and to run the purge in
# the background so that it's not blocking any other operation apart from
# other purges in the same room.
run_as_background_process(
run_as_background_process( # type: ignore[unused-awaitable]
"_purge_history",
self._purge_history,
purge_id,
Expand Down Expand Up @@ -328,7 +328,7 @@ def start_purge_history(
logger.info("[purge] starting purge_id %s", purge_id)

self._purges_by_id[purge_id] = PurgeStatus()
run_as_background_process(
run_as_background_process( # type: ignore[unused-awaitable]
"purge_history",
self._purge_history,
purge_id,
Expand Down Expand Up @@ -777,7 +777,7 @@ def start_shutdown_and_purge_room(

self._delete_by_id[delete_id] = DeleteStatus()
self._delete_by_room.setdefault(room_id, []).append(delete_id)
run_as_background_process(
run_as_background_process( # type: ignore[unused-awaitable]
"shutdown_and_purge_room",
self._shutdown_and_purge_room,
delete_id,
Expand Down
4 changes: 2 additions & 2 deletions synapse/handlers/presence.py
Original file line number Diff line number Diff line change
Expand Up @@ -1064,7 +1064,7 @@ def _user_syncing() -> Generator[None, None, None]:
yield
finally:
if affect_presence:
run_in_background(_end)
run_in_background(_end) # type: ignore[unused-awaitable]

return _user_syncing()

Expand Down Expand Up @@ -1337,7 +1337,7 @@ async def _process_presence() -> None:
finally:
self._event_processing = False

run_as_background_process("presence.notify_new_event", _process_presence)
run_as_background_process("presence.notify_new_event", _process_presence) # type: ignore[unused-awaitable]

async def _unsafe_process(self) -> None:
# Loop round handling deltas until we're up to date
Expand Down
2 changes: 1 addition & 1 deletion synapse/handlers/stats.py
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,7 @@ async def process() -> None:
finally:
self._is_processing = False

run_as_background_process("stats.notify_new_event", process)
run_as_background_process("stats.notify_new_event", process) # type: ignore[unused-awaitable]

async def _unsafe_process(self) -> None:
# If self.pos is None then means we haven't fetched it from DB
Expand Down
6 changes: 3 additions & 3 deletions synapse/handlers/typing.py
Original file line number Diff line number Diff line change
Expand Up @@ -116,7 +116,7 @@ def _handle_timeout_for_member(self, now: int, member: RoomMember) -> None:
if self.federation and self.is_mine_id(member.user_id):
last_fed_poke = self._member_last_federation_poke.get(member, None)
if not last_fed_poke or last_fed_poke + FEDERATION_PING_INTERVAL <= now:
run_as_background_process(
run_as_background_process( # type: ignore[unused-awaitable]
"typing._push_remote", self._push_remote, member=member, typing=True
)

Expand Down Expand Up @@ -180,7 +180,7 @@ def process_replication_rows(
self._room_typing[row.room_id] = now_typing

if self.federation:
run_as_background_process(
run_as_background_process( # type: ignore[unused-awaitable]
"_send_changes_in_typing_to_remotes",
self._send_changes_in_typing_to_remotes,
row.room_id,
Expand Down Expand Up @@ -327,7 +327,7 @@ def _stopped_typing(self, member: RoomMember) -> None:
def _push_update(self, member: RoomMember, typing: bool) -> None:
if self.hs.is_mine_id(member.user_id):
# Only send updates for changes to our own users.
run_as_background_process(
run_as_background_process( # type: ignore[unused-awaitable]
"typing._push_remote", self._push_remote, member, typing
)

Expand Down
2 changes: 1 addition & 1 deletion synapse/handlers/user_directory.py
Original file line number Diff line number Diff line change
Expand Up @@ -122,7 +122,7 @@ async def process() -> None:
self._is_processing = False

self._is_processing = True
run_as_background_process("user_directory.notify_new_event", process)
run_as_background_process("user_directory.notify_new_event", process) # type: ignore[unused-awaitable]

async def handle_local_profile_change(
self, user_id: str, profile: ProfileInfo
Expand Down
2 changes: 1 addition & 1 deletion synapse/http/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -458,7 +458,7 @@ async def request(
)

# turn timeouts into RequestTimedOutErrors
request_deferred.addErrback(_timeout_to_request_timed_out_error)
request_deferred.addErrback(_timeout_to_request_timed_out_error) # type: ignore[unused-awaitable]

response = await make_deferred_yieldable(request_deferred)

Expand Down
4 changes: 2 additions & 2 deletions synapse/http/connectproxyclient.py
Original file line number Diff line number Diff line change
Expand Up @@ -102,7 +102,7 @@ def connect(self, protocolFactory: ClientFactory) -> "defer.Deferred[IProtocol]"
d = self._proxy_endpoint.connect(f)
# once the tcp socket connects successfully, we need to wait for the
# CONNECT to complete.
d.addCallback(lambda conn: f.on_connection)
d.addCallback(lambda conn: f.on_connection) # type: ignore[unused-awaitable]
return d


Expand Down Expand Up @@ -196,7 +196,7 @@ def __init__(
self.http_setup_client = HTTPConnectSetupClient(
self.host, self.port, self.proxy_creds
)
self.http_setup_client.on_connected.addCallback(self.proxyConnected)
self.http_setup_client.on_connected.addCallback(self.proxyConnected) # type: ignore[unused-awaitable]

def connectionMade(self) -> None:
self.http_setup_client.makeConnection(self.transport)
Expand Down
2 changes: 1 addition & 1 deletion synapse/http/matrixfederationclient.py
Original file line number Diff line number Diff line change
Expand Up @@ -1216,7 +1216,7 @@ async def get_file(

try:
d = read_body_with_max_size(response, output_stream, max_size)
d.addTimeout(self.default_timeout, self.reactor)
d.addTimeout(self.default_timeout, self.reactor) # type: ignore[unused-awaitable]
length = await make_deferred_yieldable(d)
except BodyExceededMaxSize:
msg = "Requested file is too large > %r bytes" % (max_size,)
Expand Down
2 changes: 1 addition & 1 deletion synapse/http/server.py
Original file line number Diff line number Diff line change
Expand Up @@ -764,7 +764,7 @@ def respond_with_json(
if send_cors:
set_cors_headers(request)

run_in_background(
run_in_background( # type: ignore[unused-awaitable]
_async_write_json_to_request_in_thread, request, encoder, json_object
)
return NOT_DONE_YET
Expand Down
Loading