Skip to content

Commit

Permalink
[Fix] Update docker image, libs + Increase efficiency (#9)
Browse files Browse the repository at this point in the history
* update image and libs

* Include flushing after training

* Include models before initialization and extend grace time

* Improve the deployment and saving config

* Improve the forwarding logic and increase the config reloading frequency
  • Loading branch information
enriquetomasmb authored Aug 1, 2024
1 parent 9fbda6a commit 1b7b959
Show file tree
Hide file tree
Showing 12 changed files with 85 additions and 48 deletions.
2 changes: 1 addition & 1 deletion Dockerfile
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
FROM nvidia/cuda:12.1.0-base-ubuntu22.04
FROM nvidia/cuda:12.4.1-cudnn-runtime-ubuntu22.04

ENV DEBIAN_FRONTEND=noninteractive

Expand Down
4 changes: 2 additions & 2 deletions docs/requirements.txt
Original file line number Diff line number Diff line change
@@ -1,3 +1,3 @@
Sphinx==7.4.0
sphinx-autoapi==3.1.2
Sphinx==7.4.7
sphinx-autoapi==3.2.1
sphinx-book-theme==1.1.3
5 changes: 5 additions & 0 deletions nebula/addons/reporter.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ def __init__(self, config, trainer, cm: "CommunicationsManager"):
self.grace_time = self.config.participant["reporter_args"]["grace_time_reporter"]
self.data_queue = asyncio.Queue()
self.url = f'http://{self.config.participant["scenario_args"]["controller"]}/nebula/dashboard/{self.config.participant["scenario_args"]["name"]}/node/update'
self.counter = 0

async def enqueue_data(self, name, value):
await self.data_queue.put((name, value))
Expand All @@ -36,6 +37,10 @@ async def run_reporter(self):
await self.__report_status_to_controller()
await self.__report_data_queue()
await self.__report_resources()
self.counter += 1
if self.counter % 50 == 0:
logging.info(f"Reloading config file...")
self.cm.engine.config.reload_config_file()
await asyncio.sleep(self.frequency)

async def report_scenario_finished(self):
Expand Down
3 changes: 0 additions & 3 deletions nebula/core/engine.py
Original file line number Diff line number Diff line change
Expand Up @@ -302,9 +302,6 @@ async def start_communications(self):
logging.info(f"💤 Cold start time: {self.config.participant['misc_args']['grace_time_connection']} seconds before connecting to the network")
await asyncio.sleep(self.config.participant["misc_args"]["grace_time_connection"])
await self.cm.start()
if self.config.participant["scenario_args"]["controller"] == "nebula-frontend":
await self.cm.register()
await self.cm.wait_for_controller()
initial_neighbors = self.config.participant["network_args"]["neighbors"].split()
for i in initial_neighbors:
addr = f"{i.split(':')[0]}:{i.split(':')[1]}"
Expand Down
21 changes: 19 additions & 2 deletions nebula/core/network/communications.py
Original file line number Diff line number Diff line change
Expand Up @@ -121,7 +121,8 @@ async def handle_incoming_message(self, data, addr_from):
await self.handle_control_message(source, message_wrapper.control_message)
elif message_wrapper.HasField("federation_message"):
if await self.include_received_message_hash(hashlib.md5(data).hexdigest()):
await self.forwarder.forward(data, addr_from=addr_from)
if self.config.participant["device_args"]["proxy"] or message_wrapper.federation_message.action == nebula_pb2.FederationMessage.Action.Value("FEDERATION_START"):
await self.forwarder.forward(data, addr_from=addr_from)
await self.handle_federation_message(source, message_wrapper.federation_message)
elif message_wrapper.HasField("model_message"):
if await self.include_received_message_hash(hashlib.md5(data).hexdigest()):
Expand Down Expand Up @@ -170,7 +171,7 @@ async def handle_model_message(self, source, message):
if message.round != current_round and message.round != -1:
logging.info(f"❗️ handle_model_message | Received a model from a different round | Model round: {message.round} | Current round: {current_round}")
if message.round > current_round:
logging.info(f"🤖 handle_model_message | Saving model from {source} for future round")
logging.info(f"🤖 handle_model_message | Saving model from {source} for future round {message.round}")
await self.engine.aggregator.include_next_model_in_buffer(
message.parameters,
message.weight,
Expand Down Expand Up @@ -211,6 +212,13 @@ async def handle_model_message(self, source, message):
else:
if message.round != -1:
# A round != -1 means this is not the initialization model; buffer it for its future round and return (only round == -1 messages may initialize)
logging.info(f"🤖 handle_model_message | Saving model from {source} for future round {message.round}")
await self.engine.aggregator.include_next_model_in_buffer(
message.parameters,
message.weight,
source=source,
round=message.round,
)
return
logging.info(f"🤖 handle_model_message | Initializing model (executed by {source})")
try:
Expand All @@ -232,6 +240,15 @@ async def handle_model_message(self, source, message):

else:
logging.info(f"🤖 handle_model_message | Tried to add a model while learning is not running")
if message.round != -1:
# A round != -1 means this is not the initialization model; buffer it for its future round instead of discarding it
logging.info(f"🤖 handle_model_message | Saving model from {source} for future round {message.round}")
await self.engine.aggregator.include_next_model_in_buffer(
message.parameters,
message.weight,
source=source,
round=message.round,
)
return

async def handle_connection_message(self, source, message):
Expand Down
10 changes: 7 additions & 3 deletions nebula/core/network/connection.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
import asyncio
import base64
import gc
import logging
import time
from geopy import distance
Expand Down Expand Up @@ -197,7 +198,7 @@ async def send(self, data, pb=True, encoding_type="utf-8", compression="none"):
else:
data_to_send = data_prefix + encoded_data + self.EOT_CHAR

chunk_size = 1024 * 1024 * 1024
chunk_size = 100 * 1024 * 1024 # 100 MB
total_size = len(data_to_send)

for i in range(0, total_size, chunk_size):
Expand Down Expand Up @@ -234,12 +235,15 @@ async def retrieve_message(self, message):
except Exception as e:
logging.error(f"❗️ Error retrieving message: {e}")
return None
finally:
del message
gc.collect()

async def handle_incoming_message(self):
try:
buffer = bytearray()
chunk_size = 1024 * 1024 * 1024
max_buffer_size = 2 * 1024 * 1024 * 1024 # 2 GB
chunk_size = 100 * 1024 * 1024 # 100 MB
max_buffer_size = 1024 * 1024 * 1024 # 1 GB
while True:
try:
chunk = await self.reader.read(chunk_size)
Expand Down
6 changes: 6 additions & 0 deletions nebula/core/training/lightning.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
import gc
import logging
from collections import OrderedDict
import random
Expand Down Expand Up @@ -146,6 +147,7 @@ def train(self):
try:
self.create_trainer()
self.__trainer.fit(self.model, self.data)
self.__trainer = None
except Exception as e:
logging.error(f"Error training model: {e}")
logging.error(traceback.format_exc())
Expand All @@ -154,6 +156,7 @@ def test(self):
try:
self.create_trainer()
self.__trainer.test(self.model, self.data, verbose=True)
self.__trainer = None
except Exception as e:
logging.error(f"Error testing model: {e}")
logging.error(traceback.format_exc())
Expand All @@ -170,6 +173,9 @@ def on_round_end(self):
self._logger.global_step = self._logger.global_step + self._logger.local_step
self._logger.local_step = 0
self.round += 1
logging.info("Flushing memory cache at the end of round...")
torch.cuda.empty_cache()
gc.collect()
pass

def on_learning_cycle_end(self):
Expand Down
1 change: 1 addition & 0 deletions nebula/frontend/config/participant.json.example
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
"device_args": {
"uid": "",
"idx": "",
"docker_id": "",
"name": "",
"username": "pi",
"password": "pi",
Expand Down
14 changes: 7 additions & 7 deletions nebula/frontend/requirements.txt
Original file line number Diff line number Diff line change
@@ -1,24 +1,24 @@
setuptools==70.3.0
setuptools==71.1.0
wheel==0.43.0
protobuf==4.25.3
# tensorboard==2.17.0
tb-nightly==2.18.0a20240711
tensorboardx==2.6.2.2
pandas==2.2.2
fastapi[all]==0.111.0
uvicorn==0.30.1
fastapi[all]==0.111.1
uvicorn==0.30.3
jinja2==3.1.4
pytest==8.2.2
pytest==8.3.2
matplotlib==3.9.1
plotly==5.22.0
plotly==5.23.0
python-dotenv==1.0.1
networkx==3.3
requests==2.32.3
ansi2html==1.9.2
gunicorn==22.0.0
geopy==2.4.1
cryptography==42.0.8
pyOpenSSL==24.1.0
cryptography==43.0.0
pyOpenSSL==24.2.1
pycryptodome==3.20.0
pyinstrument==4.6.2
cffi==1.16.0
Expand Down
14 changes: 7 additions & 7 deletions nebula/requirements.txt
Original file line number Diff line number Diff line change
Expand Up @@ -6,28 +6,28 @@ async-timeout==4.0.3
netifaces==0.11.0
tcconfig==0.28.1
geopy==2.4.1
numpy==2.0.0
torch==2.3.1
torchvision==0.18.1
numpy==2.0.1
torch==2.4.0
torchvision==0.19.0
torchtext
torchdata==0.7.1
torchmetrics==1.4.0
lightning==2.3.3
plotly==5.22.0
plotly==5.23.0
# tensorboard==2.17.0
tb-nightly==2.18.0a20240711
tensorboardx==2.6.2.2
pytest==8.2.2
pytest==8.3.2
python-dotenv==1.0.1
PyYAML==6.0.1
setuptools==70.3.0
setuptools==71.1.0
matplotlib==3.9.1
networkx==3.3
requests==2.32.3
Pillow==10.4.0
ansi2html==1.9.2
pycryptodome==3.20.0
cryptography==42.0.8
cryptography==43.0.0
psutil==6.0.0
rich==13.7.1
seaborn==0.13.2
Expand Down
51 changes: 29 additions & 22 deletions nebula/scenarios.py
Original file line number Diff line number Diff line change
Expand Up @@ -560,7 +560,7 @@ def start_nodes_docker(self):
- /bin/bash
- -c
- |
ifconfig && echo '{} host.docker.internal' >> /etc/hosts && python3.11 /nebula/nebula/node.py {}
{} && ifconfig && echo '{} host.docker.internal' >> /etc/hosts && python3.11 /nebula/nebula/node.py {}
networks:
nebula-net-scenario:
ipv4_address: {}
Expand Down Expand Up @@ -588,7 +588,7 @@ def start_nodes_docker(self):
- /bin/bash
- -c
- |
ifconfig && echo '{} host.docker.internal' >> /etc/hosts && python3.11 /nebula/nebula/node.py {}
{} && ifconfig && echo '{} host.docker.internal' >> /etc/hosts && python3.11 /nebula/nebula/node.py {}
deploy:
resources:
reservations:
Expand Down Expand Up @@ -638,6 +638,7 @@ def start_nodes_docker(self):
services += participant_gpu_template.format(
idx,
self.root_path,
"sleep 10" if node["device_args"]["start"] else "sleep 0",
self.scenario.network_gateway,
path,
node["network_args"]["ip"],
Expand All @@ -648,6 +649,7 @@ def start_nodes_docker(self):
services += participant_template.format(
idx,
self.root_path,
"sleep 10" if node["device_args"]["start"] else "sleep 0",
self.scenario.network_gateway,
path,
node["network_args"]["ip"],
Expand All @@ -660,6 +662,19 @@ def start_nodes_docker(self):
# Write the Docker Compose file in config directory
with open(f"{self.config_dir}/docker-compose.yml", "w") as f:
f.write(docker_compose_file)

# Add the additional runtime configuration to each participant's config
for idx, node in enumerate(self.config.participants):
node["tracking_args"]["log_dir"] = "/nebula/app/logs"
node["tracking_args"]["config_dir"] = f"/nebula/app/config/{self.scenario_name}"
node["scenario_args"]["controller"] = self.controller
node["security_args"]["certfile"] = f"/nebula/app/certs/participant_{node['device_args']['idx']}_cert.pem"
node["security_args"]["keyfile"] = f"/nebula/app/certs/participant_{node['device_args']['idx']}_key.pem"
node["security_args"]["cafile"] = f"/nebula/app/certs/ca_cert.pem"

# Write the config file in config directory
with open(f"{self.config_dir}/participant_{node['device_args']['idx']}.json", "w") as f:
json.dump(node, f, indent=4)

# Start the Docker Compose file, catch error if any
try:
Expand All @@ -678,34 +693,26 @@ def start_nodes_docker(self):
raise Exception("Docker Compose failed to start, please check if Docker Compose is installed (https://docs.docker.com/compose/install/) and Docker Engine is running.")

container_ids = None
logging.info("Waiting for nodes to start...")
# Poll until the number of running containers equals the number of participants
while container_ids is None or len(container_ids) != len(self.config.participants):
time.sleep(3)
try:
# Obtain docker ids
result = subprocess.run(["docker", "compose", "-f", f"{self.config_dir}/docker-compose.yml", "ps", "-q"], stdout=subprocess.PIPE, stderr=subprocess.PIPE, text=True)

# Obtain container IDs
try:
# Obtain docker ids
result = subprocess.run(["docker", "compose", "-f", f"{self.config_dir}/docker-compose.yml", "ps", "-q"], stdout=subprocess.PIPE, stderr=subprocess.PIPE, text=True)

if result.returncode != 0:
raise Exception(f"Error obtaining docker IDs: {result.stderr}")

container_ids = result.stdout.strip().split("\n")
if result.returncode != 0:
raise Exception(f"Error obtaining docker IDs: {result.stderr}")

except subprocess.CalledProcessError as e:
raise Exception("Docker Compose failed to start, please check if Docker Compose is installed " "(https://docs.docker.com/compose/install/) and Docker Engine is running.")
container_ids = result.stdout.strip().split("\n")

if not container_ids or len(container_ids) != len(self.config.participants):
raise Exception("The number of container IDs does not match the number of participants.")
except subprocess.CalledProcessError as e:
raise Exception("Docker Compose failed to start, please check if Docker Compose is installed " "(https://docs.docker.com/compose/install/) and Docker Engine is running.")

# Point the log and config directories inside the Docker containers to /nebula/app, and update the controller endpoint
for idx, node in enumerate(self.config.participants):
# Assign docker ID to node
node["device_args"]["docker_id"] = container_ids[idx]
# Print the configuration of the node
node["tracking_args"]["log_dir"] = "/nebula/app/logs"
node["tracking_args"]["config_dir"] = f"/nebula/app/config/{self.scenario_name}"
node["scenario_args"]["controller"] = self.controller
node["security_args"]["certfile"] = f"/nebula/app/certs/participant_{node['device_args']['idx']}_cert.pem"
node["security_args"]["keyfile"] = f"/nebula/app/certs/participant_{node['device_args']['idx']}_key.pem"
node["security_args"]["cafile"] = f"/nebula/app/certs/ca_cert.pem"

# Write the config file in config directory
with open(f"{self.config_dir}/participant_{node['device_args']['idx']}.json", "w") as f:
Expand Down
2 changes: 1 addition & 1 deletion requirements.txt
Original file line number Diff line number Diff line change
Expand Up @@ -7,5 +7,5 @@ matplotlib==3.9.1
networkx==3.3
psutil==6.0.0
pycryptodome==3.20.0
cryptography==42.0.8
cryptography==43.0.0
nvidia-ml-py==12.555.43

0 comments on commit 1b7b959

Please sign in to comment.