Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

bpo-41279: Add StreamReaderBufferedProtocol #21446

Open
wants to merge 6 commits into
base: main
Choose a base branch
from

Conversation

tontinton
Copy link
Contributor

@tontinton tontinton commented Jul 11, 2020

https://bugs.python.org/issue41279

I got way better performance on await reader.read() using this branch on linux (check out the chart on the server.py script's comments).

The way I tested was writing a server / client:
server.py:

import asyncio
import contextlib
import time


async def client_connected(reader, writer):
    start = time.time()

    with contextlib.closing(writer):
        for i in range(1000):
            # the more this parameter's distance from 65536 is greater the better the performance global_buffer gives
            # On linux:
            # 65536 * 2 -> Gives about 160% better performance
            # 4096 -> Gives about 155% better performance
            # 65536 -> Gives about 125% better performance

            # On windows using 65536 gives the same performance for some reason, which is interesting
            # But any other value gives a bit better performance, for example 65536 * 2 gives about 120% better performance
            await reader.read(65536 * 2)

    print(f'{time.time() - start}')


async def main():
    server = await asyncio.start_server(client_connected, '127.0.0.1', 8888, global_buffer=True)

    addr = server.sockets[0].getsockname()
    print(f'Serving on {addr}')

    async with server:
        await server.serve_forever()


if __name__ == "__main__":
    asyncio.run(main())

client.py:

import asyncio
import contextlib


async def flood(ip, port):
    message = b'A' * 1024 * 64  # tweak this parameter as much as you want
    reader, writer = await asyncio.open_connection(ip, port)
    with contextlib.closing(writer):
        while True:
            writer.write(message)
            await writer.drain()


if __name__ == "__main__":
    asyncio.run(flood('127.0.0.1', 8888))

@tontinton tontinton changed the title Fix issue 41279 bpo-41273: Convert StreamReaderProtocol to a BufferedProtocol Jul 11, 2020
@tontinton tontinton changed the title bpo-41273: Convert StreamReaderProtocol to a BufferedProtocol bpo-41279: Convert StreamReaderProtocol to a BufferedProtocol Jul 11, 2020
@tontinton tontinton force-pushed the fix-issue-41279 branch 2 times, most recently from d594f7a to af23da7 Compare July 11, 2020 17:38
@tontinton tontinton changed the title bpo-41279: Convert StreamReaderProtocol to a BufferedProtocol bpo-41279: Convert StreamReaderProtocol to a BufferedProtocol for increased performance Jul 11, 2020

data = self._data[:length]
if length > -1:
if not isinstance(self._protocol, protocols.BufferedProtocol):
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do we have proper functional tests that hit this branch? If not we need to add them.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You can probably copy some tests from the uvloop project (as I own the project there will be no licensing issues even if it's a literal copy)

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I am thinking of doing parameterize on all the appropriate test in test_proactor_events.py.

That way we can use the already existing tests on both Protocol and BufferedProtocol

Copy link
Contributor Author

@tontinton tontinton Jul 16, 2020

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ok I did this using the parameterize function, it did made me write a bit of ugly code I want you to look at.

Check out what I did in the test adding commit.

By the way shouldn't I add the same tests for UnixReadPipeTransportTests?

Copy link
Contributor Author

@tontinton tontinton Jul 17, 2020

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ok I added the tests to UnixReadPipeTransportTests as well using the same patches

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ok I did this using the parameterize function, it did made me write a bit of ugly code I want you to look at.

Yeah, let's try to simplify it. See my other comment.

By the way shouldn't I add the same tests for UnixReadPipeTransportTests?

I guess since we're updating _UnixReadPipeTransport we should.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah I simplified it a lot

Comment on lines 254 to 255
except (SystemExit, KeyboardInterrupt):
raise
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
except (SystemExit, KeyboardInterrupt):
raise

the two llines can be removed

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Fixed

self._fatal_error(exc,
'Fatal error: protocol.buffer_updated() '
'call failed.')
return
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
return

just a small change ;)

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Fixed

@bedevere-bot
Copy link

Thanks for making the requested changes!

@1st1, @asvetlov: please review the changes made to this pull request.

@bedevere-bot bedevere-bot requested a review from 1st1 August 8, 2020 09:38
@tontinton
Copy link
Contributor Author

Hey, just remembered that I did this, bumping! :)

self.protocol.data_received.assert_called_with(msgs[1])

tr.resume_reading()
tr.resume_reading()
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

why this double pause_reading() above and then resume_reading() here ?

and same question for below in test_pause_resume_reading.

Copy link
Contributor Author

@tontinton tontinton Jul 8, 2022

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Fixed


for msg in msgs[:2]:
self.loop._run_once()
self.protocol.data_received.assert_called_with(msg)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

before the _run_once() would not it be better to assert that the msg is not in the _mock_call_args_list of the mock object ?

and same below for the others data_received.assert_called_with following a _run_once().

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't understand which mock object you are talking about, and I don't see any other test reading _mock_call_args_list.

Can you elaborate?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

something like:

for msg in msgs[:2]:
            self.assertFalse(unittest.mock.call(msg) in self.protocol.data_received.mock_calls)
            self.loop._run_once()
            self.protocol.data_received.assert_called_with(msg)

to ensure that each msg is effectively processed during the _run_once() call.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Oh I see, but protocol's buffer_updated (data_received is only in a regular stream object, not buffered) is not a mock, it's an actual function, so no mock_calls field.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ah ok :/

This class gets better performance as BufferedProtocol uses read_into
into a buffer allocated before, instead of allocating a new buffer
each time read is called.
…pythonGH-21446)

The transport did not know how to use the proper api exported by
BufferedProtocol.

Added a new callback function that calls getbuffer() and
buffer_updated() instead of data_received() when the protocol given to
it is of type BufferedProtocol.

This is exactly the same way _SelectorSocketTransport handles a
BufferedProtocol.
…port (pythonGH-21446)

In the __init__ function if the protocol is of instance BufferedProtocol
instead of creating a buffer object, we call get_buffer on the protocol
to get its buffer.

In addition _loop_reading now calls _data_received as soon as there is
actual data instead of calling only after adding a recv_into event.

The reason for this change is because read_into could call it's callback
immediatly meaning overriding the data on the buffer before we actually
call _data_received on it, which fixes the potential issue of missed data.
@tontinton
Copy link
Contributor Author

I'll fix the ssl tests sometime this weekend

@arhadthedev
Copy link
Member

@tontinton You can turn your PR into a draft and not worry about timeframes.

…pythonGH-21446)

When calling set_protocol to change the protocol you can now change the
type of the protocol from BufferedProtocol to Protocol or vice versa.

start_tls needed this feature as it could read into a buffered protocol
at first and then change the protocol to SSLProto which is a regular
protocol.
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Projects
None yet
Development

Successfully merging this pull request may close these issues.