Skip to content

Commit

Permalink
Merge pull request #20598 from lidizheng/aio-server
Browse files Browse the repository at this point in the history
[AIO] Minimal AsyncIO Server
  • Loading branch information
lidizheng authored Oct 22, 2019
2 parents 3e48b20 + a588b5f commit 3c62c8a
Show file tree
Hide file tree
Showing 15 changed files with 819 additions and 14 deletions.
17 changes: 17 additions & 0 deletions src/proto/grpc/testing/BUILD
Original file line number Diff line number Diff line change
Expand Up @@ -125,6 +125,23 @@ grpc_proto_library(
],
)

proto_library(
name = "benchmark_service_descriptor",
srcs = ["benchmark_service.proto"],
deps = [":messages_proto_descriptor"],
)

py_proto_library(
name = "benchmark_service_py_pb2",
deps = [":benchmark_service_descriptor"],
)

py_grpc_library(
name = "benchmark_service_py_pb2_grpc",
srcs = [":benchmark_service_descriptor"],
deps = [":benchmark_service_py_pb2"],
)

grpc_proto_library(
name = "report_qps_scenario_service_proto",
srcs = ["report_qps_scenario_service.proto"],
Expand Down
2 changes: 2 additions & 0 deletions src/python/grpcio/grpc/_cython/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,8 @@ pyx_library(
"_cygrpc/aio/iomgr/socket.pyx.pxi",
"_cygrpc/aio/iomgr/timer.pxd.pxi",
"_cygrpc/aio/iomgr/timer.pyx.pxi",
"_cygrpc/aio/server.pxd.pxi",
"_cygrpc/aio/server.pyx.pxi",
"_cygrpc/arguments.pxd.pxi",
"_cygrpc/arguments.pyx.pxi",
"_cygrpc/call.pxd.pxi",
Expand Down
67 changes: 59 additions & 8 deletions src/python/grpcio/grpc/_cython/_cygrpc/aio/iomgr/iomgr.pyx.pxi
Original file line number Diff line number Diff line change
Expand Up @@ -14,9 +14,14 @@


from cpython cimport Py_INCREF, Py_DECREF

from libc cimport string

import socket as native_socket
try:
import ipaddress # CPython 3.3 and above
except ImportError:
pass

cdef grpc_socket_vtable asyncio_socket_vtable
cdef grpc_custom_resolver_vtable asyncio_resolver_vtable
cdef grpc_custom_timer_vtable asyncio_timer_vtable
Expand All @@ -26,7 +31,7 @@ cdef grpc_custom_poller_vtable asyncio_pollset_vtable
cdef grpc_error* asyncio_socket_init(
grpc_custom_socket* grpc_socket,
int domain) with gil:
socket = _AsyncioSocket.create(grpc_socket)
socket = _AsyncioSocket.create(grpc_socket, None, None)
Py_INCREF(socket)
grpc_socket.impl = <void*>socket
return <grpc_error*>0
Expand Down Expand Up @@ -81,39 +86,85 @@ cdef grpc_error* asyncio_socket_getpeername(
grpc_custom_socket* grpc_socket,
const grpc_sockaddr* addr,
int* length) with gil:
raise NotImplemented()
peer = (<_AsyncioSocket>grpc_socket.impl).peername()

cdef grpc_resolved_address c_addr
hostname = str_to_bytes(peer[0])
grpc_string_to_sockaddr(&c_addr, hostname, peer[1])
# TODO(https://github.com/grpc/grpc/issues/20684) Remove the memcpy
string.memcpy(<void*>addr, <void*>c_addr.addr, c_addr.len)
length[0] = c_addr.len
return grpc_error_none()


cdef grpc_error* asyncio_socket_getsockname(
grpc_custom_socket* grpc_socket,
const grpc_sockaddr* addr,
int* length) with gil:
raise NotImplemented()
"""Supplies sock_addr in add_socket_to_server."""
cdef grpc_resolved_address c_addr
socket = (<_AsyncioSocket>grpc_socket.impl)
if socket is None:
peer = ('0.0.0.0', 0)
else:
peer = socket.sockname()
hostname = str_to_bytes(peer[0])
grpc_string_to_sockaddr(&c_addr, hostname, peer[1])
# TODO(https://github.com/grpc/grpc/issues/20684) Remove the memcpy
string.memcpy(<void*>addr, <void*>c_addr.addr, c_addr.len)
length[0] = c_addr.len
return grpc_error_none()


cdef grpc_error* asyncio_socket_listen(grpc_custom_socket* grpc_socket) with gil:
raise NotImplemented()
(<_AsyncioSocket>grpc_socket.impl).listen()
return grpc_error_none()


def _asyncio_apply_socket_options(object s, so_reuse_port=False):
# TODO(https://github.com/grpc/grpc/issues/20667)
# Connects the so_reuse_port option to channel arguments
s.setsockopt(native_socket.SOL_SOCKET, native_socket.SO_REUSEADDR, 1)
s.setsockopt(native_socket.IPPROTO_TCP, native_socket.TCP_NODELAY, True)


cdef grpc_error* asyncio_socket_bind(
grpc_custom_socket* grpc_socket,
const grpc_sockaddr* addr,
size_t len, int flags) with gil:
raise NotImplemented()
host, port = sockaddr_to_tuple(addr, len)
try:
ip = ipaddress.ip_address(host)
if isinstance(ip, ipaddress.IPv6Address):
family = native_socket.AF_INET6
else:
family = native_socket.AF_INET

socket = native_socket.socket(family=family)
_asyncio_apply_socket_options(socket)
socket.bind((host, port))
except IOError as io_error:
return socket_error("bind", str(io_error))
else:
aio_socket = _AsyncioSocket.create_with_py_socket(grpc_socket, socket)
cpython.Py_INCREF(aio_socket) # Py_DECREF in asyncio_socket_destroy
grpc_socket.impl = <void*>aio_socket
return grpc_error_none()


cdef void asyncio_socket_accept(
grpc_custom_socket* grpc_socket,
grpc_custom_socket* grpc_socket_client,
grpc_custom_accept_callback accept_cb) with gil:
raise NotImplemented()
(<_AsyncioSocket>grpc_socket.impl).accept(grpc_socket_client, accept_cb)


cdef grpc_error* asyncio_resolve(
char* host,
char* port,
grpc_resolved_addresses** res) with gil:
raise NotImplemented()
result = native_socket.getaddrinfo(host, port)
res[0] = tuples_to_resolvaddr(result)


cdef void asyncio_resolve_async(
Expand Down
24 changes: 22 additions & 2 deletions src/python/grpcio/grpc/_cython/_cygrpc/aio/iomgr/socket.pxd.pxi
Original file line number Diff line number Diff line change
Expand Up @@ -15,20 +15,40 @@

cdef class _AsyncioSocket:
cdef:
# Common attributes
grpc_custom_socket * _grpc_socket
grpc_custom_connect_callback _grpc_connect_cb
grpc_custom_read_callback _grpc_read_cb
object _reader
object _writer
object _task_read
object _task_connect
char * _read_buffer

# Client-side attributes
grpc_custom_connect_callback _grpc_connect_cb

# Server-side attributes
grpc_custom_accept_callback _grpc_accept_cb
grpc_custom_socket * _grpc_client_socket
object _server
object _py_socket
object _peername

@staticmethod
cdef _AsyncioSocket create(grpc_custom_socket * grpc_socket)
cdef _AsyncioSocket create(
grpc_custom_socket * grpc_socket,
object reader,
object writer)
@staticmethod
cdef _AsyncioSocket create_with_py_socket(grpc_custom_socket * grpc_socket, object py_socket)

cdef void connect(self, object host, object port, grpc_custom_connect_callback grpc_connect_cb)
cdef void write(self, grpc_slice_buffer * g_slice_buffer, grpc_custom_write_callback grpc_write_cb)
cdef void read(self, char * buffer_, size_t length, grpc_custom_read_callback grpc_read_cb)
cdef bint is_connected(self)
cdef void close(self)

cdef accept(self, grpc_custom_socket* grpc_socket_client, grpc_custom_accept_callback grpc_accept_cb)
cdef listen(self)
cdef tuple peername(self)
cdef tuple sockname(self)
64 changes: 60 additions & 4 deletions src/python/grpcio/grpc/_cython/_cygrpc/aio/iomgr/socket.pyx.pxi
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@
# See the License for the specific language governing permissions and
# limitations under the License.

import socket
import socket as native_socket

from libc cimport string

Expand All @@ -26,11 +26,27 @@ cdef class _AsyncioSocket:
self._task_connect = None
self._task_read = None
self._read_buffer = NULL
self._server = None
self._py_socket = None
self._peername = None

@staticmethod
cdef _AsyncioSocket create(grpc_custom_socket * grpc_socket):
cdef _AsyncioSocket create(grpc_custom_socket * grpc_socket,
object reader,
object writer):
socket = _AsyncioSocket()
socket._grpc_socket = grpc_socket
socket._reader = reader
socket._writer = writer
if writer is not None:
socket._peername = writer.get_extra_info('peername')
return socket

@staticmethod
cdef _AsyncioSocket create_with_py_socket(grpc_custom_socket * grpc_socket, object py_socket):
socket = _AsyncioSocket()
socket._grpc_socket = grpc_socket
socket._py_socket = py_socket
return socket

def __repr__(self):
Expand All @@ -52,7 +68,7 @@ cdef class _AsyncioSocket:
# gRPC default posix implementation disables nagle
# algorithm.
sock = self._writer.transport.get_extra_info('socket')
sock.setsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY, True)
sock.setsockopt(native_socket.IPPROTO_TCP, native_socket.TCP_NODELAY, True)

self._grpc_connect_cb(
<grpc_custom_socket*>self._grpc_socket,
Expand Down Expand Up @@ -92,7 +108,11 @@ cdef class _AsyncioSocket:
grpc_socket_error("read {}".format(error_msg).encode())
)

cdef void connect(self, object host, object port, grpc_custom_connect_callback grpc_connect_cb):
cdef void connect(self,
object host,
object port,
grpc_custom_connect_callback grpc_connect_cb):
assert not self._reader
assert not self._task_connect

self._task_connect = asyncio.ensure_future(
Expand Down Expand Up @@ -132,3 +152,39 @@ cdef class _AsyncioSocket:
cdef void close(self):
if self.is_connected():
self._writer.close()

def _new_connection_callback(self, object reader, object writer):
client_socket = _AsyncioSocket.create(
self._grpc_client_socket,
reader,
writer,
)

self._grpc_client_socket.impl = <void*>client_socket
cpython.Py_INCREF(client_socket) # Py_DECREF in asyncio_socket_destroy
# Accept callback expects to be called with:
# grpc_custom_socket: A grpc custom socket for server
# grpc_custom_socket: A grpc custom socket for client (with new Socket instance)
# grpc_error: An error object
self._grpc_accept_cb(self._grpc_socket, self._grpc_client_socket, grpc_error_none())

cdef listen(self):
async def create_asyncio_server():
self._server = await asyncio.start_server(
self._new_connection_callback,
sock=self._py_socket,
)

asyncio.get_event_loop().create_task(create_asyncio_server())

cdef accept(self,
grpc_custom_socket* grpc_socket_client,
grpc_custom_accept_callback grpc_accept_cb):
self._grpc_client_socket = grpc_socket_client
self._grpc_accept_cb = grpc_accept_cb

cdef tuple peername(self):
return self._peername

cdef tuple sockname(self):
return self._py_socket.getsockname()
44 changes: 44 additions & 0 deletions src/python/grpcio/grpc/_cython/_cygrpc/aio/server.pxd.pxi
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
# Copyright 2019 The gRPC Authors
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

cdef class _HandlerCallDetails:
cdef readonly str method
cdef readonly tuple invocation_metadata


cdef class RPCState:
cdef grpc_call* call,
cdef grpc_call_details details
cdef grpc_metadata_array request_metadata

cdef bytes method(self)


cdef enum AioServerStatus:
AIO_SERVER_STATUS_UNKNOWN
AIO_SERVER_STATUS_READY
AIO_SERVER_STATUS_RUNNING
AIO_SERVER_STATUS_STOPPED


cdef class _CallbackCompletionQueue:
cdef grpc_completion_queue *_cq
cdef grpc_completion_queue* c_ptr(self)


cdef class AioServer:
cdef Server _server
cdef _CallbackCompletionQueue _cq
cdef list _generic_handlers
cdef AioServerStatus _status
Loading

0 comments on commit 3c62c8a

Please sign in to comment.