Skip to content

Commit

Permalink
Fix: VMs were not properly shut down, persistent volumes could be corru…
Browse files Browse the repository at this point in the history
…pted
  • Loading branch information
hoh committed Jan 26, 2022
1 parent 9f48eb3 commit 6cfbd53
Show file tree
Hide file tree
Showing 2 changed files with 103 additions and 12 deletions.
31 changes: 30 additions & 1 deletion firecracker/microvm.py
Original file line number Diff line number Diff line change
Expand Up @@ -367,17 +367,46 @@ async def unix_client_connected(*_):
logger.warning("Never received signal from init")
raise MicroVMFailedInit()

async def shutdown(self):
    """Ask the guest to halt and wait for its confirmation.

    Opens the VM's vsock Unix socket (``self.vsock_path``), sends a
    ``CONNECT 52`` line followed by the ``halt`` instruction, then reads
    three lines back: the acknowledgement of the connection request, a
    first message, and a final message that is expected to be ``STOPZ``.
    An unexpected final message is only logged as an error; nothing is
    raised for it.

    The connection is always closed before returning, including when a
    read fails or when the caller cancels this coroutine (for instance
    through ``asyncio.wait_for`` with a timeout).
    """
    logger.debug(f"Shutown vm={self.vm_id}")
    reader, writer = await asyncio.open_unix_connection(path=self.vsock_path)
    try:
        payload = b"halt"
        writer.write(b"CONNECT 52\n" + payload)

        await writer.drain()

        ack: bytes = await reader.readline()
        logger.debug(f"ack={ack.decode()}")

        msg: bytes = await reader.readline()
        logger.debug(f"msg={msg}")

        msg2: bytes = await reader.readline()
        logger.debug(f"msg2={msg2}")

        if msg2 != b"STOPZ\n":
            logger.error("Unexpected response from VM")
    finally:
        # Release the socket on every exit path. close() does not block,
        # so it is safe to call while the coroutine is being cancelled.
        writer.close()

async def stop(self):
    """Terminate then kill the Firecracker process and forget it.

    Does nothing except log when no process is attached. A process that
    has already disappeared (``ProcessLookupError``) is logged and
    otherwise ignored. ``self.proc`` is reset to ``None`` afterwards.
    """
    process = self.proc
    if not process:
        logger.debug("No firecracker process to stop")
        return

    logger.debug("Stopping firecracker process")
    try:
        process.terminate()
        process.kill()
    except ProcessLookupError:
        logger.debug(f"Firecracker process pid={process.pid} not found")
    self.proc = None

async def teardown(self):
"""Stop the VM, cleanup network interface and remove data directory."""
try:
await asyncio.wait_for(self.shutdown(), timeout=5)
except asyncio.TimeoutError:
logger.exception(f"Timeout during VM shutdown vm={self.vm_id}")
logger.debug("Waiting for one second for the process to shudown")
await asyncio.sleep(1)
await self.stop()

if self.stdout_task:
Expand Down
84 changes: 73 additions & 11 deletions runtimes/aleph-alpine-3.13-python/init1.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@

logger.debug("Imports starting")

import ctypes
import asyncio
import os
import socket
Expand Down Expand Up @@ -42,6 +43,10 @@ class Interface(str, Enum):
executable = "executable"


class ShutdownException(Exception):
    """Raised once a halt instruction has been processed, to start shutdown."""


@dataclass
class Volume:
mount: str
Expand Down Expand Up @@ -329,13 +334,26 @@ async def run_executable_http(scope: dict) -> Tuple[Dict, Dict, str, Optional[by


async def process_instruction(
instruction: bytes, interface: Interface, application
instruction: bytes, interface: Interface, application: Union[ASGIApplication, subprocess.Popen]
) -> AsyncIterable[bytes]:

if instruction == b"halt":
logger.info("Received halt command")
system("sync")
logger.debug("Filesystems synced")
if isinstance(application, subprocess.Popen):
application.terminate()
logger.debug("Application terminated")
# application.communicate()
else:
# Close the cached session in aleph_client:
from aleph_client.asynchronous import get_fallback_session
session: aiohttp.ClientSession = get_fallback_session()
await session.close()
logger.debug("Aiohttp cached session closed")
yield b"STOP\n"
sys.exit()
logger.debug("Supervisor informed of halt")
raise ShutdownException
elif instruction.startswith(b"!"):
# Execute shell commands in the form `!ls /`
msg = instruction[1:].decode()
Expand Down Expand Up @@ -417,6 +435,14 @@ def setup_system(config: ConfigurationPayload):
logger.debug("Setup finished")


def umount_volumes(volumes: List[Volume]):
    """Sync pending writes, then unmount each of the given volumes.

    Args:
        volumes: Volumes to unmount; each one's ``mount`` path is passed
            to ``umount`` and its ``device`` name is used for logging.
    """
    # Local import keeps this block self-contained.
    import shlex

    system("sync")
    for volume in volumes:
        logger.debug(f"Umounting /dev/{volume.device} on {volume.mount}")
        # Quote the mount point: the command goes through a shell, so a path
        # containing spaces or metacharacters would otherwise be split or
        # interpreted. Plain paths are left unchanged by shlex.quote().
        system(f"umount {shlex.quote(volume.mount)}")


async def main():
client, addr = s.accept()

Expand All @@ -437,6 +463,11 @@ async def main():
logger.exception("Program could not be started")
raise

class ServerReference:
"Reference used to close the server from within `handle_instruction"
server: asyncio.AbstractServer
server_reference = ServerReference()

async def handle_instruction(reader, writer):
data = await reader.read(1000_1000) # Max 1 Mo

Expand All @@ -445,23 +476,54 @@ async def handle_instruction(reader, writer):
data_to_print = f"{data[:500]}..." if len(data) > 500 else data
logger.debug(f"<<<\n\n{data_to_print}\n\n>>>")

async for result in process_instruction(instruction=data, interface=config.interface,
application=app):
writer.write(result)
try:
async for result in process_instruction(instruction=data, interface=config.interface,
application=app):
writer.write(result)
await writer.drain()

logger.debug("Instruction processed")
except ShutdownException:
logger.info("Initiating shutdown")
writer.write(b"STOPZ\n")
await writer.drain()

logger.debug("...DONE")
writer.close()
logger.debug("Shutdown confirmed to supervisor")
server_reference.server.close()
logger.debug("Supervisor socket server closed")
finally:
writer.close()

server = await asyncio.start_server(handle_instruction, sock=s)
server_reference.server = server

addr = server.sockets[0].getsockname()
print(f'Serving on {addr}')

async with server:
await server.serve_forever()

try:
async with server:
await server.serve_forever()
except asyncio.CancelledError:
logger.debug("Server was properly cancelled")
finally:
logger.warning("System shutdown")
server.close()
logger.debug("Server closed")
umount_volumes(config.volumes)
logger.debug("User volumes unmounted")

if __name__ == '__main__':
    logging.basicConfig(level=logging.DEBUG)
    asyncio.run(main())

    # Reached only once main() has returned: unmount what is left, then
    # ask the kernel to stop the machine.
    logger.info("Unmounting system filesystems")
    for umount_command in ("umount /dev/shm", "umount /dev/pts", "umount -a"):
        system(umount_command)

    logger.info("Sending reboot syscall")
    # Raw reboot(2) call through libc's generic syscall() entry point, see
    # https://man7.org/linux/man-pages/man2/reboot.2.html
    # NOTE(review): 169 is presumably the reboot syscall number on x86_64;
    # other architectures use a different number -- confirm for the guest.
    syscall_number = 169
    # The two magic values are required by reboot(2); the command value is
    # listed in the man page as LINUX_REBOOT_CMD_POWER_OFF.
    magic1, magic2 = 0xfee1dead, 672274793
    command = 0x4321fedc
    ctypes.CDLL(None).syscall(syscall_number, magic1, magic2, command, None)
    # The exit should not happen due to system halt.
    sys.exit(0)

0 comments on commit 6cfbd53

Please sign in to comment.