Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

bpo-41273: Improve the performance of asyncio's proactor read transport by using recv_into instead of recv #21442

Merged
merged 2 commits into from
Jul 14, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
40 changes: 21 additions & 19 deletions Lib/asyncio/proactor_events.py
Original file line number Diff line number Diff line change
Expand Up @@ -179,11 +179,12 @@ class _ProactorReadPipeTransport(_ProactorBasePipeTransport,
"""Transport for read pipes."""

def __init__(self, loop, sock, protocol, waiter=None,
extra=None, server=None):
self._pending_data = None
extra=None, server=None, buffer_size=65536):
self._pending_data_length = -1
self._paused = True
super().__init__(loop, sock, protocol, waiter, extra, server)

self._data = bytearray(buffer_size)
self._loop.call_soon(self._loop_reading)
self._paused = False

Expand Down Expand Up @@ -217,12 +218,12 @@ def resume_reading(self):
if self._read_fut is None:
self._loop.call_soon(self._loop_reading, None)

data = self._pending_data
self._pending_data = None
if data is not None:
length = self._pending_data_length
self._pending_data_length = -1
if length > -1:
# Call the protocol method after calling _loop_reading(),
# since the protocol can decide to pause reading again.
self._loop.call_soon(self._data_received, data)
self._loop.call_soon(self._data_received, self._data[:length], length)

if self._loop.get_debug():
logger.debug("%r resumes reading", self)
Expand All @@ -243,15 +244,15 @@ def _eof_received(self):
if not keep_open:
self.close()

def _data_received(self, data):
def _data_received(self, data, length):
if self._paused:
# Don't call any protocol method while reading is paused.
# The protocol will be called on resume_reading().
assert self._pending_data is None
self._pending_data = data
assert self._pending_data_length == -1
self._pending_data_length = length
return

if not data:
if length == 0:
self._eof_received()
return

Expand All @@ -269,6 +270,7 @@ def _data_received(self, data):
self._protocol.data_received(data)

def _loop_reading(self, fut=None):
length = -1
data = None
try:
if fut is not None:
Expand All @@ -277,26 +279,26 @@ def _loop_reading(self, fut=None):
self._read_fut = None
if fut.done():
# deliver data later in "finally" clause
data = fut.result()
length = fut.result()
if length == 0:
# we got end-of-file so no need to reschedule a new read
return

data = self._data[:length]
else:
# the future will be replaced by next proactor.recv call
fut.cancel()

if self._closing:
# since close() has been called we ignore any read data
data = None
return

if data == b'':
# we got end-of-file so no need to reschedule a new read
return

# bpo-33694: buffer_updated() has currently no fast path because of
# a data loss issue caused by overlapped WSASend() cancellation.

if not self._paused:
# reschedule a new read
self._read_fut = self._loop._proactor.recv(self._sock, 32768)
self._read_fut = self._loop._proactor.recv_into(self._sock, self._data)
except ConnectionAbortedError as exc:
if not self._closing:
self._fatal_error(exc, 'Fatal read error on pipe transport')
Expand All @@ -314,8 +316,8 @@ def _loop_reading(self, fut=None):
if not self._paused:
self._read_fut.add_done_callback(self._loop_reading)
finally:
if data is not None:
self._data_received(data)
if length > -1:
self._data_received(data, length)


class _ProactorBaseWritePipeTransport(_ProactorBasePipeTransport,
Expand Down
70 changes: 43 additions & 27 deletions Lib/test/test_asyncio/test_proactor_events.py
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ def setUp(self):
self.loop._proactor = self.proactor
self.protocol = test_utils.make_test_protocol(asyncio.Protocol)
self.sock = mock.Mock(socket.socket)
self.buffer_size = 65536

def socket_transport(self, waiter=None):
transport = _ProactorSocketTransport(self.loop, self.sock,
Expand All @@ -53,41 +54,45 @@ def test_ctor(self):
test_utils.run_briefly(self.loop)
self.assertIsNone(fut.result())
self.protocol.connection_made(tr)
self.proactor.recv.assert_called_with(self.sock, 32768)
self.proactor.recv_into.assert_called_with(self.sock, bytearray(self.buffer_size))

def test_loop_reading(self):
tr = self.socket_transport()
tr._loop_reading()
self.loop._proactor.recv.assert_called_with(self.sock, 32768)
self.loop._proactor.recv_into.assert_called_with(self.sock, bytearray(self.buffer_size))
self.assertFalse(self.protocol.data_received.called)
self.assertFalse(self.protocol.eof_received.called)

def test_loop_reading_data(self):
buf = b'data'
res = self.loop.create_future()
res.set_result(b'data')
res.set_result(len(buf))

tr = self.socket_transport()
tr._read_fut = res
tr._data[:len(buf)] = buf
tr._loop_reading(res)
self.loop._proactor.recv.assert_called_with(self.sock, 32768)
self.protocol.data_received.assert_called_with(b'data')
called_buf = bytearray(self.buffer_size)
called_buf[:len(buf)] = buf
self.loop._proactor.recv_into.assert_called_with(self.sock, called_buf)
self.protocol.data_received.assert_called_with(bytearray(buf))

def test_loop_reading_no_data(self):
res = self.loop.create_future()
res.set_result(b'')
res.set_result(0)

tr = self.socket_transport()
self.assertRaises(AssertionError, tr._loop_reading, res)

tr.close = mock.Mock()
tr._read_fut = res
tr._loop_reading(res)
self.assertFalse(self.loop._proactor.recv.called)
self.assertFalse(self.loop._proactor.recv_into.called)
self.assertTrue(self.protocol.eof_received.called)
self.assertTrue(tr.close.called)

def test_loop_reading_aborted(self):
err = self.loop._proactor.recv.side_effect = ConnectionAbortedError()
err = self.loop._proactor.recv_into.side_effect = ConnectionAbortedError()

tr = self.socket_transport()
tr._fatal_error = mock.Mock()
Expand All @@ -97,7 +102,7 @@ def test_loop_reading_aborted(self):
'Fatal read error on pipe transport')

def test_loop_reading_aborted_closing(self):
self.loop._proactor.recv.side_effect = ConnectionAbortedError()
self.loop._proactor.recv_into.side_effect = ConnectionAbortedError()

tr = self.socket_transport()
tr._closing = True
Expand All @@ -106,15 +111,15 @@ def test_loop_reading_aborted_closing(self):
self.assertFalse(tr._fatal_error.called)

def test_loop_reading_aborted_is_fatal(self):
self.loop._proactor.recv.side_effect = ConnectionAbortedError()
self.loop._proactor.recv_into.side_effect = ConnectionAbortedError()
tr = self.socket_transport()
tr._closing = False
tr._fatal_error = mock.Mock()
tr._loop_reading()
self.assertTrue(tr._fatal_error.called)

def test_loop_reading_conn_reset_lost(self):
err = self.loop._proactor.recv.side_effect = ConnectionResetError()
err = self.loop._proactor.recv_into.side_effect = ConnectionResetError()

tr = self.socket_transport()
tr._closing = False
Expand All @@ -125,7 +130,7 @@ def test_loop_reading_conn_reset_lost(self):
tr._force_close.assert_called_with(err)

def test_loop_reading_exception(self):
err = self.loop._proactor.recv.side_effect = (OSError())
err = self.loop._proactor.recv_into.side_effect = (OSError())

tr = self.socket_transport()
tr._fatal_error = mock.Mock()
Expand Down Expand Up @@ -351,44 +356,55 @@ def test_write_eof_duplex_pipe(self):

def test_pause_resume_reading(self):
tr = self.socket_transport()
futures = []
for msg in [b'data1', b'data2', b'data3', b'data4', b'data5', b'']:
index = 0
msgs = [b'data1', b'data2', b'data3', b'data4', b'data5', b'']
reversed_msgs = list(reversed(msgs))

def recv_into(sock, data):
f = self.loop.create_future()
f.set_result(msg)
futures.append(f)
msg = reversed_msgs.pop()

result = f.result
def monkey():
data[:len(msg)] = msg
return result()
f.result = monkey

f.set_result(len(msg))
return f

self.loop._proactor.recv.side_effect = futures
self.loop._proactor.recv_into.side_effect = recv_into
self.loop._run_once()
self.assertFalse(tr._paused)
self.assertTrue(tr.is_reading())
self.loop._run_once()
self.protocol.data_received.assert_called_with(b'data1')
self.loop._run_once()
self.protocol.data_received.assert_called_with(b'data2')

for msg in msgs[:2]:
self.loop._run_once()
self.protocol.data_received.assert_called_with(bytearray(msg))

tr.pause_reading()
tr.pause_reading()
self.assertTrue(tr._paused)
self.assertFalse(tr.is_reading())
for i in range(10):
self.loop._run_once()
self.protocol.data_received.assert_called_with(b'data2')
self.protocol.data_received.assert_called_with(bytearray(msgs[1]))

tr.resume_reading()
tr.resume_reading()
self.assertFalse(tr._paused)
self.assertTrue(tr.is_reading())
self.loop._run_once()
self.protocol.data_received.assert_called_with(b'data3')
self.loop._run_once()
self.protocol.data_received.assert_called_with(b'data4')

for msg in msgs[2:4]:
self.loop._run_once()
self.protocol.data_received.assert_called_with(bytearray(msg))

tr.pause_reading()
tr.resume_reading()
self.loop.call_exception_handler = mock.Mock()
self.loop._run_once()
self.loop.call_exception_handler.assert_not_called()
self.protocol.data_received.assert_called_with(b'data5')
self.protocol.data_received.assert_called_with(bytearray(msgs[4]))
tr.close()

self.assertFalse(tr.is_reading())
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
Speed up any transport using ``_ProactorReadPipeTransport`` by calling
``recv_into`` instead of ``recv``, thus not creating a new buffer for each
``recv`` call in the transport's read loop.