Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

refactor(framework) Remove asyncio from Backend definitions #3469

Merged
merged 63 commits into from
Jul 11, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
63 commits
Select commit Hold shift + click to select a range
57032f2
wip
jafermarq Apr 22, 2024
b885f22
w/ previous
jafermarq Apr 22, 2024
33ada3e
default `create_run`
jafermarq Apr 22, 2024
54539d9
w/ previous
jafermarq Apr 22, 2024
ea946b7
Merge branch 'main' into in-memory-driver
jafermarq Apr 23, 2024
83ff757
Merge branch 'main' into in-memory-driver
jafermarq Apr 24, 2024
13b0e7b
init
jafermarq Apr 25, 2024
242883e
clientdriverproxytests good
jafermarq Apr 25, 2024
6138968
unfusing
jafermarq Apr 25, 2024
0f3ab98
Merge branch 'main' into in-memory-driver
jafermarq Apr 25, 2024
a8ac6ff
Merge branch 'fuse-grpc-driver-and-helper' into in-memory-driver
jafermarq Apr 25, 2024
49ea5df
flower-simulation uses `InMemoryDriver`
jafermarq Apr 25, 2024
8e808f8
Merge branch 'main' into fuse-grpc-driver-and-helper
jafermarq Apr 25, 2024
c2b2f1e
Merge branch 'fuse-grpc-driver-and-helper' into in-memory-driver
jafermarq Apr 25, 2024
46d0268
Merge branch 'main' into in-memory-driver
jafermarq Apr 29, 2024
91443ce
revert and merge w/ main
jafermarq Apr 29, 2024
91a7b31
use InMemoryDriver
jafermarq Apr 29, 2024
e63fd1c
Merge branch 'main' into in-memory-driver
jafermarq Apr 29, 2024
6455809
Merge branch 'main' into in-memory-driver
jafermarq Apr 30, 2024
b2dcc86
updated /w fab
jafermarq Apr 30, 2024
966b55a
wip
jafermarq Apr 30, 2024
2302dc3
completed unittest
jafermarq Apr 30, 2024
944d8f6
Merge branch 'main' into in-memory-driver
jafermarq Apr 30, 2024
a150844
Merge branch 'in-memory-driver' into use-inmemory-driver-in-simulations
jafermarq May 1, 2024
68e77c6
Merge branch 'main' into in-memory-driver
jafermarq May 1, 2024
7e211c6
Merge branch 'main' into in-memory-driver
panh99 May 1, 2024
ece1774
ensure tasks are deleted after pull
jafermarq May 1, 2024
7eb41c5
w/ previous
jafermarq May 1, 2024
9e6fc95
fix
jafermarq May 1, 2024
da9ed0b
Merge branch 'in-memory-driver' into use-inmemory-driver-in-simulations
jafermarq May 1, 2024
c49df89
Merge branch 'main' into in-memory-driver
jafermarq May 6, 2024
e94874e
Merge branch 'in-memory-driver' into use-inmemory-driver-in-simulations
jafermarq May 6, 2024
f8963d3
state-agnostic driver definition
jafermarq May 6, 2024
f4b244c
minor tweak
jafermarq May 6, 2024
5768b08
Merge branch 'main' into in-memory-driver
jafermarq May 6, 2024
96b4c6a
Merge branch 'in-memory-driver' into use-inmemory-driver-in-simulations
jafermarq May 6, 2024
b3e4407
Merge branch 'main' into in-memory-driver
jafermarq May 8, 2024
f1a59c7
Merge branch 'in-memory-driver' into use-inmemory-driver-in-simulations
jafermarq May 8, 2024
76eab14
Merge branch 'main' into in-memory-driver
jafermarq May 8, 2024
9b1599d
Merge branch 'in-memory-driver' into use-inmemory-driver-in-simulations
jafermarq May 8, 2024
a2c2239
Merge branch 'main' into in-memory-driver
jafermarq May 16, 2024
202e2f4
Merge branch 'main' into in-memory-driver
jafermarq May 18, 2024
69349df
Merge branch 'in-memory-driver' into use-inmemory-driver-in-simulations
jafermarq May 18, 2024
b08ba2c
Merge branch 'main' into use-inmemory-driver-in-simulations
danieljanes May 19, 2024
12be6fe
Merge branch 'main' into use-inmemory-driver-in-simulations
jafermarq May 19, 2024
5f803e6
init
jafermarq May 19, 2024
f5d7e87
init
jafermarq May 19, 2024
1427958
Merge branch 'main' into simplify-sim-double-queue
jafermarq Jun 10, 2024
2a04fe7
Merge branch 'simplify-sim-double-queue' into simplify-sim-no-asyncio…
jafermarq Jun 10, 2024
5548ce2
Merge branch 'main' into simplify-sim-double-queue
jafermarq Jun 11, 2024
438154c
Merge branch 'simplify-sim-double-queue' into simplify-sim-no-asyncio…
jafermarq Jun 11, 2024
b0a47bd
Merge branch 'main' into simplify-sim-double-queue
jafermarq Jun 14, 2024
aafb61b
Merge branch 'main' into simplify-sim-double-queue
jafermarq Jul 2, 2024
5c0ed99
merge
jafermarq Jul 2, 2024
39f7ba1
fix
jafermarq Jul 2, 2024
723318a
Merge branch 'simplify-sim-double-queue' into simplify-sim-no-asyncio…
jafermarq Jul 2, 2024
cb6c647
remove asyncio cleanup in tests
jafermarq Jul 2, 2024
02a11b9
Merge branch 'main' into simplify-sim-no-asyncio-backends
danieljanes Jul 10, 2024
02c5d1d
refactor(framework) Remove `asyncio` from core Simulation Engine (#3470)
jafermarq Jul 10, 2024
84727f5
Merge branch 'main' into simplify-sim-no-asyncio-backends
danieljanes Jul 10, 2024
cc655d8
Merge branch 'main' into simplify-sim-no-asyncio-backends
jafermarq Jul 10, 2024
f5d2119
py3.8 compatible
jafermarq Jul 10, 2024
716b37b
Merge branch 'main' into simplify-sim-no-asyncio-backends
jafermarq Jul 11, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 4 additions & 4 deletions src/py/flwr/server/superlink/fleet/vce/backend/backend.py
Original file line number Diff line number Diff line change
Expand Up @@ -33,8 +33,8 @@ def __init__(self, backend_config: BackendConfig, work_dir: str) -> None:
"""Construct a backend."""

@abstractmethod
async def build(self) -> None:
"""Build backend asynchronously.
def build(self) -> None:
"""Build backend.

Different components need to be in place before workers in a backend are ready
to accept jobs. When this method finishes executing, the backend should be fully
Expand All @@ -54,11 +54,11 @@ def is_worker_idle(self) -> bool:
"""Report whether a backend worker is idle and can therefore run a ClientApp."""

@abstractmethod
async def terminate(self) -> None:
def terminate(self) -> None:
"""Terminate backend."""

@abstractmethod
async def process_message(
def process_message(
self,
app: Callable[[], ClientApp],
message: Message,
Expand Down
17 changes: 8 additions & 9 deletions src/py/flwr/server/superlink/fleet/vce/backend/raybackend.py
Original file line number Diff line number Diff line change
Expand Up @@ -153,12 +153,12 @@ def is_worker_idle(self) -> bool:
"""Report whether the pool has idle actors."""
return self.pool.is_actor_available()

async def build(self) -> None:
def build(self) -> None:
"""Build pool of Ray actors that this backend will submit jobs to."""
await self.pool.add_actors_to_pool(self.pool.actors_capacity)
self.pool.add_actors_to_pool(self.pool.actors_capacity)
log(DEBUG, "Constructed ActorPool with: %i actors", self.pool.num_actors)

async def process_message(
def process_message(
self,
app: Callable[[], ClientApp],
message: Message,
Expand All @@ -172,17 +172,16 @@ async def process_message(

try:
# Submit a task to the pool
future = await self.pool.submit(
future = self.pool.submit(
lambda a, a_fn, mssg, cid, state: a.run.remote(a_fn, mssg, cid, state),
(app, message, str(partition_id), context),
)

await future
# Fetch result
(
out_mssg,
updated_context,
) = await self.pool.fetch_result_and_return_actor_to_pool(future)
) = self.pool.fetch_result_and_return_actor_to_pool(future)

return out_mssg, updated_context

Expand All @@ -193,11 +192,11 @@ async def process_message(
self.__class__.__name__,
)
# add actor back into pool
await self.pool.add_actor_back_to_pool(future)
self.pool.add_actor_back_to_pool(future)
raise ex

async def terminate(self) -> None:
def terminate(self) -> None:
"""Terminate all actors in actor pool."""
await self.pool.terminate_all_actors()
self.pool.terminate_all_actors()
ray.shutdown()
log(DEBUG, "Terminated %s", self.__class__.__name__)
31 changes: 11 additions & 20 deletions src/py/flwr/server/superlink/fleet/vce/backend/raybackend_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,11 +14,10 @@
# ==============================================================================
"""Test for Ray backend for the Fleet API using the Simulation Engine."""

import asyncio
from math import pi
from pathlib import Path
from typing import Callable, Dict, Optional, Tuple, Union
from unittest import IsolatedAsyncioTestCase
from unittest import TestCase

import ray

Expand Down Expand Up @@ -84,18 +83,18 @@ def _load_app() -> ClientApp:
return _load_app


async def backend_build_process_and_termination(
def backend_build_process_and_termination(
backend: RayBackend,
process_args: Optional[Tuple[Callable[[], ClientApp], Message, Context]] = None,
) -> Union[Tuple[Message, Context], None]:
"""Build, process job and terminate RayBackend."""
await backend.build()
backend.build()
to_return = None

if process_args:
to_return = await backend.process_message(*process_args)
to_return = backend.process_message(*process_args)

await backend.terminate()
backend.terminate()

return to_return

Expand Down Expand Up @@ -129,20 +128,18 @@ def _create_message_and_context() -> Tuple[Message, Context, float]:
return message, context, expected_output


class AsyncTestRayBackend(IsolatedAsyncioTestCase):
"""A basic class that allows runnig multliple asyncio tests."""
class TestRayBackend(TestCase):
"""A basic class that allows runnig multliple tests."""

async def on_cleanup(self) -> None:
def doCleanups(self) -> None:
"""Ensure Ray has shutdown."""
if ray.is_initialized():
ray.shutdown()

def test_backend_creation_and_termination(self) -> None:
"""Test creation of RayBackend and its termination."""
backend = RayBackend(backend_config={}, work_dir="")
asyncio.run(
backend_build_process_and_termination(backend=backend, process_args=None)
)
backend_build_process_and_termination(backend=backend, process_args=None)

def test_backend_creation_submit_and_termination(
self,
Expand All @@ -157,10 +154,8 @@ def test_backend_creation_submit_and_termination(

message, context, expected_output = _create_message_and_context()

res = asyncio.run(
backend_build_process_and_termination(
backend=backend, process_args=(client_app_callable, message, context)
)
res = backend_build_process_and_termination(
backend=backend, process_args=(client_app_callable, message, context)
)

if res is None:
Expand Down Expand Up @@ -189,7 +184,6 @@ def test_backend_creation_submit_and_termination_non_existing_client_app(
self.test_backend_creation_submit_and_termination(
client_app_loader=_load_from_module("a_non_existing_module:app")
)
self.addAsyncCleanup(self.on_cleanup)

def test_backend_creation_submit_and_termination_existing_client_app(
self,
Expand Down Expand Up @@ -217,7 +211,6 @@ def test_backend_creation_submit_and_termination_existing_client_app_unsetworkdi
client_app_loader=_load_from_module("raybackend_test:client_app"),
workdir="/?&%$^#%@$!",
)
self.addAsyncCleanup(self.on_cleanup)

def test_backend_creation_with_init_arguments(self) -> None:
"""Testing whether init args are properly parsed to Ray."""
Expand Down Expand Up @@ -248,5 +241,3 @@ def test_backend_creation_with_init_arguments(self) -> None:
nodes = ray.nodes()

assert nodes[0]["Resources"]["CPU"] == backend_config_2["init_args"]["num_cpus"]

self.addAsyncCleanup(self.on_cleanup)
Loading