Skip to content
This repository has been archived by the owner on Apr 26, 2024. It is now read-only.

Commit

Permalink
Convert the well known resolver to async (#8214)
Browse files Browse the repository at this point in the history
  • Loading branch information
clokep authored Sep 1, 2020
1 parent da77520 commit 5bf8e5f
Show file tree
Hide file tree
Showing 5 changed files with 53 additions and 34 deletions.
1 change: 1 addition & 0 deletions changelog.d/8214.misc
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Convert various parts of the codebase to async/await.
1 change: 1 addition & 0 deletions mypy.ini
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ files =
synapse/handlers/saml_handler.py,
synapse/handlers/sync.py,
synapse/handlers/ui_auth,
synapse/http/federation/well_known_resolver.py,
synapse/http/server.py,
synapse/http/site.py,
synapse/logging/,
Expand Down
4 changes: 2 additions & 2 deletions synapse/http/federation/matrix_federation_agent.py
Original file line number Diff line number Diff line change
Expand Up @@ -134,8 +134,8 @@ def request(self, method, uri, headers=None, bodyProducer=None):
and not _is_ip_literal(parsed_uri.hostname)
and not parsed_uri.port
):
well_known_result = yield self._well_known_resolver.get_well_known(
parsed_uri.hostname
well_known_result = yield defer.ensureDeferred(
self._well_known_resolver.get_well_known(parsed_uri.hostname)
)
delegated_server = well_known_result.delegated_server

Expand Down
57 changes: 31 additions & 26 deletions synapse/http/federation/well_known_resolver.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,13 +16,15 @@
import logging
import random
import time
from typing import Callable, Dict, Optional, Tuple

import attr

from twisted.internet import defer
from twisted.web.client import RedirectAgent, readBody
from twisted.web.http import stringToDatetime
from twisted.web.http_headers import Headers
from twisted.web.iweb import IResponse

from synapse.logging.context import make_deferred_yieldable
from synapse.util import Clock, json_decoder
Expand Down Expand Up @@ -99,15 +101,14 @@ def __init__(
self._well_known_agent = RedirectAgent(agent)
self.user_agent = user_agent

@defer.inlineCallbacks
def get_well_known(self, server_name):
async def get_well_known(self, server_name: bytes) -> WellKnownLookupResult:
"""Attempt to fetch and parse a .well-known file for the given server
Args:
server_name (bytes): name of the server, from the requested url
server_name: name of the server, from the requested url
Returns:
Deferred[WellKnownLookupResult]: The result of the lookup
The result of the lookup
"""
try:
prev_result, expiry, ttl = self._well_known_cache.get_with_expiry(
Expand All @@ -124,7 +125,9 @@ def get_well_known(self, server_name):
# requests for the same server in parallel?
try:
with Measure(self._clock, "get_well_known"):
result, cache_period = yield self._fetch_well_known(server_name)
result, cache_period = await self._fetch_well_known(
server_name
) # type: Tuple[Optional[bytes], float]

except _FetchWellKnownFailure as e:
if prev_result and e.temporary:
Expand Down Expand Up @@ -153,26 +156,25 @@ def get_well_known(self, server_name):

return WellKnownLookupResult(delegated_server=result)

@defer.inlineCallbacks
def _fetch_well_known(self, server_name):
async def _fetch_well_known(self, server_name: bytes) -> Tuple[bytes, float]:
"""Actually fetch and parse a .well-known, without checking the cache
Args:
server_name (bytes): name of the server, from the requested url
server_name: name of the server, from the requested url
Raises:
_FetchWellKnownFailure if we fail to lookup a result
Returns:
Deferred[Tuple[bytes,int]]: The lookup result and cache period.
The lookup result and cache period.
"""

had_valid_well_known = self._had_valid_well_known_cache.get(server_name, False)

# We do this in two steps to differentiate between possibly transient
# errors (e.g. can't connect to host, 503 response) and more permenant
# errors (such as getting a 404 response).
response, body = yield self._make_well_known_request(
response, body = await self._make_well_known_request(
server_name, retry=had_valid_well_known
)

Expand Down Expand Up @@ -215,20 +217,20 @@ def _fetch_well_known(self, server_name):

return result, cache_period

@defer.inlineCallbacks
def _make_well_known_request(self, server_name, retry):
async def _make_well_known_request(
self, server_name: bytes, retry: bool
) -> Tuple[IResponse, bytes]:
"""Make the well known request.
This will retry the request if requested and it fails (with unable
to connect or receives a 5xx error).
Args:
server_name (bytes)
retry (bool): Whether to retry the request if it fails.
server_name: name of the server, from the requested url
retry: Whether to retry the request if it fails.
Returns:
Deferred[tuple[IResponse, bytes]] Returns the response object and
body. Response may be a non-200 response.
Returns the response object and body. Response may be a non-200 response.
"""
uri = b"https://%s/.well-known/matrix/server" % (server_name,)
uri_str = uri.decode("ascii")
Expand All @@ -243,12 +245,12 @@ def _make_well_known_request(self, server_name, retry):

logger.info("Fetching %s", uri_str)
try:
response = yield make_deferred_yieldable(
response = await make_deferred_yieldable(
self._well_known_agent.request(
b"GET", uri, headers=Headers(headers)
)
)
body = yield make_deferred_yieldable(readBody(response))
body = await make_deferred_yieldable(readBody(response))

if 500 <= response.code < 600:
raise Exception("Non-200 response %s" % (response.code,))
Expand All @@ -265,21 +267,24 @@ def _make_well_known_request(self, server_name, retry):
logger.info("Error fetching %s: %s. Retrying", uri_str, e)

# Sleep briefly in the hopes that they come back up
yield self._clock.sleep(0.5)
await self._clock.sleep(0.5)


def _cache_period_from_headers(headers, time_now=time.time):
def _cache_period_from_headers(
headers: Headers, time_now: Callable[[], float] = time.time
) -> Optional[float]:
cache_controls = _parse_cache_control(headers)

if b"no-store" in cache_controls:
return 0

if b"max-age" in cache_controls:
try:
max_age = int(cache_controls[b"max-age"])
return max_age
except ValueError:
pass
max_age = cache_controls[b"max-age"]
if max_age:
try:
return int(max_age)
except ValueError:
pass

expires = headers.getRawHeaders(b"expires")
if expires is not None:
Expand All @@ -295,7 +300,7 @@ def _cache_period_from_headers(headers, time_now=time.time):
return None


def _parse_cache_control(headers):
def _parse_cache_control(headers: Headers) -> Dict[bytes, Optional[bytes]]:
cache_controls = {}
for hdr in headers.getRawHeaders(b"cache-control", []):
for directive in hdr.split(b","):
Expand Down
24 changes: 18 additions & 6 deletions tests/http/federation/test_matrix_federation_agent.py
Original file line number Diff line number Diff line change
Expand Up @@ -972,7 +972,9 @@ def test_idna_srv_target(self):
def test_well_known_cache(self):
self.reactor.lookups["testserv"] = "1.2.3.4"

fetch_d = self.well_known_resolver.get_well_known(b"testserv")
fetch_d = defer.ensureDeferred(
self.well_known_resolver.get_well_known(b"testserv")
)

# there should be an attempt to connect on port 443 for the .well-known
clients = self.reactor.tcpClients
Expand All @@ -995,15 +997,19 @@ def test_well_known_cache(self):
well_known_server.loseConnection()

# repeat the request: it should hit the cache
fetch_d = self.well_known_resolver.get_well_known(b"testserv")
fetch_d = defer.ensureDeferred(
self.well_known_resolver.get_well_known(b"testserv")
)
r = self.successResultOf(fetch_d)
self.assertEqual(r.delegated_server, b"target-server")

# expire the cache
self.reactor.pump((1000.0,))

# now it should connect again
fetch_d = self.well_known_resolver.get_well_known(b"testserv")
fetch_d = defer.ensureDeferred(
self.well_known_resolver.get_well_known(b"testserv")
)

self.assertEqual(len(clients), 1)
(host, port, client_factory, _timeout, _bindAddress) = clients.pop(0)
Expand All @@ -1026,7 +1032,9 @@ def test_well_known_cache_with_temp_failure(self):

self.reactor.lookups["testserv"] = "1.2.3.4"

fetch_d = self.well_known_resolver.get_well_known(b"testserv")
fetch_d = defer.ensureDeferred(
self.well_known_resolver.get_well_known(b"testserv")
)

# there should be an attempt to connect on port 443 for the .well-known
clients = self.reactor.tcpClients
Expand All @@ -1052,7 +1060,9 @@ def test_well_known_cache_with_temp_failure(self):
# another lookup.
self.reactor.pump((900.0,))

fetch_d = self.well_known_resolver.get_well_known(b"testserv")
fetch_d = defer.ensureDeferred(
self.well_known_resolver.get_well_known(b"testserv")
)

# The resolver may retry a few times, so fonx all requests that come along
attempts = 0
Expand Down Expand Up @@ -1082,7 +1092,9 @@ def test_well_known_cache_with_temp_failure(self):
self.reactor.pump((10000.0,))

# Repated the request, this time it should fail if the lookup fails.
fetch_d = self.well_known_resolver.get_well_known(b"testserv")
fetch_d = defer.ensureDeferred(
self.well_known_resolver.get_well_known(b"testserv")
)

clients = self.reactor.tcpClients
(host, port, client_factory, _timeout, _bindAddress) = clients.pop(0)
Expand Down

0 comments on commit 5bf8e5f

Please sign in to comment.