-
Notifications
You must be signed in to change notification settings - Fork 335
[RFC] Use callback based asyncio.Protocol for communication #1216
[RFC] Use callback based asyncio.Protocol for communication #1216
Conversation
This pull request introduces 1 alert when merging 4169a9b into 33b2dbd - view on LGTM.com new alerts:
|
Codecov Report
@@ Coverage Diff @@
## master #1216 +/- ##
=======================================
Coverage 89.21% 89.21%
=======================================
Files 21 21
Lines 6872 6872
Branches 794 794
=======================================
Hits 6131 6131
Misses 578 578
Partials 163 163
Flags with carried forward coverage won't be shown. Click here to find out more. Continue to review full report at Codecov.
|
@m-novikov do you have time to resolve the mypy errors? |
Ah, I though to get some feedback about idea before proceeding :) it requires more work to pass the tests, if you think idea is worthwhile I can continue |
I'm not that familiar with Protocol and Transports, though the code looks like event driven languages? I've cc-ed @bmerry to give a quick look at it, but, aside from my personal concern about instability and inconsistencies (I could be wrong since again I'm unfamiliar with this) in response data ordering, I'd say it's worth a shot! 🚀 |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I like the concept. Have you tried it on pub/sub yet? I'm not sure how well the change to have send_command
return a response future will work with the pub/sub code, which
(a) doesn't immediately retrieve responses to subscribe commands, instead leaving them for the message handler to pick up
(b) needs to be able to handle unsolicited messages.
Possibly the Protocol could just put responses into an asyncio.Queue, and read_response
could then pull a response from the queue. If the queue gets too full (which might just be 2 responses), use pause_reading
to stop more data being added until it shrinks again. But I don't know if that will preserve the ordering one needs to make pipelining work, and it might also reduce performance again.
Do you have any thoughts on implementing the pause_writing
and resume_writing
callbacks? I can't think of anything useful they could do, since there is no way to apply back-pressure to a client that is sending lots of commands without awaiting responses.
I'm assuming the commented-out code is things you need to revisit before finalising things.
|
||
self._responder_task = None | ||
self._exc = None | ||
self._conn_waiter = asyncio.get_event_loop().create_future() |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I'd recommend using an asyncio.Event
for this. With a raw future, calling wait_connected
and then cancelling it will cancel the future. Although it might not be needed at all - see my comments where create_connection
is called.
self._resp_queue.append(fut) | ||
|
||
elif self._state == _State.not_connected: | ||
fut.set_exception(Exception("Not connected")) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Is there a more appropriate subclass e.g. ConnectionError?
|
||
def data_received(self, data): | ||
if self._state != _State.connected: | ||
return |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Should this be an exception? Or is this something that happens if the client disconnects and we receive data that was still in the socket?
self._set_exception(exc) | ||
|
||
elif self._state == _State.connected: | ||
exc = Exception("disconnected") |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Should probably be a more specific subclass of Exception.
self._transport = transport | ||
sock = transport.get_extra_info("socket") | ||
if sock is not None: | ||
sock.setsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY, 1) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The original code also tries to set KEEPALIVE.
# causing an error. Do not leave the connection open. | ||
writer.close() | ||
raise | ||
await self._protocol.wait_connected() |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Is this necessary? Looking at the documentation for create_connection it looks like it will call connection_made
on the protocol before returning.
# It's possible that connection was dropped and connection_callback was not | ||
# called yet, to stop spamming errors, avoid writing to broken pipe | ||
# Both _UnixWritePipeTransport and _SelectorSocketTransport that we | ||
# expect to see here have this attribute |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
What about _ProactorSocketTransport? I don't know Windows so don't really know anything about it.
By the way I am going on an extended holiday soon, so might not be able to give further feedback. |
superseded by #1287 |
What do these changes do?
Attempt to refactor connection code to avoid unnecessary polling and waiting.
When testing this branch use a single connection client, connection pool has some locking around it that need to be ironed out.
But performance wise pool doesn't actually help, for simplistic client
Also this PR uses only
HiredisParser
becausefeed
parser API is needed to conveniently use with protocol callbacks.In case this approach is feasible
PythonParser
would also need to be refactored(rewritten?) to supportfeed
.Are there changes in behavior for the user?
Related issue number
#1208
Checklist
CONTRIBUTORS.txt
<Name> <Surname>
.CHANGES/
folder<issue_id>.<type>
(e.g.588.bugfix
)issue_id
change it to the pr id after creating the PR.feature
: Signifying a new feature..bugfix
: Signifying a bug fix..doc
: Signifying a documentation improvement..removal
: Signifying a deprecation or removal of public API..misc
: A ticket has been closed, but it is not of interest to users.Fix issue with non-ascii contents in doctest text files.