Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[AIO] Minimal AsyncIO Server #20598

Merged
merged 10 commits into from
Oct 22, 2019
Merged

[AIO] Minimal AsyncIO Server #20598

merged 10 commits into from
Oct 22, 2019

Conversation

lidizheng
Copy link
Contributor

Related issue: #19496

This PR:

  • Extends AsyncIO IO manager to support server-side operations;
  • Adds more logic to AsyncSocket class;
  • Implements an AsyncIO server that can serve unary-unary handlers;
  • Adds a server test with grpc.aio.Channel;
  • Support both Bazel / setup.py build.

@pfreixes @gnossen PTAL.

* Extends AsyncIO IO manager to support server-side operations;
* Adds more logic to AsyncSocket class;
* Implements an AsyncIO server that can serve unary-unary handlers;
* Adds a server test with grpc.aio.Channel;
* Support both Bazel / setup.py build.
Copy link
Collaborator

@pfreixes pfreixes left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for the hard work Lidiz!

src/python/grpcio/grpc/_cython/_cygrpc/aio/server.pxd.pxi Outdated Show resolved Hide resolved
src/python/grpcio/grpc/_cython/_cygrpc/aio/server.pyx.pxi Outdated Show resolved Hide resolved
src/python/grpcio/grpc/experimental/aio/_server.py Outdated Show resolved Hide resolved
aio.init_grpc_aio()
loop = asyncio.get_event_loop()
loop.create_task(_start_async_server())
loop.run_forever()
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

is this already used in someplace? where can we see the results?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is not automated right now. I just have a bunch of manual scripts to generate the result.

The important part of the benchmark is the client-side, I had two version, one is multiprocessing Python client, and one is Golang client. They are not fitting in this directory, so I just checked in this one file.

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

will be Aio added as part of the current and official gRPC benchmark results?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, I will add it later. To add it into our benchmark suite:

  1. The Async API should support both unary and streaming RPC;
  2. Can work with sync API in same process (the control plane is also in gRPC).

src/python/grpcio_tests/tests_aio/unit/server_test.py Outdated Show resolved Hide resolved
src/python/grpcio_tests/tests_aio/unit/server_test.py Outdated Show resolved Hide resolved
src/python/grpcio_tests/tests_aio/unit/server_test.py Outdated Show resolved Hide resolved
Copy link
Contributor Author

@lidizheng lidizheng left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thank you for reviewing! All comments resolved or discussed. PTALA.
(Bazel seems still fail on Kokoro, it passes locally.)

src/python/grpcio/grpc/_cython/_cygrpc/aio/server.pxd.pxi Outdated Show resolved Hide resolved
src/python/grpcio/grpc/_cython/_cygrpc/aio/server.pyx.pxi Outdated Show resolved Hide resolved
src/python/grpcio/grpc/experimental/aio/_server.py Outdated Show resolved Hide resolved
aio.init_grpc_aio()
loop = asyncio.get_event_loop()
loop.create_task(_start_async_server())
loop.run_forever()
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is not automated right now. I just have a bunch of manual scripts to generate the result.

The important part of the benchmark is the client-side, I had two version, one is multiprocessing Python client, and one is Golang client. They are not fitting in this directory, so I just checked in this one file.

src/python/grpcio_tests/tests_aio/unit/server_test.py Outdated Show resolved Hide resolved
Copy link
Contributor

@gnossen gnossen left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Awesome stuff! I'm looking forward to using an asyncio server.

In general, it would be great if we could get more inline comments and docstrings. One of the biggest problems with the old stack is that it's pretty much impenetrable for anyone but the original authors. Optimizing for future readers should be a priority in the new stack.


cdef grpc_resolved_address c_addr
hostname = str_to_bytes(peer[0])
grpc_string_to_sockaddr(&c_addr, hostname, peer[1])
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is just a nit (since this has apparently been a problem for a while).

This function's signature is really odd. Right now, it's

void grpc_string_to_sockaddr(grpc_resolved_address* out, char* addr, int port)

It should probably be

void grpc_string_to_sockaddr(const char* addr, int port, grpc_resolved_address* out)
  • outparams should come last (though there are apparently quite a few functions that break that in core and C++)
  • addr is only ever read, so it should be const

cdef grpc_resolved_address c_addr
hostname = str_to_bytes(peer[0])
grpc_string_to_sockaddr(&c_addr, hostname, peer[1])
string.memcpy(<void*>addr, <void*>c_addr.addr, c_addr.len)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It's unfortunate that we have to copy here. It seems like it's only necessary to fit two interfaces together. Since we're in control of both interfaces, is it possible to eliminate this?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I added a TODO. Changing the underlying C-Core interface can be risky. And this is in the control plane, so the impact is not that high.

@@ -81,39 +82,82 @@ cdef grpc_error* asyncio_socket_getpeername(
grpc_custom_socket* grpc_socket,
const grpc_sockaddr* addr,
int* length) with gil:
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Where is the GIL being used in this function? Unless I'm missing something, everything appears to be cdef.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I see. socket._peername is a Python object as well. Would it make sense to make it a native struct so we don't have to hold the GIL in this function?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

socket._peername is a Python tuple, so I think we still need GIL here.

Python is treating all the addresses in a weird way that they are tuples of (address, port).

grpc_string_to_sockaddr(&c_addr, hostname, peer[1])
string.memcpy(<void*>addr, <void*>c_addr.addr, c_addr.len)
length[0] = c_addr.len
return grpc_error_none()


cdef grpc_error* asyncio_socket_getsockname(
grpc_custom_socket* grpc_socket,
const grpc_sockaddr* addr,
int* length) with gil:
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Likewise. Where is the GIL being used?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Any access to Python object requires GIL; since they needs to keep refcount accurate. #i_am_inevitable

Copy link
Contributor

@gnossen gnossen Oct 19, 2019

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ah. I see socket.sockname() now. Can we reduce the scope during which we hold the GIL to just that portion of the code?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

To solve this problem correctly, we need to modify the signature of IO manager which is shared between gevent and AsyncIO. This work is doable, but for single thread application the impact for performance won't be that significant. I would prefer delay it until we introduced the sync-handler-executes-in-thread-pool feature.

I created a TODO issue #20689.



async def _handle_unary_unary_rpc(object method_handler, RPCState rpc_state, object loop):
# Receives request message
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think the fact that these blocks are commented signals that they should be broken out into well-named functions. If performance is a concern, try cdef inline.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I prefer this way since it gives just enough information for reader to understand all the details. Also, await can not be added to cdef.

src/python/grpcio/grpc/_cython/_cygrpc/aio/server.pyx.pxi Outdated Show resolved Hide resolved
src/python/grpcio/grpc/_cython/_cygrpc/aio/server.pyx.pxi Outdated Show resolved Hide resolved
Copy link
Contributor Author

@lidizheng lidizheng left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thank you for the detailed review. I removed the _AioServerState, but kept the RPCState, because we need a representation to track the life cycle of an RPC, which lives across the boundary of functions.

grpc_string_to_sockaddr(&c_addr, hostname, peer[1])
string.memcpy(<void*>addr, <void*>c_addr.addr, c_addr.len)
length[0] = c_addr.len
return grpc_error_none()


cdef grpc_error* asyncio_socket_getsockname(
grpc_custom_socket* grpc_socket,
const grpc_sockaddr* addr,
int* length) with gil:
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Any access to Python object requires GIL; since they needs to keep refcount accurate. #i_am_inevitable

socket = native_socket.socket(family=native_socket.AF_INET6)
_asyncio_apply_socket_options(socket)
socket.bind((host, port))
except native_socket.gaierror:
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I modified to use ipaddress (Python 3.3+) to detect if it is IPv6. The only concern is I hope the grpc_sockaddr they passes in is IP instead of other hostnames.

cdef grpc_resolved_address c_addr
hostname = str_to_bytes(peer[0])
grpc_string_to_sockaddr(&c_addr, hostname, peer[1])
string.memcpy(<void*>addr, <void*>c_addr.addr, c_addr.len)
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I added a TODO. Changing the underlying C-Core interface can be risky. And this is in the control plane, so the impact is not that high.

_asyncio_apply_socket_options(socket)
socket.bind((host, port))
except IOError as io_error:
return socket_error("bind", str(io_error))
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Seems str is better, it gives the context.

try:
  open('a', 'r')
except Exception as e:
  print(repr(e))  # IOError(2, 'No such file or directory')
  print(str(e))  # [Errno 2] No such file or directory: 'a'

src/python/grpcio_tests/tests_aio/unit/server_test.py Outdated Show resolved Hide resolved


cdef class AioServer:
cdef _AioServerState _state
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

_AioServerState removed. Since Cython cannot use C pointers in Python function signature, I added a _CallbackCompletionQueue class to wrap the C pointer to completion queue.

src/python/grpcio/grpc/_cython/_cygrpc/aio/server.pyx.pxi Outdated Show resolved Hide resolved
src/python/grpcio/grpc/_cython/_cygrpc/aio/server.pyx.pxi Outdated Show resolved Hide resolved
# await waiter

loop.create_task(_handle_rpc(server_state, rpc_state, loop))
await asyncio.sleep(0)
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The coroutine will break the execution returning back the control to the loop here [1], no matter if gRPC already has something for being delivered - like a pending request, the future that is being awaited within the _server_call_request_call function will have always to yield the execution for at least one loop iteration which would give enough time for executing the already accepted request.,

The only reason that I would think that this could help will be an optimization for giving more priority to the current request. But TBH doing this, we will start polluting our code with await asyncio.sleep(0) when we do not have yet probes that this is really helping a lot.

[1] https://github.com/grpc/grpc/pull/20598/files#diff-b53c077f7911f53dc8ce7656e16d35cdR184

async def start(self):
loop = asyncio.get_event_loop()
loop.create_task(_server_start(self._state))
await asyncio.sleep(0)
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

if the idea is helping the unit test I would move the await asyncio.sleep(0) into the unit test. If the idea is having a kind of synchronization, so having the start coroutine waiting till the server is already accepting requests then maybe we should make it a bit more explicitly by using an Asyncio synchronization primitive, which will be easily testable.

aio.init_grpc_aio()
loop = asyncio.get_event_loop()
loop.create_task(_start_async_server())
loop.run_forever()
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

will be Aio added as part of the current and official gRPC benchmark results?

@@ -81,39 +82,82 @@ cdef grpc_error* asyncio_socket_getpeername(
grpc_custom_socket* grpc_socket,
const grpc_sockaddr* addr,
int* length) with gil:
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I see. socket._peername is a Python object as well. Would it make sense to make it a native struct so we don't have to hold the GIL in this function?

@lock lock bot locked as resolved and limited conversation to collaborators Jan 24, 2020
Sign up for free to subscribe to this conversation on GitHub. Already have an account? Sign in.
Labels
Projects
None yet
Development

Successfully merging this pull request may close these issues.

3 participants