Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix #170 Add concurrent query support #1412

Open
wants to merge 2 commits into
base: staging
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 0 additions & 5 deletions evadb/interfaces/relational/db.py
Original file line number Diff line number Diff line change
Expand Up @@ -112,11 +112,6 @@ async def execute_async(self, query: str):
"""
Send query to the EvaDB server.
"""
if self._pending_query:
raise SystemError(
"EvaDB does not support concurrent queries. \
Call fetch_all() to complete the pending query"
)
query = self._multiline_query_transformation(query)
self._connection._writer.write((query + "\n").encode())
await self._connection._writer.drain()
Expand Down
6 changes: 6 additions & 0 deletions evadb/server/command_handler.py
Original file line number Diff line number Diff line change
Expand Up @@ -144,3 +144,9 @@ async def handle_request(evadb: EvaDBDatabase, client_writer, request_message):
client_writer.write(response_data)

return response

async def handle_requests(evadb_server):
while True:
# Remove request from the queue. If there are no requests, this call blocks.
evadb, client_writer, message = await evadb_server._request_queue.get()
await handle_request(evadb, client_writer, message)
30 changes: 23 additions & 7 deletions evadb/server/interpreter.py
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ async def create_stdin_reader() -> StreamReader:


async def read_from_client_and_send_to_server(
stdin_reader: StreamReader, writer: StreamWriter, server_reader: StreamReader
stdin_reader: StreamReader, writer: StreamWriter, server_reader: StreamReader, client_request_queue: asyncio.Queue
):
VERSION = VERSION_DICT["VERSION"]
intro = f"evadb (v{VERSION})\nType 'EXIT;' to exit the client \n"
Expand All @@ -69,6 +69,10 @@ async def read_from_client_and_send_to_server(
connection = EvaDBConnection(None, server_reader, writer)
cursor = connection.cursor()

# Tasks to run concurrently to remove requests/responses that the server sends/recieves.
asyncio.create_task(handle_client_requests(cursor, client_request_queue))
asyncio.create_task(handle_server_response(cursor))

while True:
sys.stdout.write(prompt)
sys.stdout.flush()
Expand All @@ -79,11 +83,9 @@ async def read_from_client_and_send_to_server(
query = query.rstrip()
if query.upper() in ["EXIT", "QUIT"]:
return

await cursor.execute_async(query)
response = await cursor.fetch_all_async()
sys.stdout.write(str(response) + "\n")
sys.stdout.flush()

# Store the query inside the client_request_queue instead of immediately calling execute_async().
await client_request_queue.put(query)


async def start_cmd_client(host: str, port: int):
Expand All @@ -94,9 +96,10 @@ async def start_cmd_client(host: str, port: int):
reader, writer = None, None
reader, writer = await asyncio.open_connection(host, port)
stdin_reader = await create_stdin_reader()
client_request_queue = asyncio.Queue()

input_listener = asyncio.create_task(
read_from_client_and_send_to_server(stdin_reader, writer, reader)
read_from_client_and_send_to_server(stdin_reader, writer, reader, client_request_queue)
)

await asyncio.wait([input_listener], return_when=asyncio.FIRST_COMPLETED)
Expand All @@ -105,3 +108,16 @@ async def start_cmd_client(host: str, port: int):
if writer is not None:
writer.close()
# await writer.wait_closed()

async def handle_client_requests(cursor, client_request_queue: asyncio.Queue):
while True:
# Remove request from the queue. If there are no pending requests, this call blocks.
query = await client_request_queue.get()
await cursor.execute_async(query)

async def handle_server_response(cursor):
while True:
# Gets response from server. If there are current no responses, this call blocks.
response = await cursor.fetch_all_async()
sys.stdout.write(str(response) + "\n")
sys.stdout.flush()
14 changes: 11 additions & 3 deletions evadb/server/server.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@


class EvaServer:
_request_queue = None
"""
Receives messages and offloads them to another task for processing them.
"""
Expand All @@ -41,9 +42,13 @@ async def start_evadb_server(
hostname: hostname of the server
port: port of the server
"""

from pprint import pprint

pprint(f"EvaDB server started at host {host} and port {port}")

self._request_queue = asyncio.Queue()

self._evadb = init_evadb_instance(db_dir, host, port, custom_db_uri)

self._server = await asyncio.start_server(self.accept_client, host, port)
Expand All @@ -52,6 +57,10 @@ async def start_evadb_server(
mode = self._evadb.catalog().get_configuration_catalog_value("mode")
init_builtin_functions(self._evadb, mode=mode)

# task to handle requests in queue.
from evadb.server.command_handler import handle_requests
asyncio.create_task(handle_requests(self))

async with self._server:
await self._server.serve_forever()

Expand Down Expand Up @@ -93,9 +102,8 @@ async def handle_client(
return

logger.debug("Handle request")
from evadb.server.command_handler import handle_request

asyncio.create_task(handle_request(self._evadb, client_writer, message))
# When a new request comes in from a client, it to the queue.
await self._request_queue.put((self._evadb, client_writer, message))

except Exception as e:
logger.critical("Error reading from client.", exc_info=e)