From ab29137a91de72b57d46b1d2d31e7957aadbf92c Mon Sep 17 00:00:00 2001 From: "Andres D. Molins" Date: Thu, 21 Mar 2024 22:00:31 +0100 Subject: [PATCH] Fix: Changed threads implementation by asyncio implementation. --- src/aleph/vm/orchestrator/supervisor.py | 3 ++- src/aleph/vm/pool.py | 12 ++++++------ 2 files changed, 8 insertions(+), 7 deletions(-) diff --git a/src/aleph/vm/orchestrator/supervisor.py b/src/aleph/vm/orchestrator/supervisor.py index f40031d48..20452df90 100644 --- a/src/aleph/vm/orchestrator/supervisor.py +++ b/src/aleph/vm/orchestrator/supervisor.py @@ -134,7 +134,8 @@ def run(): engine = setup_engine() asyncio.run(create_tables(engine)) - pool = VmPool() + loop = asyncio.new_event_loop() + pool = VmPool(loop) pool.setup() hostname = settings.DOMAIN_NAME diff --git a/src/aleph/vm/pool.py b/src/aleph/vm/pool.py index daee88424..02889f68e 100644 --- a/src/aleph/vm/pool.py +++ b/src/aleph/vm/pool.py @@ -3,7 +3,6 @@ import asyncio import json import logging -import threading from collections.abc import Iterable from datetime import datetime, timezone from typing import Optional @@ -44,12 +43,14 @@ class VmPool: network: Optional[Network] snapshot_manager: Optional[SnapshotManager] = None systemd_manager: SystemDManager - creation_lock: threading.Lock + creation_lock: asyncio.Lock - def __init__(self): + def __init__(self, loop: asyncio.AbstractEventLoop): self.counter = settings.START_ID_INDEX self.executions = {} - self.creation_lock = threading.Lock() + + asyncio.set_event_loop(loop) + self.creation_lock = asyncio.Lock() self.network = ( Network( @@ -89,8 +90,7 @@ async def create_a_vm( self, vm_hash: ItemHash, message: ExecutableContent, original: ExecutableContent, persistent: bool ) -> VmExecution: """Create a new Aleph Firecracker VM from an Aleph function message.""" - - with self.creation_lock: + async with self.creation_lock: # Check if an execution is already present for this VM, then return it. # Do not `await` in this section. current_execution = self.get_running_vm(vm_hash)