Skip to content

Commit

Permalink
Node Intre-Service Communication Refactoring. Closes #204 (#222)
Browse files Browse the repository at this point in the history
* WIP(Node services): Making the node services all use the event bus. Now need to do some integration testing.

* WIP(worker comms service): Making the last service use the event bus implementation.

* WIP(finished worker comms): Finished making the WorkerCommService use the EventBus. Now need to do integration with the whole Node.

* WIP(finished Node): Finished making all node services use the eventbus. Need to finish testing Manager/Worker/Node comms to ensure that new changes didn't break communication.

* tests(Worker/Node): Fixing issues with Worker & Node comms. On-going effort.

* WIP(node step bug): Working on addressing a node step bug for data_chunk input argument passing.

* fix(node connection): The connection betweeh Pub and Sub across nodes has been fixed.

* fix(subscriber ZMQ socket closing): The main issue why threading and multiprocessing contexts conflicted is because of improper closing of ZMQ socket. Resolved.
  • Loading branch information
edavalosanaya authored Aug 4, 2023
1 parent e4212c8 commit 4cb8382
Show file tree
Hide file tree
Showing 39 changed files with 1,812 additions and 741 deletions.
2 changes: 1 addition & 1 deletion chimerapy/engine/manager/worker_handler_service.py
Original file line number Diff line number Diff line change
Expand Up @@ -254,7 +254,7 @@ async def _distribute_packages(self, packages_meta: List[Dict[str, Any]]) -> boo
host=worker_data.ip,
port=worker_data.port,
)
await client._send_file_async(
await client.async_send_file(
url=f"{self._get_worker_ip(worker_id)}/file/post",
sender_id=self.state.id,
filepath=zip_package_dst,
Expand Down
195 changes: 89 additions & 106 deletions chimerapy/engine/networking/client.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,4 @@
# Built-in
from typing import Dict, Optional, Callable, Any
import os
import asyncio
import threading
Expand All @@ -12,16 +11,18 @@
import enum
import logging
import traceback
import atexit
import multiprocess as mp
from concurrent.futures import Future
from typing import Dict, Optional, Callable, Any, Union, List, Coroutine

# Third-party
import aiohttp

# Internal Imports
from chimerapy.engine import config
from ..utils import create_payload, async_waiting_for
from .async_loop_thread import AsyncLoopThread
from chimerapy.engine.utils import create_payload, async_waiting_for
from .enums import GENERAL_MESSAGE

# Logging
Expand All @@ -42,6 +43,7 @@ def __init__(
host: str,
port: int,
ws_handlers: Dict[enum.Enum, Callable] = {},
thread: Optional[AsyncLoopThread] = None,
parent_logger: Optional[logging.Logger] = None,
):

Expand All @@ -53,22 +55,27 @@ def __init__(
self._ws: Optional[aiohttp.ClientWebSocketResponse] = None
self._session = None

# The EventLoop
if thread:
self._thread = thread
else:
self._thread = AsyncLoopThread()
self._thread.start()

self.futures: List[Future] = []

# State variables
self.running = threading.Event()
self.running.clear()
self.msg_processed_counter = 0
self._client_shutdown_complete = threading.Event()
self._client_shutdown_complete.clear()

# Create thread to accept async request
self._thread = AsyncLoopThread()
self._thread.start()
self.uuid_records: collections.deque[str] = collections.deque(maxlen=100)
self.tasks: List[asyncio.Task] = []

# Adding default client handlers
self.ws_handlers.update(
{
GENERAL_MESSAGE.OK.value: self._ok,
GENERAL_MESSAGE.SHUTDOWN.value: self._client_shutdown,
GENERAL_MESSAGE.SHUTDOWN.value: self.async_shutdown,
}
)

Expand All @@ -80,17 +87,34 @@ def __init__(
else:
self.logger = _logger.getLogger("chimerapy-engine-networking")

# Make sure to shutdown correctly
atexit.register(self.shutdown)

def __str__(self):
return f"<Client {self.id}>"

def setLogger(self, parent_logger: logging.Logger):
self.logger = _logger.fork(parent_logger, "client")

####################################################################
# Helper Function
####################################################################

def _exec_coro(self, coro: Coroutine) -> Future:
# Submitting the coroutine
future = self._thread.exec(coro)

# Saving the future for later use
self.futures.append(future)

return future

####################################################################
# Client WS Handlers
####################################################################

async def _ok(self, msg: Dict):
# self.logger.debug(f"{self}: received OK")
self.uuid_records.append(msg["data"]["uuid"])

####################################################################
Expand All @@ -102,6 +126,8 @@ async def _read_ws(self):

async for aiohttp_msg in self._ws:

# self.logger.debug(f"{self}: msg: {aiohttp_msg}")

# Tracking the number of messages processed
self.msg_processed_counter += 1

Expand All @@ -125,13 +151,6 @@ async def _read_ws(self):
await self._ws.close()
return None

async def _write_ws(self, msg: Dict):
await self._send_msg(**msg)

####################################################################
# Client Utilities
####################################################################

async def _send_msg(
self,
signal: enum.Enum,
Expand Down Expand Up @@ -164,7 +183,39 @@ async def _send_msg(
timeout=config.get("comms.timeout.ok"),
)

async def _send_file_async(
async def _register(self):

# First message should be the client registering to the Server
await self._send_msg(
signal=GENERAL_MESSAGE.CLIENT_REGISTER,
data={"client_id": self.id},
ok=True,
)

# self.logger.debug(f"{self}: registered!")

####################################################################
# Client Async Setup and Shutdown
####################################################################

async def async_connect(self):

# Reset
self.uuid_records.clear()

# Create the session
self._session = aiohttp.ClientSession()
self._ws = await self._session.ws_connect(f"http://{self.host}:{self.port}/ws")

# Create task to read
# self._thread.exec(self._read_ws())
task = asyncio.create_task(self._read_ws())
self.tasks.append(task)

# Register the client
await self._register()

async def async_send_file(
self, url: str, sender_id: str, filepath: pathlib.Path
) -> bool:

Expand All @@ -188,7 +239,7 @@ async def _send_file_async(

return True

async def _send_folder_async(self, sender_id: str, dir: pathlib.Path):
async def async_send_folder(self, sender_id: str, dir: pathlib.Path):

if not dir.is_dir() and not dir.exists():
self.logger.error(f"Cannot send non-existent dir: {dir}.")
Expand Down Expand Up @@ -232,65 +283,16 @@ async def _send_folder_async(self, sender_id: str, dir: pathlib.Path):
url = f"http://{self.host}:{self.port}/file/post"

# Then send the file
await self._send_file_async(url, sender_id, zip_file)
await self.async_send_file(url, sender_id, zip_file)

return True

####################################################################
# Client Async Setup and Shutdown
####################################################################

async def _register(self):

# First message should be the client registering to the Server
await self._send_msg(
signal=GENERAL_MESSAGE.CLIENT_REGISTER,
data={"client_id": self.id},
ok=True,
)

# Mark that client has connected
self._client_ready.set()

async def _main(self):

# Create record of message uuids
self.uuid_records = collections.deque(maxlen=100)

async with aiohttp.ClientSession() as session:
async with session.ws_connect(f"http://{self.host}:{self.port}/ws") as ws:

# Store the Client session
self._session = session
self._ws = ws

# Establish read and write
read_task = asyncio.create_task(self._read_ws())

# Register the client
await self._register()

# Continue executing them
await asyncio.gather(read_task)

# After the ws is closed, do the following
await self._client_shutdown()

async def _client_shutdown(self, msg: Dict = {}):

# Mark to stop and close things
self.running.clear()
async def async_shutdown(self, msg: Dict = {}):

if self._ws:
await asyncio.wait_for(self._ws.close(), timeout=2)
await asyncio.wait_for(self._ws.close(), timeout=5)
if self._session:
await asyncio.wait_for(self._session.close(), timeout=2)

self._client_shutdown_complete.set()

####################################################################
# Client ASync Lifecyle API
####################################################################
await asyncio.wait_for(self._session.close(), timeout=5)

async def async_send(self, signal: enum.Enum, data: Any, ok: bool = False) -> bool:

Expand All @@ -299,14 +301,16 @@ async def async_send(self, signal: enum.Enum, data: Any, ok: bool = False) -> bo

# Create msg container and execute writing coroutine
msg = {"signal": signal, "data": data, "msg_uuid": msg_uuid, "ok": ok}
await self._write_ws(msg)
await self._send_msg(**msg)

if ok:

await async_waiting_for(
success = await async_waiting_for(
lambda: msg_uuid in self.uuid_records,
timeout=config.get("comms.timeout.ok"),
)
if not success:
self.logger.warning(f"{self}: Timeout in OK")

return True

Expand All @@ -320,49 +324,28 @@ def send(self, signal: enum.Enum, data: Any, ok: bool = False) -> Future[bool]:
def send_file(self, sender_id: str, filepath: pathlib.Path) -> Future[bool]:
# Compose the url
url = f"http://{self.host}:{self.port}/file/post"
return self._thread.exec(self._send_file_async(url, sender_id, filepath))
return self._exec_coro(self.async_send_file(url, sender_id, filepath))

def send_folder(self, sender_id: str, dir: pathlib.Path) -> Future[bool]:

assert (
dir.is_dir() and dir.exists()
), f"Sending {dir} needs to be a folder that exists."

return self._thread.exec(self._send_folder_async(sender_id, dir))

def connect(self):

# Mark that the client is running
self.running.set()

# Create async loop in thread
self._client_ready = threading.Event()
self._client_ready.clear()

# Start async execution
self._thread.exec(self._main())

# Wait until client is ready
flag = self._client_ready.wait(timeout=config.get("comms.timeout.client-ready"))
if flag == 0:
self.shutdown()
raise TimeoutError(f"{self}: failed to connect, shutting down!")
return self._exec_coro(self.async_send_folder(sender_id, dir))

def shutdown(self):
def connect(self, blocking: bool = True) -> Union[bool, Future[bool]]:
future = self._exec_coro(self.async_connect())

if self.running.is_set():
if blocking:
return future.result(timeout=config.get("comms.timeout.client-ready"))

# Execute shutdown
self._thread.exec(self._client_shutdown())
return future

# Wait for it
if not self._client_shutdown_complete.wait(
timeout=config.get("comms.timeout.client-shutdown")
):
self.logger.warning(f"{self}: failed to gracefully shutdown")
def shutdown(self, blocking: bool = True) -> Union[bool, Future[bool]]:
future = self._exec_coro(self.async_shutdown())

# Stop threaded loop
self._thread.stop()
if blocking:
return future.result(timeout=config.get("comms.timeout.client-shutdown"))

def __del__(self):
self.shutdown()
return future
12 changes: 7 additions & 5 deletions chimerapy/engine/networking/publisher.py
Original file line number Diff line number Diff line change
@@ -1,14 +1,12 @@
# Built-in Imports
import threading
import time

# Third-party Imports
import zmq

# Internal Imports
from chimerapy.engine import config
from ..utils import get_ip_address
from .data_chunk import DataChunk
from chimerapy.engine.utils import get_ip_address

# Logging
from chimerapy.engine import _logger
Expand All @@ -23,7 +21,7 @@ class Publisher:
def __init__(self):

# Storing state variables
self.port: int = -1
self.port: int = 0
self.host: str = get_ip_address()
self.running: bool = False
self._data_chunk: DataChunk = DataChunk()
Expand Down Expand Up @@ -59,7 +57,7 @@ def start(self):
self._zmq_context = zmq.Context()
self._zmq_socket = self._zmq_context.socket(zmq.PUB)
self.port = self._zmq_socket.bind_to_random_port(f"tcp://{self.host}")
time.sleep(config.get("comms.timeout.pub-delay"))
# time.sleep(config.get("comms.timeout.pub-delay"))

# Create send thread
self._ready = threading.Event()
Expand All @@ -72,3 +70,7 @@ def shutdown(self):
# Mark to stop
self.running = False
self._send_thread.join()

# Closing the socket
self._zmq_socket.close()
self._zmq_context.term()
Loading

0 comments on commit 4cb8382

Please sign in to comment.