From 158cdbe3bbdbb376b08abe3c8fd06673c4e2084d Mon Sep 17 00:00:00 2001 From: Marek Kowalski Date: Mon, 26 Feb 2018 14:38:44 +0100 Subject: [PATCH 01/17] Add tracking signals for getting request/response bodies. --- aiohttp/client.py | 2 +- aiohttp/client_reqrep.py | 23 ++++++++++++++-- aiohttp/http_writer.py | 11 +++++++- aiohttp/tracing.py | 53 +++++++++++++++++++++++++++++++----- tests/test_client_session.py | 29 ++++++++++++++++---- tests/test_tracing.py | 16 ++++++++++- 6 files changed, 117 insertions(+), 17 deletions(-) diff --git a/aiohttp/client.py b/aiohttp/client.py index baa7f0a056..841a4fc7c9 100644 --- a/aiohttp/client.py +++ b/aiohttp/client.py @@ -309,7 +309,7 @@ async def _request(self, method, url, *, response_class=self._response_class, proxy=proxy, proxy_auth=proxy_auth, timer=timer, session=self, auto_decompress=self._auto_decompress, - ssl=ssl, proxy_headers=proxy_headers) + ssl=ssl, proxy_headers=proxy_headers, traces=traces) # connection timeout try: diff --git a/aiohttp/client_reqrep.py b/aiohttp/client_reqrep.py index e9e45d46e1..529f9f0146 100644 --- a/aiohttp/client_reqrep.py +++ b/aiohttp/client_reqrep.py @@ -22,6 +22,7 @@ from .helpers import PY_36, HeadersMixin, TimerNoop, noop, reify, set_result from .http import SERVER_SOFTWARE, HttpVersion10, HttpVersion11, StreamWriter from .log import client_logger +from .signals import Signal from .streams import StreamReader @@ -168,7 +169,8 @@ def __init__(self, method, url, *, proxy=None, proxy_auth=None, timer=None, session=None, auto_decompress=True, ssl=None, - proxy_headers=None): + proxy_headers=None, + traces=[]): if loop is None: loop = asyncio.get_event_loop() @@ -209,6 +211,7 @@ def __init__(self, method, url, *, if data or self.method not in self.GET_METHODS: self.update_transfer_encoding() self.update_expect_continue(expect100) + self.traces = traces def is_ssl(self): return self.url.scheme in ('https', 'wss') @@ -475,7 +478,13 @@ async def send(self, conn): if self.url.raw_query_string: path += '?' + self.url.raw_query_string - writer = StreamWriter(conn.protocol, conn.transport, self.loop) + async def on_chunk_sent(chunk): + for trace in self.traces: + await trace.send_request_chunk_sent(chunk) + + writer = StreamWriter( + conn.protocol, conn.transport, self.loop) + writer.on_chunk_sent.append(on_chunk_sent) if self.compress: writer.enable_compression(self.compress) @@ -509,11 +518,16 @@ async def send(self, conn): self._writer = self.loop.create_task(self.write_bytes(writer, conn)) + async def on_chunk_received(chunk): + for trace in self.traces: + await trace.send_response_chunk_received(chunk) + self.response = self.response_class( self.method, self.original_url, writer=self._writer, continue100=self._continue, timer=self._timer, request_info=self.request_info, auto_decompress=self._auto_decompress) + self.response.on_chunk_received.append(on_chunk_received) self.response._post_init(self.loop, self._session) return self.response @@ -573,6 +587,9 @@ def __init__(self, method, url, *, self._auto_decompress = auto_decompress self._cache = {} # reqired for @reify method decorator + # avoid circular reference so that __del__ works + self.on_chunk_received = Signal(owner=None) + @property def url(self): return self._url @@ -796,6 +813,8 @@ async def read(self): if self._content is None: try: self._content = await self.content.read() + self.on_chunk_received.freeze() + await self.on_chunk_received.send(self._content) except BaseException: self.close() raise diff --git a/aiohttp/http_writer.py b/aiohttp/http_writer.py index f0552a3840..6b572f7026 100644 --- a/aiohttp/http_writer.py +++ b/aiohttp/http_writer.py @@ -5,6 +5,8 @@ import zlib from .abc import AbstractStreamWriter +from .helpers import noop +from .signals import Signal __all__ = ('StreamWriter', 'HttpVersion', 'HttpVersion10', 'HttpVersion11') @@ -30,6 +32,8 @@ def __init__(self, protocol, transport, loop): self._compress = None self._drain_waiter = None + self.on_chunk_sent = Signal(self) + @property def transport(self): return self._transport @@ -55,13 +59,18 @@ def _write(self, chunk): raise asyncio.CancelledError('Cannot write to closing transport') self._transport.write(chunk) - async def write(self, chunk, *, drain=True, LIMIT=64*1024): + async def write(self, chunk, *, drain=True, LIMIT=64 * 1024): """Writes chunk of data to a stream. write_eof() indicates end of stream. writer can't be used after write_eof() method being called. write() return drain future. """ + self.on_chunk_sent.freeze() + self.loop.create_task( + self.on_chunk_sent.send(chunk) + ) + if self._compress is not None: chunk = self._compress.compress(chunk) if not chunk: diff --git a/aiohttp/tracing.py b/aiohttp/tracing.py index b813347637..7553b59de0 100644 --- a/aiohttp/tracing.py +++ b/aiohttp/tracing.py @@ -9,13 +9,14 @@ __all__ = ( - 'TraceConfig', 'TraceRequestStartParams', 'TraceRequestEndParams', - 'TraceRequestExceptionParams', 'TraceConnectionQueuedStartParams', - 'TraceConnectionQueuedEndParams', 'TraceConnectionCreateStartParams', - 'TraceConnectionCreateEndParams', 'TraceConnectionReuseconnParams', - 'TraceDnsResolveHostStartParams', 'TraceDnsResolveHostEndParams', - 'TraceDnsCacheHitParams', 'TraceDnsCacheMissParams', - 'TraceRequestRedirectParams' + 'TraceConfig', 'TraceRequestStartParams', + 'TraceRequestChunkSentParams', 'TraceResponseChunkReceivedParams', + 'TraceRequestEndParams', 'TraceRequestExceptionParams', + 'TraceConnectionQueuedStartParams', 'TraceConnectionQueuedEndParams', + 'TraceConnectionCreateStartParams', 'TraceConnectionCreateEndParams', + 'TraceConnectionReuseconnParams', 'TraceDnsResolveHostStartParams', + 'TraceDnsResolveHostEndParams', 'TraceDnsCacheHitParams', + 'TraceDnsCacheMissParams', 'TraceRequestRedirectParams' ) @@ -25,6 +26,8 @@ class TraceConfig: def __init__(self, trace_config_ctx_factory=SimpleNamespace): self._on_request_start = Signal(self) + self._on_request_chunk_sent = Signal(self) + self._on_response_chunk_received = Signal(self) self._on_request_end = Signal(self) self._on_request_exception = Signal(self) self._on_request_redirect = Signal(self) @@ -47,6 +50,8 @@ def trace_config_ctx(self, trace_request_ctx=None): def freeze(self): self._on_request_start.freeze() + self._on_request_chunk_sent.freeze() + self._on_response_chunk_received.freeze() self._on_request_end.freeze() self._on_request_exception.freeze() self._on_request_redirect.freeze() @@ -64,6 +69,14 @@ def freeze(self): def on_request_start(self): return self._on_request_start + @property + def on_request_chunk_sent(self): + return self._on_request_chunk_sent + + @property + def on_response_chunk_received(self): + return self._on_response_chunk_received + @property def on_request_end(self): return self._on_request_end @@ -121,6 +134,18 @@ class TraceRequestStartParams: headers = attr.ib(type=CIMultiDict) +@attr.s(frozen=True, slots=True) +class TraceRequestChunkSentParams: + """ Parameters sent by the `on_request_chunk_sent` signal""" + chunk = attr.ib(type=bytes) + + +@attr.s(frozen=True, slots=True) +class TraceResponseChunkReceivedParams: + """ Parameters sent by the `on_response_chunk_received` signal""" + chunk = attr.ib(type=bytes) + + @attr.s(frozen=True, slots=True) class TraceRequestEndParams: """ Parameters sent by the `on_request_end` signal""" @@ -213,6 +238,20 @@ async def send_request_start(self, method, url, headers): TraceRequestStartParams(method, url, headers) ) + async def send_request_chunk_sent(self, chunk): + return await self._trace_config.on_request_chunk_sent.send( + self._session, + self._trace_config_ctx, + TraceRequestChunkSentParams(chunk) + ) + + async def send_response_chunk_received(self, chunk): + return await self._trace_config.on_response_chunk_received.send( + self._session, + self._trace_config_ctx, + TraceResponseChunkReceivedParams(chunk) + ) + async def send_request_end(self, method, url, headers, response): return await self._trace_config.on_request_end.send( self._session, diff --git a/tests/test_client_session.py b/tests/test_client_session.py index 60e5fcce33..f31e4def17 100644 --- a/tests/test_client_session.py +++ b/tests/test_client_session.py @@ -1,8 +1,10 @@ import asyncio import contextlib import gc +import json import re from http.cookies import SimpleCookie +from io import BytesIO from unittest import mock import pytest @@ -457,33 +459,47 @@ def test_client_session_implicit_loop_warn(): async def test_request_tracing(loop, aiohttp_client): async def handler(request): - return web.Response() + return web.json_response({'ok': True}) app = web.Application() - app.router.add_get('/', handler) + app.router.add_post('/', handler) trace_config_ctx = mock.Mock() trace_request_ctx = {} + body = 'This is request body' + gathered_req_body = BytesIO() + gathered_res_body = BytesIO() on_request_start = mock.Mock(side_effect=asyncio.coroutine(mock.Mock())) on_request_redirect = mock.Mock(side_effect=asyncio.coroutine(mock.Mock())) on_request_end = mock.Mock(side_effect=asyncio.coroutine(mock.Mock())) + async def on_request_chunk_sent(session, context, params): + gathered_req_body.write(params.chunk) + + async def on_response_chunk_received(session, context, params): + gathered_res_body.write(params.chunk) + trace_config = aiohttp.TraceConfig( trace_config_ctx_factory=mock.Mock(return_value=trace_config_ctx) ) trace_config.on_request_start.append(on_request_start) trace_config.on_request_end.append(on_request_end) + trace_config.on_request_chunk_sent.append(on_request_chunk_sent) + trace_config.on_response_chunk_received.append(on_response_chunk_received) trace_config.on_request_redirect.append(on_request_redirect) session = await aiohttp_client(app, trace_configs=[trace_config]) - async with session.get('/', trace_request_ctx=trace_request_ctx) as resp: + async with session.post( + '/', data=body, trace_request_ctx=trace_request_ctx) as resp: + + await resp.json() on_request_start.assert_called_once_with( session.session, trace_config_ctx, aiohttp.TraceRequestStartParams( - hdrs.METH_GET, + hdrs.METH_POST, session.make_url('/'), CIMultiDict() ) @@ -493,13 +509,16 @@ async def handler(request): session.session, trace_config_ctx, aiohttp.TraceRequestEndParams( - hdrs.METH_GET, + hdrs.METH_POST, session.make_url('/'), CIMultiDict(), resp ) ) assert not on_request_redirect.called + assert gathered_req_body.getvalue() == body.encode('utf8') + assert gathered_res_body.getvalue() == json.dumps( + {'ok': True}).encode('utf8') async def test_request_tracing_exception(loop): diff --git a/tests/test_tracing.py b/tests/test_tracing.py index bc837bf3af..cdb22b93e9 100644 --- a/tests/test_tracing.py +++ b/tests/test_tracing.py @@ -13,10 +13,12 @@ TraceDnsCacheHitParams, TraceDnsCacheMissParams, TraceDnsResolveHostEndParams, TraceDnsResolveHostStartParams, + TraceRequestChunkSentParams, TraceRequestEndParams, TraceRequestExceptionParams, TraceRequestRedirectParams, - TraceRequestStartParams) + TraceRequestStartParams, + TraceResponseChunkReceivedParams) class TestTraceConfig: @@ -41,6 +43,8 @@ def test_freeze(self): trace_config.freeze() assert trace_config.on_request_start.frozen + assert trace_config.on_request_chunk_sent.frozen + assert trace_config.on_response_chunk_received.frozen assert trace_config.on_request_end.frozen assert trace_config.on_request_exception.frozen assert trace_config.on_request_redirect.frozen @@ -63,6 +67,16 @@ class TestTrace: (Mock(), Mock(), Mock()), TraceRequestStartParams ), + ( + 'request_chunk_sent', + (Mock(), ), + TraceRequestChunkSentParams + ), + ( + 'response_chunk_received', + (Mock(), ), + TraceResponseChunkReceivedParams + ), ( 'request_end', (Mock(), Mock(), Mock(), Mock()), From 329f89aaf62af5db6dbfdf9327419d64a5176817 Mon Sep 17 00:00:00 2001 From: Marek Kowalski Date: Tue, 27 Feb 2018 12:54:35 +0100 Subject: [PATCH 02/17] Revert automatic pep8 fix. Mark pep8 rules E225 and E226 as ignored, to prevent automatic changes in code formating. --- aiohttp/http_writer.py | 2 +- setup.cfg | 1 + 2 files changed, 2 insertions(+), 1 deletion(-) diff --git a/aiohttp/http_writer.py b/aiohttp/http_writer.py index 6b572f7026..46a93d1990 100644 --- a/aiohttp/http_writer.py +++ b/aiohttp/http_writer.py @@ -59,7 +59,7 @@ def _write(self, chunk): raise asyncio.CancelledError('Cannot write to closing transport') self._transport.write(chunk) - async def write(self, chunk, *, drain=True, LIMIT=64 * 1024): + async def write(self, chunk, *, drain=True, LIMIT=64*1024): """Writes chunk of data to a stream. write_eof() indicates end of stream. diff --git a/setup.cfg b/setup.cfg index d0a6788fdf..59ff0e9e3f 100644 --- a/setup.cfg +++ b/setup.cfg @@ -1,5 +1,6 @@ [pep8] max-line-length=79 +ignore=E225,E226 [easy_install] zip_ok = false From 24e1db985da8ca95fe7910d0d87b9b33877812e3 Mon Sep 17 00:00:00 2001 From: Marek Kowalski Date: Tue, 27 Feb 2018 14:19:18 +0100 Subject: [PATCH 03/17] Remove internal usage of Signal in favor of simple callbacks. --- aiohttp/client_reqrep.py | 27 +++++++++++---------------- aiohttp/http_writer.py | 18 ++++++++++-------- 2 files changed, 21 insertions(+), 24 deletions(-) diff --git a/aiohttp/client_reqrep.py b/aiohttp/client_reqrep.py index 529f9f0146..2065ea4f75 100644 --- a/aiohttp/client_reqrep.py +++ b/aiohttp/client_reqrep.py @@ -22,7 +22,6 @@ from .helpers import PY_36, HeadersMixin, TimerNoop, noop, reify, set_result from .http import SERVER_SOFTWARE, HttpVersion10, HttpVersion11, StreamWriter from .log import client_logger -from .signals import Signal from .streams import StreamReader @@ -483,8 +482,9 @@ async def on_chunk_sent(chunk): await trace.send_request_chunk_sent(chunk) writer = StreamWriter( - conn.protocol, conn.transport, self.loop) - writer.on_chunk_sent.append(on_chunk_sent) + conn.protocol, conn.transport, self.loop, + on_chunk_sent=on_chunk_sent + ) if self.compress: writer.enable_compression(self.compress) @@ -518,17 +518,13 @@ async def on_chunk_sent(chunk): self._writer = self.loop.create_task(self.write_bytes(writer, conn)) - async def on_chunk_received(chunk): - for trace in self.traces: - await trace.send_response_chunk_received(chunk) - self.response = self.response_class( self.method, self.original_url, writer=self._writer, continue100=self._continue, timer=self._timer, request_info=self.request_info, - auto_decompress=self._auto_decompress) - self.response.on_chunk_received.append(on_chunk_received) - + auto_decompress=self._auto_decompress, + traces=self.traces, + ) self.response._post_init(self.loop, self._session) return self.response @@ -569,7 +565,8 @@ class ClientResponse(HeadersMixin): def __init__(self, method, url, *, writer=None, continue100=None, timer=None, - request_info=None, auto_decompress=True): + request_info=None, auto_decompress=True, + traces=[]): assert isinstance(url, URL) self.method = method @@ -586,9 +583,7 @@ def __init__(self, method, url, *, self._timer = timer if timer is not None else TimerNoop() self._auto_decompress = auto_decompress self._cache = {} # reqired for @reify method decorator - - # avoid circular reference so that __del__ works - self.on_chunk_received = Signal(owner=None) + self._traces = traces @property def url(self): @@ -813,8 +808,8 @@ async def read(self): if self._content is None: try: self._content = await self.content.read() - self.on_chunk_received.freeze() - await self.on_chunk_received.send(self._content) + for trace in self._traces: + await trace.send_response_chunk_received(self._content) except BaseException: self.close() raise diff --git a/aiohttp/http_writer.py b/aiohttp/http_writer.py index 46a93d1990..abf33a4228 100644 --- a/aiohttp/http_writer.py +++ b/aiohttp/http_writer.py @@ -2,11 +2,10 @@ import asyncio import collections +import inspect import zlib from .abc import AbstractStreamWriter -from .helpers import noop -from .signals import Signal __all__ = ('StreamWriter', 'HttpVersion', 'HttpVersion10', 'HttpVersion11') @@ -18,7 +17,12 @@ class StreamWriter(AbstractStreamWriter): - def __init__(self, protocol, transport, loop): + def __init__(self, protocol, transport, loop, on_chunk_sent=None): + assert ( + on_chunk_sent is None or + inspect.iscoroutinefunction(on_chunk_sent) + ) + self._protocol = protocol self._transport = transport @@ -32,7 +36,7 @@ def __init__(self, protocol, transport, loop): self._compress = None self._drain_waiter = None - self.on_chunk_sent = Signal(self) + self._on_chunk_sent = on_chunk_sent @property def transport(self): @@ -66,10 +70,8 @@ async def write(self, chunk, *, drain=True, LIMIT=64*1024): writer can't be used after write_eof() method being called. write() return drain future. """ - self.on_chunk_sent.freeze() - self.loop.create_task( - self.on_chunk_sent.send(chunk) - ) + if self._on_chunk_sent: + await self._on_chunk_sent(chunk) if self._compress is not None: chunk = self._compress.compress(chunk) From 1be8ecb9d7815345025e6068382f146adab8245a Mon Sep 17 00:00:00 2001 From: Marek Kowalski Date: Tue, 27 Feb 2018 14:29:25 +0100 Subject: [PATCH 04/17] Document new signals --- docs/tracing_reference.rst | 39 ++++++++++++++++++++++++++++++++++++++ 1 file changed, 39 insertions(+) diff --git a/docs/tracing_reference.rst b/docs/tracing_reference.rst index 5cfaf09c52..cba7573f40 100644 --- a/docs/tracing_reference.rst +++ b/docs/tracing_reference.rst @@ -147,6 +147,20 @@ TraceConfig ``params`` is :class:`aiohttp.TraceRequestStartParams` instance. + .. attribute:: on_request_chunk_sent + + Property that gives access to the signals that will be executed + when a chunk of request body is sent. + + ``params`` is :class:`aiohttp.TraceRequestChunkSentParams` instance. + + .. attribute:: on_response_chunk_received + + Property that gives access to the signals that will be executed + when a chunk of response body is received. + + ``params`` is :class:`aiohttp.TraceResponseChunkReceivedParams` instance. + .. attribute:: on_request_redirect Property that gives access to the signals that will be executed when a @@ -259,6 +273,31 @@ TraceRequestStartParams Headers that will be used for the request, can be mutated. + +TraceRequestChunkSentParams +--------------------------- + +.. class:: TraceRequestChunkSentParams + + See :attr:`TraceConfig.on_request_chunk_sent` for details. + + .. attribute:: chunk + + Bytes of chunk sent + + +TraceResponseChunkSentParams +--------------------------- + +.. class:: TraceResponseChunkSentParams + + See :attr:`TraceConfig.on_response_chunk_received` for details. + + .. attribute:: chunk + + Bytes of chunk received + + TraceRequestEndParams --------------------- From f19e7c0b1db68c781c9dd9b703371eb2251f246f Mon Sep 17 00:00:00 2001 From: Marek Kowalski Date: Tue, 27 Feb 2018 14:53:08 +0100 Subject: [PATCH 05/17] Move callback to a private method. --- aiohttp/client_reqrep.py | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/aiohttp/client_reqrep.py b/aiohttp/client_reqrep.py index 2065ea4f75..3f9eb18972 100644 --- a/aiohttp/client_reqrep.py +++ b/aiohttp/client_reqrep.py @@ -477,13 +477,9 @@ async def send(self, conn): if self.url.raw_query_string: path += '?' + self.url.raw_query_string - async def on_chunk_sent(chunk): - for trace in self.traces: - await trace.send_request_chunk_sent(chunk) - writer = StreamWriter( conn.protocol, conn.transport, self.loop, - on_chunk_sent=on_chunk_sent + on_chunk_sent=self._on_chunk_request_sent ) if self.compress: @@ -541,6 +537,10 @@ def terminate(self): self._writer.cancel() self._writer = None + async def _on_chunk_request_sent(self, chunk): + for trace in self.traces: + await trace.send_request_chunk_sent(chunk) + class ClientResponse(HeadersMixin): From 45d63323799d2d17282e9106b4e7b1aa00bd7b37 Mon Sep 17 00:00:00 2001 From: Marek Kowalski Date: Tue, 27 Feb 2018 14:53:45 +0100 Subject: [PATCH 06/17] Make check more idiomatic --- aiohttp/http_writer.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/aiohttp/http_writer.py b/aiohttp/http_writer.py index abf33a4228..3d2e2faf2f 100644 --- a/aiohttp/http_writer.py +++ b/aiohttp/http_writer.py @@ -70,7 +70,7 @@ async def write(self, chunk, *, drain=True, LIMIT=64*1024): writer can't be used after write_eof() method being called. write() return drain future. """ - if self._on_chunk_sent: + if self._on_chunk_sent is not None: await self._on_chunk_sent(chunk) if self._compress is not None: From e1e82e5feea4f610ec21730baa260207367452f1 Mon Sep 17 00:00:00 2001 From: Marek Kowalski Date: Tue, 27 Feb 2018 14:54:37 +0100 Subject: [PATCH 07/17] Reorder classes in __all__ --- aiohttp/tracing.py | 14 +++++++------- 1 file changed, 7 insertions(+), 7 deletions(-) diff --git a/aiohttp/tracing.py b/aiohttp/tracing.py index 7553b59de0..165e68cbf9 100644 --- a/aiohttp/tracing.py +++ b/aiohttp/tracing.py @@ -9,14 +9,14 @@ __all__ = ( - 'TraceConfig', 'TraceRequestStartParams', + 'TraceConfig', 'TraceRequestStartParams', 'TraceRequestEndParams', + 'TraceRequestExceptionParams', 'TraceConnectionQueuedStartParams', + 'TraceConnectionQueuedEndParams', 'TraceConnectionCreateStartParams', + 'TraceConnectionCreateEndParams', 'TraceConnectionReuseconnParams', + 'TraceDnsResolveHostStartParams', 'TraceDnsResolveHostEndParams', + 'TraceDnsCacheHitParams', 'TraceDnsCacheMissParams', + 'TraceRequestRedirectParams', 'TraceRequestChunkSentParams', 'TraceResponseChunkReceivedParams', - 'TraceRequestEndParams', 'TraceRequestExceptionParams', - 'TraceConnectionQueuedStartParams', 'TraceConnectionQueuedEndParams', - 'TraceConnectionCreateStartParams', 'TraceConnectionCreateEndParams', - 'TraceConnectionReuseconnParams', 'TraceDnsResolveHostStartParams', - 'TraceDnsResolveHostEndParams', 'TraceDnsCacheHitParams', - 'TraceDnsCacheMissParams', 'TraceRequestRedirectParams' ) From 6e3819faadcd000d174dc0bb04db53d2fda7d7e6 Mon Sep 17 00:00:00 2001 From: Marek Kowalski Date: Tue, 27 Feb 2018 15:00:07 +0100 Subject: [PATCH 08/17] Update request lifecycle diagram to include new signals --- docs/tracing_reference.rst | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/docs/tracing_reference.rst b/docs/tracing_reference.rst index cba7573f40..bffaf0fb91 100644 --- a/docs/tracing_reference.rst +++ b/docs/tracing_reference.rst @@ -34,8 +34,8 @@ Overview exception[shape=flowchart.terminator, description="on_request_exception"]; acquire_connection[description="Connection acquiring"]; - got_response; - send_request; + got_response[description="on_response_chunk_received"]; + send_request[description="on_request_chunk_sent"]; start -> acquire_connection; acquire_connection -> send_request; @@ -287,7 +287,7 @@ TraceRequestChunkSentParams TraceResponseChunkSentParams ---------------------------- +---------------------------- .. class:: TraceResponseChunkSentParams From 89dcb0fa5c2a330f4e31b20fecba353b13e0b70b Mon Sep 17 00:00:00 2001 From: Marek Kowalski Date: Tue, 27 Feb 2018 15:06:56 +0100 Subject: [PATCH 09/17] Don't use mutable defaults for traces. Make it private in ClientRequest --- aiohttp/client_reqrep.py | 12 ++++++------ 1 file changed, 6 insertions(+), 6 deletions(-) diff --git a/aiohttp/client_reqrep.py b/aiohttp/client_reqrep.py index 3f9eb18972..f24ef5c79d 100644 --- a/aiohttp/client_reqrep.py +++ b/aiohttp/client_reqrep.py @@ -169,7 +169,7 @@ def __init__(self, method, url, *, timer=None, session=None, auto_decompress=True, ssl=None, proxy_headers=None, - traces=[]): + traces=None): if loop is None: loop = asyncio.get_event_loop() @@ -210,7 +210,7 @@ def __init__(self, method, url, *, if data or self.method not in self.GET_METHODS: self.update_transfer_encoding() self.update_expect_continue(expect100) - self.traces = traces + self._traces = traces or [] def is_ssl(self): return self.url.scheme in ('https', 'wss') @@ -519,7 +519,7 @@ async def send(self, conn): writer=self._writer, continue100=self._continue, timer=self._timer, request_info=self.request_info, auto_decompress=self._auto_decompress, - traces=self.traces, + traces=self._traces, ) self.response._post_init(self.loop, self._session) return self.response @@ -538,7 +538,7 @@ def terminate(self): self._writer = None async def _on_chunk_request_sent(self, chunk): - for trace in self.traces: + for trace in self._traces: await trace.send_request_chunk_sent(chunk) @@ -566,7 +566,7 @@ class ClientResponse(HeadersMixin): def __init__(self, method, url, *, writer=None, continue100=None, timer=None, request_info=None, auto_decompress=True, - traces=[]): + traces=None): assert isinstance(url, URL) self.method = method @@ -583,7 +583,7 @@ def __init__(self, method, url, *, self._timer = timer if timer is not None else TimerNoop() self._auto_decompress = auto_decompress self._cache = {} # reqired for @reify method decorator - self._traces = traces + self._traces = traces or [] @property def url(self): From 8288c2669ae22ee606ccd71159dd325be9a6b417 Mon Sep 17 00:00:00 2001 From: Marek Kowalski Date: Tue, 27 Feb 2018 15:31:01 +0100 Subject: [PATCH 10/17] Further updates to tracing documentation --- docs/tracing_reference.rst | 31 +++++++++++++++++++++++-------- 1 file changed, 23 insertions(+), 8 deletions(-) diff --git a/docs/tracing_reference.rst b/docs/tracing_reference.rst index bffaf0fb91..077ded2852 100644 --- a/docs/tracing_reference.rst +++ b/docs/tracing_reference.rst @@ -34,16 +34,24 @@ Overview exception[shape=flowchart.terminator, description="on_request_exception"]; acquire_connection[description="Connection acquiring"]; - got_response[description="on_response_chunk_received"]; - send_request[description="on_request_chunk_sent"]; + headers_received; + headers_sent; + chunk_sent[description="on_request_chunk_sent"]; + chunk_received[description="on_response_chunk_received"]; start -> acquire_connection; - acquire_connection -> send_request; - send_request -> got_response; - got_response -> redirect; - got_response -> end; - redirect -> send_request; - send_request -> exception; + acquire_connection -> headers_sent; + headers_sent -> headers_received; + headers_sent -> chunk_sent; + chunk_sent -> chunk_sent; + chunk_sent -> headers_received; + headers_received -> chunk_received; + chunk_received -> chunk_received; + chunk_received -> end; + headers_received -> redirect; + headers_received -> end; + redirect -> headers_sent; + headers_sent -> exception; } @@ -149,6 +157,8 @@ TraceConfig .. attribute:: on_request_chunk_sent + .. versionadded:: 3.1 + Property that gives access to the signals that will be executed when a chunk of request body is sent. @@ -156,6 +166,7 @@ TraceConfig .. attribute:: on_response_chunk_received + Property that gives access to the signals that will be executed when a chunk of response body is received. @@ -279,6 +290,8 @@ TraceRequestChunkSentParams .. class:: TraceRequestChunkSentParams + .. versionadded:: 3.1 + See :attr:`TraceConfig.on_request_chunk_sent` for details. .. attribute:: chunk @@ -291,6 +304,8 @@ TraceResponseChunkSentParams .. class:: TraceResponseChunkSentParams + .. versionadded:: 3.1 + See :attr:`TraceConfig.on_response_chunk_received` for details. .. attribute:: chunk From 202cb8619d235dbfa8a46174b66ebd77bdeb7122 Mon Sep 17 00:00:00 2001 From: Marek Kowalski Date: Wed, 28 Feb 2018 12:45:09 +0100 Subject: [PATCH 11/17] Polish docs --- docs/tracing_reference.rst | 7 ++++++- 1 file changed, 6 insertions(+), 1 deletion(-) diff --git a/docs/tracing_reference.rst b/docs/tracing_reference.rst index 077ded2852..e1f1ab9f6d 100644 --- a/docs/tracing_reference.rst +++ b/docs/tracing_reference.rst @@ -51,6 +51,8 @@ Overview headers_received -> redirect; headers_received -> end; redirect -> headers_sent; + chunk_received -> exception; + chunk_sent -> exception; headers_sent -> exception; } @@ -157,13 +159,14 @@ TraceConfig .. attribute:: on_request_chunk_sent - .. versionadded:: 3.1 Property that gives access to the signals that will be executed when a chunk of request body is sent. ``params`` is :class:`aiohttp.TraceRequestChunkSentParams` instance. + .. versionadded:: 3.1 + .. attribute:: on_response_chunk_received @@ -172,6 +175,8 @@ TraceConfig ``params`` is :class:`aiohttp.TraceResponseChunkReceivedParams` instance. + .. versionadded:: 3.1 + .. attribute:: on_request_redirect Property that gives access to the signals that will be executed when a From 57e306013f7e943865040f035b9b35cfeac1de86 Mon Sep 17 00:00:00 2001 From: Marek Kowalski Date: Thu, 1 Mar 2018 11:49:01 +0100 Subject: [PATCH 12/17] Revert ignoring pep8 rules --- aiohttp/http_writer.py | 2 +- setup.cfg | 1 - 2 files changed, 1 insertion(+), 2 deletions(-) diff --git a/aiohttp/http_writer.py b/aiohttp/http_writer.py index 3d2e2faf2f..f3625b40b1 100644 --- a/aiohttp/http_writer.py +++ b/aiohttp/http_writer.py @@ -63,7 +63,7 @@ def _write(self, chunk): raise asyncio.CancelledError('Cannot write to closing transport') self._transport.write(chunk) - async def write(self, chunk, *, drain=True, LIMIT=64*1024): + async def write(self, chunk, *, drain=True, LIMIT=0x10000): """Writes chunk of data to a stream. write_eof() indicates end of stream. diff --git a/setup.cfg b/setup.cfg index 59ff0e9e3f..d0a6788fdf 100644 --- a/setup.cfg +++ b/setup.cfg @@ -1,6 +1,5 @@ [pep8] max-line-length=79 -ignore=E225,E226 [easy_install] zip_ok = false From f944a173b5b5111967075366f9115748e6f20579 Mon Sep 17 00:00:00 2001 From: Marek Kowalski Date: Thu, 1 Mar 2018 11:50:50 +0100 Subject: [PATCH 13/17] Subtle optimisation - don't create list instance if not needed --- aiohttp/client_reqrep.py | 8 ++++++-- 1 file changed, 6 insertions(+), 2 deletions(-) diff --git a/aiohttp/client_reqrep.py b/aiohttp/client_reqrep.py index f24ef5c79d..33bd25ca7c 100644 --- a/aiohttp/client_reqrep.py +++ b/aiohttp/client_reqrep.py @@ -210,7 +210,9 @@ def __init__(self, method, url, *, if data or self.method not in self.GET_METHODS: self.update_transfer_encoding() self.update_expect_continue(expect100) - self._traces = traces or [] + if traces is None: + traces = [] + self._traces = traces def is_ssl(self): return self.url.scheme in ('https', 'wss') @@ -583,7 +585,9 @@ def __init__(self, method, url, *, self._timer = timer if timer is not None else TimerNoop() self._auto_decompress = auto_decompress self._cache = {} # reqired for @reify method decorator - self._traces = traces or [] + if traces is None: + traces = [] + self._traces = traces @property def url(self): From 6a93b1607a171de96980fb0b4b2fd6176ba9945d Mon Sep 17 00:00:00 2001 From: Marek Kowalski Date: Thu, 1 Mar 2018 11:51:44 +0100 Subject: [PATCH 14/17] Remove assert statement --- aiohttp/http_writer.py | 6 ------ 1 file changed, 6 deletions(-) diff --git a/aiohttp/http_writer.py b/aiohttp/http_writer.py index f3625b40b1..bc7201da1a 100644 --- a/aiohttp/http_writer.py +++ b/aiohttp/http_writer.py @@ -2,7 +2,6 @@ import asyncio import collections -import inspect import zlib from .abc import AbstractStreamWriter @@ -18,11 +17,6 @@ class StreamWriter(AbstractStreamWriter): def __init__(self, protocol, transport, loop, on_chunk_sent=None): - assert ( - on_chunk_sent is None or - inspect.iscoroutinefunction(on_chunk_sent) - ) - self._protocol = protocol self._transport = transport From 9f8d3894ce274c02d8bb54322c17f4a30554a370 Mon Sep 17 00:00:00 2001 From: Marek Kowalski Date: Thu, 1 Mar 2018 11:58:06 +0100 Subject: [PATCH 15/17] Add test case ensuring StreamWriter calls callback --- tests/test_http_writer.py | 12 ++++++++++++ 1 file changed, 12 insertions(+) diff --git a/tests/test_http_writer.py b/tests/test_http_writer.py index 9b22750d18..47d41ef398 100644 --- a/tests/test_http_writer.py +++ b/tests/test_http_writer.py @@ -159,6 +159,18 @@ async def test_write_drain(protocol, transport, loop): assert msg.buffer_size == 0 +async def test_write_calls_callback(protocol, transport, loop): + on_chunk_sent = make_mocked_coro() + msg = http.StreamWriter( + protocol, transport, loop, + on_chunk_sent=on_chunk_sent + ) + chunk = b'1' + await msg.write(chunk) + assert on_chunk_sent.called + assert on_chunk_sent.call_args == mock.call(chunk) + + async def test_write_to_closing_transport(protocol, transport, loop): msg = http.StreamWriter(protocol, transport, loop) From 4fbc080fe9b847a80744822c727fa2d1b03f5c29 Mon Sep 17 00:00:00 2001 From: Marek Kowalski Date: Thu, 1 Mar 2018 12:09:17 +0100 Subject: [PATCH 16/17] Add test checking that response.read() trigger trace callback --- tests/test_client_response.py | 33 +++++++++++++++++++++++++++++++++ 1 file changed, 33 insertions(+) diff --git a/tests/test_client_response.py b/tests/test_client_response.py index 1b40a3909e..d8dfe7b54c 100644 --- a/tests/test_client_response.py +++ b/tests/test_client_response.py @@ -10,6 +10,7 @@ import aiohttp from aiohttp import http from aiohttp.client_reqrep import ClientResponse, RequestInfo +from aiohttp.test_utils import make_mocked_coro @pytest.fixture @@ -613,3 +614,35 @@ def test_redirect_history_in_exception(): with pytest.raises(aiohttp.ClientResponseError) as cm: response.raise_for_status() assert [hist_response] == cm.value.history + + +async def test_response_read_triggers_callback(loop, session): + trace = mock.Mock() + trace.send_response_chunk_received = make_mocked_coro() + response_body = b'This is response' + + response = ClientResponse( + 'get', URL('http://def-cl-resp.org'), + traces=[trace] + ) + response._post_init(loop, session) + + def side_effect(*args, **kwargs): + fut = loop.create_future() + fut.set_result(response_body) + return fut + + response.headers = { + 'Content-Type': 'application/json;charset=cp1251'} + content = response.content = mock.Mock() + content.read.side_effect = side_effect + + res = await response.read() + assert res == response_body + assert response._connection is None + + assert trace.send_response_chunk_received.called + assert ( + trace.send_response_chunk_received.call_args == + mock.call(response_body) + ) From 7badf72e7abc9418652cfa92d11bf4fc6f71d96e Mon Sep 17 00:00:00 2001 From: Marek Kowalski Date: Thu, 1 Mar 2018 16:50:31 +0100 Subject: [PATCH 17/17] Add CHANGES record --- CHANGES/2767.feature | 1 + 1 file changed, 1 insertion(+) create mode 100644 CHANGES/2767.feature diff --git a/CHANGES/2767.feature b/CHANGES/2767.feature new file mode 100644 index 0000000000..99a8b4e538 --- /dev/null +++ b/CHANGES/2767.feature @@ -0,0 +1 @@ +Add tracking signals for getting request/response bodies. \ No newline at end of file