Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Flaky data collection. Closes #249 #255

Merged
merged 9 commits into from
Sep 15, 2023
30 changes: 28 additions & 2 deletions chimerapy/engine/eventbus/eventbus.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,17 @@
from datetime import datetime
from collections import deque
from concurrent.futures import Future
from typing import Any, Generic, Type, Callable, Awaitable, Optional, Literal, TypeVar, Dict
from typing import (
Any,
Generic,
Type,
Callable,
Awaitable,
Optional,
Literal,
TypeVar,
Dict,
)

from aioreactive import AsyncObservable, AsyncObserver, AsyncSubject
from dataclasses import dataclass, field
Expand Down Expand Up @@ -32,6 +42,7 @@ def __init__(self, thread: Optional[AsyncLoopThread] = None):
self.thread = thread

# State information
self.awaitable_events: Dict[str, asyncio.Event] = {}
self.subscription_map: Dict[AsyncObserver, Any] = {}

####################################################################
Expand All @@ -43,19 +54,34 @@ async def asend(self, event: Event):
self._event_counts += 1
await self.stream.asend(event)

if event.type in self.awaitable_events:
self.latest_event = event
self.awaitable_events[event.type].set()
del self.awaitable_events[event.type]

async def asubscribe(self, observer: AsyncObserver):
    """Attach ``observer`` to the event stream and remember its subscription
    handle so ``aunsubscribe`` can dispose of it later."""
    self._sub_counts += 1
    self.subscription_map[observer] = await self.stream.subscribe_async(observer)

async def aunsubscribe(self, observer: AsyncObserver):
    """Dispose the subscription previously created for ``observer``.

    Raises:
        RuntimeError: If ``observer`` was never subscribed via ``asubscribe``.
    """
    # Fix: the flat diff rendering kept both the old single-line raise and
    # the new wrapped raise; only one raise statement belongs here.
    if observer not in self.subscription_map:
        raise RuntimeError(
            "Trying to unsubscribe an Observer that is not subscribed"
        )

    self._sub_counts -= 1
    subscription = self.subscription_map[observer]
    await subscription.dispose_async()
    # NOTE(review): the disposed subscription is left in subscription_map, so a
    # second aunsubscribe() on the same observer would dispose it again instead
    # of raising — confirm whether the entry should be deleted here.

async def await_event(self, event_type: str) -> Event:
    """Suspend until the next event of ``event_type`` is sent on the bus,
    then return it.

    ``asend`` sets the per-type trigger and records the event in
    ``self.latest_event`` before waking waiters.
    NOTE(review): ``latest_event`` is a single shared slot for all event
    types — if a different awaited type fires between set() and wake-up the
    wrong event could be returned; confirm single-type usage.
    """
    trigger = self.awaitable_events.setdefault(event_type, asyncio.Event())
    await trigger.wait()
    return self.latest_event

####################################################################
## Sync
####################################################################
Expand Down
8 changes: 7 additions & 1 deletion chimerapy/engine/manager/events.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,12 @@ class StartEvent:
...


@dataclass
class UpdateSendArchiveEvent: # update_send_archive
    """Worker -> Manager report on whether the Worker's data archive was sent."""

    worker_id: str  # ID of the reporting Worker
    success: bool  # True if the archive transfer/save succeeded


@dataclass
class WorkerRegisterEvent: # worker_register
worker_state: WorkerState
Expand All @@ -31,4 +37,4 @@ class DeregisterEntityEvent: # entity_deregister

@dataclass
class MoveTransferredFilesEvent: # move_transferred_files
    """Request that the Manager move a Worker's transferred files into the
    log directory.

    Fix: the flat diff rendering kept both the removed ``unzip: bool`` field
    and the new ``worker_state`` field; the handler
    (``HttpServerService.move_transferred_files``) takes only ``worker_state``
    and reads its ``name`` and ``id`` as the files' owner information.
    """

    worker_state: WorkerState
32 changes: 27 additions & 5 deletions chimerapy/engine/manager/http_server_service.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,12 @@
from ..networking.async_loop_thread import AsyncLoopThread
from ..networking import Server
from ..networking.enums import MANAGER_MESSAGE
from .events import WorkerRegisterEvent, WorkerDeregisterEvent
from .events import (
MoveTransferredFilesEvent,
WorkerRegisterEvent,
WorkerDeregisterEvent,
UpdateSendArchiveEvent,
)

logger = _logger.getLogger("chimerapy-engine")

Expand Down Expand Up @@ -48,9 +53,11 @@ def __init__(
id="Manager",
routes=[
# Worker API
web.get("/", self._home),
web.post("/workers/register", self._register_worker_route),
web.post("/workers/deregister", self._deregister_worker_route),
web.post("/workers/node_status", self._update_nodes_status),
web.post("/workers/send_archive", self._update_send_archive),
],
thread=self._thread,
)
Expand All @@ -68,6 +75,7 @@ def __init__(
),
"move_transferred_files": TypedObserver(
"move_transferred_files",
MoveTransferredFilesEvent,
on_asend=self.move_transferred_files,
handle_event="unpack",
),
Expand Down Expand Up @@ -120,8 +128,17 @@ def _future_flush(self):
except Exception:
logger.error(traceback.format_exc())

async def move_transferred_files(self, worker_state: WorkerState) -> bool:
    """Move a Worker's transferred files into the Manager's log directory.

    Fix: the flat diff rendering kept the superseded synchronous
    ``move_transferred_files(self, unzip)`` definition alongside the new
    async one; only the async, worker-state-based version belongs here.

    Args:
        worker_state: State of the Worker whose files were transferred; its
            ``name`` and ``id`` are passed to the server as owner information.

    Returns:
        bool: Whether the server successfully moved the files.
    """
    return await self._server.move_transferred_files(
        self.state.logdir, owner=worker_state.name, owner_id=worker_state.id
    )

#####################################################################################
## Manager User Routes
#####################################################################################

async def _home(self, request: web.Request):
    # Liveness endpoint: GET / returns a plain-text banner confirming the
    # Manager's HTTP server is running.
    return web.Response(text="ChimeraPy Manager running!")

#####################################################################################
## Worker -> Manager Routes
Expand Down Expand Up @@ -167,9 +184,14 @@ async def _update_nodes_status(self, request: web.Request):
if worker_state.id in self.state.workers:
update_dataclass(self.state.workers[worker_state.id], worker_state)
else:
logger.error(f"{self}: non-registered Worker update: {worker_state.id}")
# logger.debug(f"{self}: Nodes status update to: {self.state.workers}")
logger.warning(f"{self}: non-registered Worker update: {worker_state.id}")

return web.HTTPOk()

async def _update_send_archive(self, request: web.Request):
    """Route handler: a Worker reports the result of sending its archive.

    Re-publishes the JSON payload on the event bus as an
    ``update_send_archive`` event and acknowledges with 200 OK.
    """
    payload = await request.json()
    await self.eventbus.asend(
        Event("update_send_archive", UpdateSendArchiveEvent(**payload))
    )
    return web.HTTPOk()

#####################################################################################
Expand Down
11 changes: 4 additions & 7 deletions chimerapy/engine/manager/manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -320,8 +320,8 @@ async def async_request_registered_method(
async def async_stop(self) -> bool:
return await self.worker_handler.stop()

async def async_collect(self) -> bool:
    """Collect data from all Workers (delegates to the WorkerHandler).

    Fix: the flat diff rendering kept the superseded
    ``async_collect(self, unzip=True)`` definition alongside the new
    parameterless one; only the new form belongs here.

    Returns:
        bool: Whether collection succeeded for every Worker.
    """
    return await self.worker_handler.collect()

async def async_reset(self, keep_workers: bool = True):
return await self.worker_handler.reset(keep_workers)
Expand Down Expand Up @@ -454,20 +454,17 @@ def stop(self) -> Future[bool]:
"""
return self._exec_coro(self.async_stop())

def collect(self) -> Future[bool]:
    """Collect data from the Workers.

    First waits until all the Nodes have finished saving their data, then
    the Manager requests the Nodes' data from the Workers.

    Fix: the flat diff rendering kept the superseded ``collect(self,
    unzip=True)`` signature, its ``unzip`` docstring entry, and the old
    return statement alongside the new ones; only the parameterless
    version belongs here (also corrects "finished save" -> "finished
    saving" and drops a stray backslash in the docstring).

    Returns:
        Future[bool]: Future of success in collecting data from Workers.
    """
    return self._exec_coro(self.async_collect())

def reset(
self, keep_workers: bool = True, blocking: bool = True
Expand Down
106 changes: 76 additions & 30 deletions chimerapy/engine/manager/worker_handler_service.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@

from chimerapy.engine import config
from chimerapy.engine import _logger
from ..utils import async_waiting_for
from ..data_protocols import NodePubTable
from ..node import NodeConfig
from ..networking import Client, DataChunk
Expand All @@ -27,6 +28,7 @@
RegisterEntityEvent,
DeregisterEntityEvent,
MoveTransferredFilesEvent,
UpdateSendArchiveEvent,
)

logger = _logger.getLogger("chimerapy-engine")
Expand All @@ -46,6 +48,7 @@ def __init__(self, name: str, eventbus: EventBus, state: ManagerState):
self.worker_graph_map: Dict = {}
self.commitable_graph: bool = False
self.node_pub_table = NodePubTable()
self.collected_workers: Dict[str, bool] = {}

# Also create a tempfolder to store any miscellaneous files and folders
self.tempfolder = pathlib.Path(tempfile.mkdtemp())
Expand All @@ -67,6 +70,12 @@ def __init__(self, name: str, eventbus: EventBus, state: ManagerState):
event_data_cls=WorkerDeregisterEvent,
handle_event="unpack",
),
"update_send_archive": TypedObserver(
"update_send_archive",
on_asend=self.update_send_archive,
event_data_cls=UpdateSendArchiveEvent,
handle_event="unpack",
),
}
for ob in self.observers.values():
self.eventbus.subscribe(ob).result(timeout=1)
Expand Down Expand Up @@ -126,25 +135,28 @@ async def _register_worker(self, worker_state: WorkerState) -> bool:

return True

async def _deregister_worker(self, worker_state: WorkerState) -> bool:
    """Deregister a Worker from the Manager.

    Emits an ``entity_deregister`` event for the logging subsystem and, if
    the Worker is known, removes it from ``self.state.workers``.

    Fix: the flat diff rendering interleaved the old ``worker_id``-based
    lines with the new ``worker_state``-based ones (duplicate def, duplicate
    lookups, duplicate del); this is the clean post-change version.

    Args:
        worker_state: State of the Worker to deregister (``id`` is used for
            the lookup; display fields come from the stored state).

    Returns:
        bool: True if the Worker was registered and has been removed.
    """
    # Deregister entity from logging
    await self.eventbus.asend(
        Event("entity_deregister", DeregisterEntityEvent(worker_id=worker_state.id))
    )

    if worker_state.id in self.state.workers:
        state = self.state.workers[worker_state.id]
        logger.info(
            f"Manager deregistered <Worker id={worker_state.id} name={state.name}> "
            f"from {state.ip}"
        )
        del self.state.workers[worker_state.id]

        return True

    return False

async def update_send_archive(self, worker_id: str, success: bool):
    """Record whether ``worker_id`` finished sending its archive successfully.

    Invoked by the ``update_send_archive`` event observer;
    ``_single_worker_collect`` polls ``self.collected_workers`` to learn
    when each Worker has reported back.
    """
    self.collected_workers[worker_id] = success

def _register_graph(self, graph: Graph):
"""Verifying that a Graph is valid, that is a DAG.

Expand Down Expand Up @@ -590,6 +602,47 @@ async def _setup_p2p_connections(self) -> bool:

return all(results)

async def _single_worker_collect(self, worker_id: str) -> bool:
    """Run the collect workflow for a single Worker.

    Posts ``/nodes/collect`` to the Worker, waits for the Worker's
    ``update_send_archive`` report, then emits ``move_transferred_files``
    so the transferred archives are moved into the Manager's logdir.

    Args:
        worker_id: ID of the Worker to collect from.

    Returns:
        bool: True if the request was accepted AND the Worker reported a
            successful archive transfer.
    """
    # Just requesting for the collection to start
    data = {"path": str(self.state.logdir)}
    async with aiohttp.ClientSession() as client:
        async with client.post(
            f"{self._get_worker_ip(worker_id)}/nodes/collect",
            data=json.dumps(data),
        ) as resp:

            if not resp.ok:
                # BUG FIX: the second fragment was missing the f-prefix, so
                # the literal text "{resp.ok}" was logged instead of the value.
                logger.error(
                    f"{self}: Collection failed, <Worker {worker_id}> "
                    f"responded {resp.ok} to collect request"
                )
                return False

    # Now we have to wait until worker says they finished transferring.
    # NOTE(review): no explicit timeout is passed; confirm async_waiting_for
    # has a sane default, otherwise a silent Worker hangs this coroutine.
    await async_waiting_for(condition=lambda: worker_id in self.collected_workers)
    success = self.collected_workers[worker_id]
    if not success:
        # BUG FIX: previous message claimed the Worker "never updated", but
        # reaching this line means it DID update — with a failure result.
        logger.error(
            f"{self}: Collection failed, <Worker {worker_id}> "
            "reported a failed archive transfer"
        )

    # Move files to their destination
    # NOTE(review): files are moved even when success is False (best effort
    # on partial transfers) — confirm this is intentional.
    try:
        await self.eventbus.asend(
            Event(
                "move_transferred_files",
                MoveTransferredFilesEvent(
                    worker_state=self.state.workers[worker_id]
                ),
            )
        )
    except Exception:
        logger.error(traceback.format_exc())

    return success

####################################################################
## Front-facing Async API
####################################################################
Expand Down Expand Up @@ -753,31 +806,24 @@ async def stop(self) -> bool:

return success

async def collect(self) -> bool:
    """Collect data from every registered Worker.

    Clears the previous round's results, runs ``_single_worker_collect``
    for each Worker concurrently, then saves the Manager's meta record.

    Fix: the flat diff rendering interleaved the old broadcast-based
    implementation (``_broadcast_request`` + ``unzip`` parameter) with the
    new per-worker implementation; this is the clean post-change version.

    Returns:
        bool: True only if collection succeeded for every Worker.
    """
    # Clear results left over from any previous collection round
    self.collected_workers.clear()

    # Request all workers concurrently
    coros: List[Coroutine] = [
        self._single_worker_collect(worker_id) for worker_id in self.state.workers
    ]

    try:
        results = await asyncio.gather(*coros)
    except Exception:
        logger.error(traceback.format_exc())
        return False

    await self.eventbus.asend(Event("save_meta"))
    return all(results)

async def reset(self, keep_workers: bool = True):

Expand All @@ -796,8 +842,8 @@ async def reset(self, keep_workers: bool = True):

# If not keep Workers, then deregister all
if not keep_workers:
for worker_id in self.state.workers:
await self._deregister_worker(worker_id)
for worker_state in self.state.workers.values():
await self._deregister_worker(worker_state)

# Update variable data
self.node_pub_table = NodePubTable()
Expand Down
Loading
Loading