From 3fbbc270e3940cea369c4de7862014064584590b Mon Sep 17 00:00:00 2001 From: kumeagidi <54588820+kumeagidi@users.noreply.github.com> Date: Sat, 25 Nov 2023 17:28:44 -0500 Subject: [PATCH 1/2] add concurrent query support --- evadb/interfaces/relational/db.py | 5 ----- evadb/server/command_handler.py | 6 ++++++ evadb/server/interpreter.py | 30 +++++++++++++++++++++++------- evadb/server/server.py | 14 +++++++++++--- 4 files changed, 40 insertions(+), 15 deletions(-) diff --git a/evadb/interfaces/relational/db.py b/evadb/interfaces/relational/db.py index 714593a8a8..a0f6367e69 100644 --- a/evadb/interfaces/relational/db.py +++ b/evadb/interfaces/relational/db.py @@ -112,11 +112,6 @@ async def execute_async(self, query: str): """ Send query to the EvaDB server. """ - if self._pending_query: - raise SystemError( - "EvaDB does not support concurrent queries. \ - Call fetch_all() to complete the pending query" - ) query = self._multiline_query_transformation(query) self._connection._writer.write((query + "\n").encode()) await self._connection._writer.drain() diff --git a/evadb/server/command_handler.py b/evadb/server/command_handler.py index f8d520c998..b386cd2b77 100644 --- a/evadb/server/command_handler.py +++ b/evadb/server/command_handler.py @@ -144,3 +144,9 @@ async def handle_request(evadb: EvaDBDatabase, client_writer, request_message): client_writer.write(response_data) return response + +async def handle_requests(evadb_server): + while True: + # Remove request from the queue. If there are no requests, this call blocks. + evadb, client_writer, message = await evadb_server._request_queue.get() + await handle_request(evadb, client_writer, message) diff --git a/evadb/server/interpreter.py b/evadb/server/interpreter.py index 61c601bec1..f18b4b920d 100644 --- a/evadb/server/interpreter.py +++ b/evadb/server/interpreter.py @@ -53,7 +53,7 @@ async def create_stdin_reader() -> StreamReader: async def read_from_client_and_send_to_server( - stdin_reader: StreamReader, writer: StreamWriter, server_reader: StreamReader + stdin_reader: StreamReader, writer: StreamWriter, server_reader: StreamReader, client_request_queue: asyncio.Queue ): VERSION = VERSION_DICT["VERSION"] intro = f"evadb (v{VERSION})\nType 'EXIT;' to exit the client \n" @@ -69,6 +69,10 @@ async def read_from_client_and_send_to_server( connection = EvaDBConnection(None, server_reader, writer) cursor = connection.cursor() + # Tasks to run concurrently to remove requests/responses that the server sends/recieves. + client_request = asyncio.create_task(handle_client_requests(cursor, client_request_queue)) + server_response = asyncio.create_task(handle_server_response(cursor)) + while True: sys.stdout.write(prompt) sys.stdout.flush() @@ -79,11 +83,9 @@ async def read_from_client_and_send_to_server( query = query.rstrip() if query.upper() in ["EXIT", "QUIT"]: return - - await cursor.execute_async(query) - response = await cursor.fetch_all_async() - sys.stdout.write(str(response) + "\n") - sys.stdout.flush() + + # Store the query inside the client_request_queue instead of immediately calling execute_async(). + await client_request_queue.put(query) async def start_cmd_client(host: str, port: int): @@ -94,9 +96,10 @@ async def start_cmd_client(host: str, port: int): reader, writer = None, None reader, writer = await asyncio.open_connection(host, port) stdin_reader = await create_stdin_reader() + client_request_queue = asyncio.Queue() input_listener = asyncio.create_task( - read_from_client_and_send_to_server(stdin_reader, writer, reader) + read_from_client_and_send_to_server(stdin_reader, writer, reader, client_request_queue) ) await asyncio.wait([input_listener], return_when=asyncio.FIRST_COMPLETED) @@ -105,3 +108,16 @@ async def start_cmd_client(host: str, port: int): if writer is not None: writer.close() # await writer.wait_closed() + +async def handle_client_requests(cursor, client_request_queue: asyncio.Queue): + while True: + # Remove request from the queue. If there are no pending requests, this call blocks. + query = await client_request_queue.get() + await cursor.execute_async(query) + +async def handle_server_response(cursor): + while True: + # Gets response from server. If there are current no responses, this call blocks. + response = await cursor.fetch_all_async() + sys.stdout.write(str(response) + "\n") + sys.stdout.flush() diff --git a/evadb/server/server.py b/evadb/server/server.py index 9f33dced0d..10a7bd6b38 100644 --- a/evadb/server/server.py +++ b/evadb/server/server.py @@ -22,6 +22,7 @@ class EvaServer: + _request_queue = None """ Receives messages and offloads them to another task for processing them. """ @@ -41,9 +42,13 @@ async def start_evadb_server( hostname: hostname of the server port: port of the server """ + from pprint import pprint pprint(f"EvaDB server started at host {host} and port {port}") + + self._request_queue = asyncio.Queue() + self._evadb = init_evadb_instance(db_dir, host, port, custom_db_uri) self._server = await asyncio.start_server(self.accept_client, host, port) @@ -52,6 +57,10 @@ async def start_evadb_server( mode = self._evadb.catalog().get_configuration_catalog_value("mode") init_builtin_functions(self._evadb, mode=mode) + # task to handle requests in queue. + from evadb.server.command_handler import handle_requests + asyncio.create_task(handle_requests(self)) + async with self._server: await self._server.serve_forever() @@ -93,9 +102,8 @@ async def handle_client( return logger.debug("Handle request") - from evadb.server.command_handler import handle_request - - asyncio.create_task(handle_request(self._evadb, client_writer, message)) + # When a new request comes in from a client, it to the queue. + await self._request_queue.put((self._evadb, client_writer, message)) except Exception as e: logger.critical("Error reading from client.", exc_info=e) From d7849fb0d0ddc29a00aa67ce73fa1e93d8dd4ea4 Mon Sep 17 00:00:00 2001 From: kumeagidi <54588820+kumeagidi@users.noreply.github.com> Date: Sat, 25 Nov 2023 17:42:24 -0500 Subject: [PATCH 2/2] fix linting --- evadb/server/interpreter.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/evadb/server/interpreter.py b/evadb/server/interpreter.py index f18b4b920d..c15e0bf343 100644 --- a/evadb/server/interpreter.py +++ b/evadb/server/interpreter.py @@ -70,8 +70,8 @@ async def read_from_client_and_send_to_server( cursor = connection.cursor() # Tasks to run concurrently to remove requests/responses that the server sends/recieves. - client_request = asyncio.create_task(handle_client_requests(cursor, client_request_queue)) - server_response = asyncio.create_task(handle_server_response(cursor)) + asyncio.create_task(handle_client_requests(cursor, client_request_queue)) + asyncio.create_task(handle_server_response(cursor)) while True: sys.stdout.write(prompt)