From a6e4bcf0527a998261698c555113a3f652d280d9 Mon Sep 17 00:00:00 2001 From: Sims <38142618+suchmememanyskill@users.noreply.github.com> Date: Mon, 2 Sep 2024 01:45:47 +0200 Subject: [PATCH] Fix updater taking a long time (#696) Replaces subprocess with asyncio.subprocess in some localplatformlinux functions and improves shutdown handling Co-authored-by: AAGaming --- .../localplatform/localplatformlinux.py | 42 ++++++++++++------- .../localplatform/localplatformwin.py | 2 +- backend/decky_loader/main.py | 7 ++-- .../decky_loader/plugin/sandboxed_plugin.py | 33 ++++++++++----- backend/decky_loader/updater.py | 7 +++- 5 files changed, 60 insertions(+), 31 deletions(-) diff --git a/backend/decky_loader/localplatform/localplatformlinux.py b/backend/decky_loader/localplatform/localplatformlinux.py index 1aeb3169..63a07292 100644 --- a/backend/decky_loader/localplatform/localplatformlinux.py +++ b/backend/decky_loader/localplatform/localplatformlinux.py @@ -1,11 +1,21 @@ from re import compile -from asyncio import Lock +from asyncio import Lock, create_subprocess_exec +from asyncio.subprocess import PIPE, DEVNULL, STDOUT, Process +from subprocess import call as call_sync import os, pwd, grp, sys, logging -from subprocess import call, run, DEVNULL, PIPE, STDOUT +from typing import IO, Any, Mapping from ..enums import UserType logger = logging.getLogger("localplatform") +# subprocess._ENV +ENV = Mapping[str, str] +ProcessIO = int | IO[Any] | None +async def run(args: list[str], stdin: ProcessIO = DEVNULL, stdout: ProcessIO = PIPE, stderr: ProcessIO = PIPE, env: ENV | None = None) -> tuple[Process, bytes | None, bytes | None]: + proc = await create_subprocess_exec(args[0], *(args[1:]), stdin=stdin, stdout=stdout, stderr=stderr, env=env) + proc_stdout, proc_stderr = await proc.communicate() + return (proc, proc_stdout, proc_stderr) + # Get the user id hosting the plugin loader def _get_user_id() -> int: return pwd.getpwnam(_get_user()).pw_uid @@ -54,7 +64,7 @@ def chown(path : str, user : UserType = UserType.HOST_USER, recursive : bool = else: raise Exception("Unknown User Type") - result = call(["chown", "-R", user_str, path] if recursive else ["chown", user_str, path]) + result = call_sync(["chown", "-R", user_str, path] if recursive else ["chown", user_str, path]) return result == 0 def chmod(path : str, permissions : int, recursive : bool = True) -> bool: @@ -131,13 +141,17 @@ def setuid(user : UserType = UserType.HOST_USER): os.setuid(user_id) async def service_active(service_name : str) -> bool: - res = run(["systemctl", "is-active", service_name], stdout=DEVNULL, stderr=DEVNULL) + res, _, _ = await run(["systemctl", "is-active", service_name], stdout=DEVNULL, stderr=DEVNULL) return res.returncode == 0 -async def service_restart(service_name : str) -> bool: - call(["systemctl", "daemon-reload"]) +async def service_restart(service_name : str, block : bool = True) -> bool: + await run(["systemctl", "daemon-reload"]) cmd = ["systemctl", "restart", service_name] - res = run(cmd, stdout=PIPE, stderr=STDOUT) + + if not block: + cmd.append("--no-block") + + res, _, _ = await run(cmd, stdout=PIPE, stderr=STDOUT) return res.returncode == 0 async def service_stop(service_name : str) -> bool: @@ -146,7 +160,7 @@ async def service_stop(service_name : str) -> bool: return True cmd = ["systemctl", "stop", service_name] - res = run(cmd, stdout=PIPE, stderr=STDOUT) + res, _, _ = await run(cmd, stdout=PIPE, stderr=STDOUT) return res.returncode == 0 async def service_start(service_name : str) -> bool: @@ -155,13 +169,13 @@ async def service_start(service_name : str) -> bool: return True cmd = ["systemctl", "start", service_name] - res = run(cmd, stdout=PIPE, stderr=STDOUT) + res, _, _ = await run(cmd, stdout=PIPE, stderr=STDOUT) return res.returncode == 0 async def restart_webhelper() -> bool: logger.info("Restarting steamwebhelper") # TODO move to pkill - res = run(["killall", "-s", "SIGTERM", "steamwebhelper"], stdout=DEVNULL, stderr=DEVNULL) + res, _, _ = await run(["killall", "-s", "SIGTERM", "steamwebhelper"], stdout=DEVNULL, stderr=DEVNULL) return res.returncode == 0 def get_privileged_path() -> str: @@ -241,12 +255,12 @@ async def close_cef_socket(): logger.warning("Can't close CEF socket as Decky isn't running as root.") return # Look for anything listening TCP on port 8080 - lsof = run(["lsof", "-F", "-iTCP:8080", "-sTCP:LISTEN"], capture_output=True, text=True) - if lsof.returncode != 0 or len(lsof.stdout) < 1: + lsof, stdout, _ = await run(["lsof", "-F", "-iTCP:8080", "-sTCP:LISTEN"], stdout=PIPE) + if not stdout or lsof.returncode != 0 or len(stdout) < 1: logger.error(f"lsof call failed in close_cef_socket! return code: {str(lsof.returncode)}") return - lsof_data = cef_socket_lsof_regex.match(lsof.stdout) + lsof_data = cef_socket_lsof_regex.match(stdout.decode()) if not lsof_data: logger.error("lsof regex match failed in close_cef_socket!") @@ -258,7 +272,7 @@ async def close_cef_socket(): logger.info(f"Closing CEF socket with PID {pid} and FD {fd}") # Use gdb to inject a close() call for the socket fd into steamwebhelper - gdb_ret = run(["gdb", "--nx", "-p", pid, "--batch", "--eval-command", f"call (int)close({fd})"], env={"LD_LIBRARY_PATH": ""}) + gdb_ret, _, _ = await run(["gdb", "--nx", "-p", pid, "--batch", "--eval-command", f"call (int)close({fd})"], env={"LD_LIBRARY_PATH": ""}) if gdb_ret.returncode != 0: logger.error(f"Failed to close CEF socket with gdb! return code: {str(gdb_ret.returncode)}", exc_info=True) diff --git a/backend/decky_loader/localplatform/localplatformwin.py b/backend/decky_loader/localplatform/localplatformwin.py index 52ade07c..7e3bd31c 100644 --- a/backend/decky_loader/localplatform/localplatformwin.py +++ b/backend/decky_loader/localplatform/localplatformwin.py @@ -28,7 +28,7 @@ async def service_stop(service_name : str) -> bool: async def service_start(service_name : str) -> bool: return True # Stubbed -async def service_restart(service_name : str) -> bool: +async def service_restart(service_name : str, block : bool = True) -> bool: if service_name == "plugin_loader": sys.exit(42) diff --git a/backend/decky_loader/main.py b/backend/decky_loader/main.py index b86411e1..315b7d29 100644 --- a/backend/decky_loader/main.py +++ b/backend/decky_loader/main.py @@ -138,16 +138,17 @@ async def shutdown(self, _: Application): tasks = all_tasks() current = current_task() async def cancel_task(task: Task[Any]): - logger.debug(f"Cancelling task {task}") + name = task.get_coro().__qualname__ + logger.debug(f"Cancelling task {name}") try: task.cancel() try: await task except CancelledError: pass - logger.debug(f"Task {task} finished") + logger.debug(f"Task {name} finished") except: - logger.warning(f"Failed to cancel task {task}:\n" + format_exc()) + logger.warning(f"Failed to cancel task {name}:\n" + format_exc()) pass if current: tasks.remove(current) diff --git a/backend/decky_loader/plugin/sandboxed_plugin.py b/backend/decky_loader/plugin/sandboxed_plugin.py index 23575900..b730fa98 100644 --- a/backend/decky_loader/plugin/sandboxed_plugin.py +++ b/backend/decky_loader/plugin/sandboxed_plugin.py @@ -4,8 +4,9 @@ from json import dumps, loads from logging import getLogger from traceback import format_exc -from asyncio import (get_event_loop, new_event_loop, +from asyncio import (ensure_future, get_event_loop, new_event_loop, set_event_loop) +from signal import SIGINT, SIGTERM from setproctitle import setproctitle, setthreadtitle from .messages import SocketResponseDict, SocketMessageType @@ -38,6 +39,7 @@ def __init__(self, self.version = version self.author = author self.api_version = api_version + self.shutdown_running = False self.log = getLogger("sandboxed_plugin") @@ -48,7 +50,11 @@ def initialize(self, socket: LocalSocket): setproctitle(f"{self.name} ({self.file})") setthreadtitle(self.name) - set_event_loop(new_event_loop()) + loop = new_event_loop() + set_event_loop(loop) + # When running Decky manually in a terminal, ctrl-c will trigger this, so we have to handle it properly + loop.add_signal_handler(SIGINT, lambda: ensure_future(self.shutdown())) + loop.add_signal_handler(SIGTERM, lambda: ensure_future(self.shutdown())) if self.passive: return setgid(UserType.ROOT if "root" in self.flags else UserType.HOST_USER) @@ -155,22 +161,27 @@ async def _uninstall(self): self.log.error("Failed to uninstall " + self.name + "!\n" + format_exc()) pass - async def on_new_message(self, message : str) -> str|None: - data = loads(message) - - if "stop" in data: + async def shutdown(self, uninstall: bool = False): + if not self.shutdown_running: + self.shutdown_running = True self.log.info(f"Calling Loader unload function for {self.name}.") await self._unload() - if data.get('uninstall'): + if uninstall: self.log.info("Calling Loader uninstall function.") await self._uninstall() - self.log.debug("Stopping event loop") + self.log.debug("Stopping event loop") - loop = get_event_loop() - loop.call_soon_threadsafe(loop.stop) - sys.exit(0) + loop = get_event_loop() + loop.call_soon_threadsafe(loop.stop) + sys.exit(0) + + async def on_new_message(self, message : str) -> str|None: + data = loads(message) + + if "stop" in data: + await self.shutdown(data.get('uninstall')) d: SocketResponseDict = {"type": SocketMessageType.RESPONSE, "res": None, "success": True, "id": data["id"]} try: diff --git a/backend/decky_loader/updater.py b/backend/decky_loader/updater.py index 5cd25e72..75f9618b 100644 --- a/backend/decky_loader/updater.py +++ b/backend/decky_loader/updater.py @@ -24,6 +24,7 @@ class RemoteVerAsset(TypedDict): name: str + size: int browser_download_url: str class RemoteVer(TypedDict): tag_name: str @@ -198,11 +199,13 @@ async def do_update(self): version = self.remoteVer["tag_name"] download_url = None + size_in_bytes = None download_filename = "PluginLoader" if ON_LINUX else "PluginLoader.exe" for x in self.remoteVer["assets"]: if x["name"] == download_filename: download_url = x["browser_download_url"] + size_in_bytes = x["size"] break if download_url == None: @@ -238,10 +241,10 @@ async def do_update(self): os.mkdir(path.join(getcwd(), ".systemd")) shutil.move(service_file_path, path.join(getcwd(), ".systemd")+"/plugin_loader.service") - await self.download_decky_binary(download_url, version) + await self.download_decky_binary(download_url, version, size_in_bytes=size_in_bytes) async def do_restart(self): - await service_restart("plugin_loader") + await service_restart("plugin_loader", block=False) async def do_shutdown(self): await service_stop("plugin_loader")