Skip to content
This repository has been archived by the owner on Apr 26, 2024. It is now read-only.

Add type hints to more handlers #8244

Merged
merged 7 commits into from
Sep 3, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions changelog.d/8244.misc
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Add type hints to pagination, initial sync and events handlers.
3 changes: 3 additions & 0 deletions mypy.ini
Original file line number Diff line number Diff line change
Expand Up @@ -17,10 +17,13 @@ files =
synapse/handlers/auth.py,
synapse/handlers/cas_handler.py,
synapse/handlers/directory.py,
synapse/handlers/events.py,
synapse/handlers/federation.py,
synapse/handlers/identity.py,
synapse/handlers/initial_sync.py,
synapse/handlers/message.py,
synapse/handlers/oidc_handler.py,
synapse/handlers/pagination.py,
synapse/handlers/presence.py,
synapse/handlers/room.py,
synapse/handlers/room_member.py,
Expand Down
49 changes: 26 additions & 23 deletions synapse/handlers/events.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,29 +15,30 @@

import logging
import random
from typing import TYPE_CHECKING, Iterable, List, Optional

from synapse.api.constants import EventTypes, Membership
from synapse.api.errors import AuthError, SynapseError
from synapse.events import EventBase
from synapse.handlers.presence import format_user_presence_state
from synapse.logging.utils import log_function
from synapse.types import UserID
from synapse.streams.config import PaginationConfig
from synapse.types import JsonDict, UserID
from synapse.visibility import filter_events_for_client

from ._base import BaseHandler

if TYPE_CHECKING:
from synapse.server import HomeServer


logger = logging.getLogger(__name__)


class EventStreamHandler(BaseHandler):
def __init__(self, hs):
def __init__(self, hs: "HomeServer"):
super(EventStreamHandler, self).__init__(hs)

# Count of active streams per user
self._streams_per_user = {}
# Grace timers per user to delay the "stopped" signal
self._stop_timer_per_user = {}

self.distributor = hs.get_distributor()
self.distributor.declare("started_user_eventstream")
self.distributor.declare("stopped_user_eventstream")
Expand All @@ -52,14 +53,14 @@ def __init__(self, hs):
@log_function
async def get_stream(
self,
auth_user_id,
pagin_config,
timeout=0,
as_client_event=True,
affect_presence=True,
room_id=None,
is_guest=False,
):
auth_user_id: str,
pagin_config: PaginationConfig,
timeout: int = 0,
as_client_event: bool = True,
affect_presence: bool = True,
room_id: Optional[str] = None,
is_guest: bool = False,
) -> JsonDict:
"""Fetches the events stream for a given user.
"""

Expand Down Expand Up @@ -98,7 +99,7 @@ async def get_stream(

# When the user joins a new room, or another user joins a currently
# joined room, we need to send down presence for those users.
to_add = []
to_add = [] # type: List[JsonDict]
for event in events:
if not isinstance(event, EventBase):
continue
Expand All @@ -110,7 +111,7 @@ async def get_stream(
# Send down presence for everyone in the room.
users = await self.state.get_current_users_in_room(
event.room_id
)
) # type: Iterable[str]
else:
users = [event.state_key]

Expand Down Expand Up @@ -144,20 +145,22 @@ async def get_stream(


class EventHandler(BaseHandler):
def __init__(self, hs):
def __init__(self, hs: "HomeServer"):
super(EventHandler, self).__init__(hs)
self.storage = hs.get_storage()

async def get_event(self, user, room_id, event_id):
async def get_event(
self, user: UserID, room_id: Optional[str], event_id: str
) -> Optional[EventBase]:
clokep marked this conversation as resolved.
Show resolved Hide resolved
"""Retrieve a single specified event.

Args:
user (synapse.types.UserID): The user requesting the event
room_id (str|None): The expected room id. We'll return None if the
user: The user requesting the event
room_id: The expected room id. We'll return None if the
event's room does not match.
event_id (str): The event ID to obtain.
event_id: The event ID to obtain.
Returns:
dict: An event, or None if there is no event matching this ID.
An event, or None if there is no event matching this ID.
Raises:
SynapseError if there was a problem retrieving this event, or
AuthError if the user does not have the rights to inspect this
Expand Down
80 changes: 48 additions & 32 deletions synapse/handlers/initial_sync.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
# limitations under the License.

import logging
from typing import TYPE_CHECKING

from twisted.internet import defer

Expand All @@ -22,20 +23,25 @@
from synapse.events.validator import EventValidator
from synapse.handlers.presence import format_user_presence_state
from synapse.logging.context import make_deferred_yieldable, run_in_background
from synapse.storage.roommember import RoomsForUser
from synapse.streams.config import PaginationConfig
from synapse.types import StreamToken, UserID
from synapse.types import JsonDict, Requester, StreamToken, UserID
from synapse.util import unwrapFirstError
from synapse.util.async_helpers import concurrently_execute
from synapse.util.caches.response_cache import ResponseCache
from synapse.visibility import filter_events_for_client

from ._base import BaseHandler

if TYPE_CHECKING:
from synapse.server import HomeServer


logger = logging.getLogger(__name__)


class InitialSyncHandler(BaseHandler):
def __init__(self, hs):
def __init__(self, hs: "HomeServer"):
super(InitialSyncHandler, self).__init__(hs)
self.hs = hs
self.state = hs.get_state_handler()
Expand All @@ -48,27 +54,25 @@ def __init__(self, hs):

def snapshot_all_rooms(
self,
user_id=None,
pagin_config=None,
as_client_event=True,
include_archived=False,
):
user_id: str,
pagin_config: PaginationConfig,
as_client_event: bool = True,
include_archived: bool = False,
) -> JsonDict:
"""Retrieve a snapshot of all rooms the user is invited or has joined.

This snapshot may include messages for all rooms where the user is
joined, depending on the pagination config.

Args:
user_id (str): The ID of the user making the request.
pagin_config (synapse.api.streams.PaginationConfig): The pagination
config used to determine how many messages *PER ROOM* to return.
as_client_event (bool): True to get events in client-server format.
include_archived (bool): True to get rooms that the user has left
user_id: The ID of the user making the request.
pagin_config: The pagination config used to determine how many
messages *PER ROOM* to return.
as_client_event: True to get events in client-server format.
include_archived: True to get rooms that the user has left
Returns:
A list of dicts with "room_id" and "membership" keys for all rooms
the user is currently invited or joined in on. Rooms where the user
is joined on, may return a "messages" key with messages, depending
on the specified PaginationConfig.
A JsonDict with the same format as the response to `/intialSync`
API
"""
key = (
user_id,
Expand All @@ -91,11 +95,11 @@ def snapshot_all_rooms(

async def _snapshot_all_rooms(
self,
user_id=None,
pagin_config=None,
as_client_event=True,
include_archived=False,
):
user_id: str,
pagin_config: PaginationConfig,
as_client_event: bool = True,
include_archived: bool = False,
) -> JsonDict:

memberships = [Membership.INVITE, Membership.JOIN]
if include_archived:
Expand Down Expand Up @@ -134,7 +138,7 @@ async def _snapshot_all_rooms(
if limit is None:
limit = 10

async def handle_room(event):
async def handle_room(event: RoomsForUser):
d = {
"room_id": event.room_id,
"membership": event.membership,
Expand Down Expand Up @@ -251,17 +255,18 @@ async def handle_room(event):

return ret

async def room_initial_sync(self, requester, room_id, pagin_config=None):
async def room_initial_sync(
self, requester: Requester, room_id: str, pagin_config: PaginationConfig
) -> JsonDict:
"""Capture the a snapshot of a room. If user is currently a member of
the room this will be what is currently in the room. If the user left
the room this will be what was in the room when they left.

Args:
requester(Requester): The user to get a snapshot for.
room_id(str): The room to get a snapshot of.
pagin_config(synapse.streams.config.PaginationConfig):
The pagination config used to determine how many messages to
return.
requester: The user to get a snapshot for.
room_id: The room to get a snapshot of.
pagin_config: The pagination config used to determine how many
messages to return.
Raises:
AuthError if the user wasn't in the room.
Returns:
Expand Down Expand Up @@ -305,8 +310,14 @@ async def room_initial_sync(self, requester, room_id, pagin_config=None):
return result

async def _room_initial_sync_parted(
self, user_id, room_id, pagin_config, membership, member_event_id, is_peeking
):
self,
user_id: str,
room_id: str,
pagin_config: PaginationConfig,
membership: Membership,
member_event_id: str,
is_peeking: bool,
) -> JsonDict:
room_state = await self.state_store.get_state_for_events([member_event_id])

room_state = room_state[member_event_id]
Expand Down Expand Up @@ -350,8 +361,13 @@ async def _room_initial_sync_parted(
}

async def _room_initial_sync_joined(
self, user_id, room_id, pagin_config, membership, is_peeking
):
self,
user_id: str,
room_id: str,
pagin_config: PaginationConfig,
membership: Membership,
is_peeking: bool,
) -> JsonDict:
current_state = await self.state.get_current_state(room_id=room_id)

# TODO: These concurrently
Expand Down
Loading