From a11420bcb2d5e0ebad793463d06a78137365dfec Mon Sep 17 00:00:00 2001 From: vringar Date: Thu, 12 Oct 2023 00:01:41 +0200 Subject: [PATCH] refactor(storage-watchdog): adjust storage watchdog implementation --- demo_watchdog.py | 11 +- openwpm/browser_manager.py | 33 ++-- openwpm/config.py | 70 ++++---- openwpm/deploy_browsers/deploy_firefox.py | 4 +- openwpm/task_manager.py | 33 ++-- openwpm/utilities/storage_watchdog.py | 200 ++++++++++------------ 6 files changed, 170 insertions(+), 181 deletions(-) diff --git a/demo_watchdog.py b/demo_watchdog.py index 1cfd3d9e5..2577695f7 100644 --- a/demo_watchdog.py +++ b/demo_watchdog.py @@ -19,7 +19,7 @@ print("Loading tranco top sites list...") t = tranco.Tranco(cache=True, cache_dir=".tranco") latest_list = t.list() - sites = ["http://" + x for x in latest_list.top(10)] + sites = ["http://" + x for x in latest_list.top(100)] else: sites = [ "http://www.example.com", @@ -28,7 +28,7 @@ "https://www.google.com", "https://www.minecraft.net", "https://www.nytimes.com", - "https://www.github.com" + "https://www.github.com", ] # Loads the default ManagerParams @@ -52,8 +52,9 @@ # a broken function # Record DNS resolution browser_param.dns_instrument = True - # Specify the location of temporary files. Ensure directory exists when specifying. + # Specify the location of temporary files. Ensure directory exists when specifying. 
# browser_param.tmp_profile_dir = "/" + browser_param.maximum_profile_size = 52428800 # Update TaskManager configuration (use this for crawl-wide settings) manager_params.data_directory = Path("./datadir/") @@ -64,7 +65,7 @@ # Please refer to docs/Configuration.md#platform-configuration-options for more information # manager_params.memory_watchdog = True # manager_params.process_watchdog = True -manager_params.storage_watchdog_enable = 52428800 + # Commands time out by default after 60 seconds with TaskManager( @@ -74,7 +75,7 @@ None, ) as manager: # Visits the sites - + for index, site in enumerate(sites): def callback(success: bool, val: str = site) -> None: diff --git a/openwpm/browser_manager.py b/openwpm/browser_manager.py index 29f282150..775352371 100644 --- a/openwpm/browser_manager.py +++ b/openwpm/browser_manager.py @@ -34,7 +34,7 @@ kill_process_and_children, parse_traceback_for_sentry, ) -from .utilities.storage_watchdog import StorageWatchdogThread +from .utilities.storage_watchdog import profile_size_exceeds_max_size pickling_support.install() @@ -43,7 +43,7 @@ class BrowserManagerHandle: - """The BrowserManagerHandle class is responsible for holding all of the + """The BrowserManagerHandle class is responsible for holding all the configuration and status information on BrowserManager process it corresponds to. It also includes a set of methods for managing the BrowserManager process and its child processes/threads. 
@@ -504,26 +504,13 @@ def execute_command_sequence( # Allow StorageWatchdog to utilize built-in browser reset functionality # which results in a graceful restart of the browser instance - if self.manager_params.storage_watchdog_enable: - - # storage_checker = threading.Thread(target=self.manager_params.storage_watchdog_obj.periodic_check, args=([self.current_profile_path, self])) - # storage_checker.daemon = True - # storage_checker.name = f"OpenWPM-storage-checker-{self.browser_id}" - storage_checker = StorageWatchdogThread(self.manager_params.storage_watchdog_obj, [ - self.current_profile_path, - self - ]) - storage_checker.daemon = True - storage_checker.name = "" - storage_checker.start() - storage_checker.join() - - # storage_checker.start() - # storage_checker.join() - - # reset = self.manager_params.storage_watchdog_obj.periodic_check(self.current_profile_path, self) - reset = storage_checker.ret_value + if self.browser_params.maximum_profile_size: + assert self.current_profile_path is not None + reset = profile_size_exceeds_max_size( + self.current_profile_path, + self.browser_params.maximum_profile_size, + ) if self.restart_required or reset: success = self.restart_browser_manager(clear_profile=reset) @@ -589,7 +576,9 @@ def kill_browser_manager(self): ) if self.display_port is not None: # xvfb display lock # lockfile = "/tmp/.X%s-lock" % self.display_port - lockfile = os.path.join(self.browser_params.tmp_profile_dir, f".X{self.display_port}-lock") + lockfile = os.path.join( + self.browser_params.tmp_profile_dir, f".X{self.display_port}-lock" + ) try: os.remove(lockfile) diff --git a/openwpm/config.py b/openwpm/config.py index a78bb2bd0..d9cf38f25 100644 --- a/openwpm/config.py +++ b/openwpm/config.py @@ -1,9 +1,8 @@ -import os +import tempfile from dataclasses import dataclass, field from json import JSONEncoder from pathlib import Path from typing import Any, Dict, List, Literal, Optional, Tuple, Union -import tempfile from dataclasses_json import 
DataClassJsonMixin from dataclasses_json import config as DCJConfig @@ -100,14 +99,46 @@ class BrowserParams(DataClassJsonMixin): profile_archive_dir: Optional[Path] = field( default=None, metadata=DCJConfig(encoder=path_to_str, decoder=str_to_path) ) - - tmp_profile_dir: str = tempfile.gettempdir() + + tmp_profile_dir: Path = field( + default=Path(tempfile.gettempdir()), + metadata=DCJConfig(encoder=path_to_str, decoder=str_to_path), + ) """ The tmp_profile_dir defaults to the OS's temporary file folder (typically /tmp) and is where the generated browser profiles and residual files are stored. """ - - + + maximum_profile_size: Optional[int] = None + """ + The total amount of on-disk space a profile is allowed to consume in bytes. + If this option is not set, no checks will be performed. + + Rationale + --------- + This option can serve as a happy medium between killing a browser after each + crawl and allowing the application to still perform quickly. + + It is a way to save space + in a limited environment with minimal detriment to speed. + + If the maximum_profile_size is exceeded after a CommandSequence + is completed, the browser will be shut down and a new one will + be created. **Even with this setting you may temporarily have + more disk usage than the sum of all maximum_profile_sizes.** + However, this will also ensure that a CommandSequence is + allowed to complete without undue interruptions. + + Sample values + ------------- + * 1073741824: 1GB + * 20971520: 20MB - for testing purposes + * 52428800: 50MB + * 73400320: 70MB + * 104857600: 100MB - IDEAL for 10+ browsers + + """ + recovery_tar: Optional[Path] = None donottrack: bool = False tracking_protection: bool = False @@ -142,30 +173,11 @@ class ManagerParams(DataClassJsonMixin): """A watchdog that tries to ensure that no Firefox instance takes up too much memory. 
It is mostly useful for long running cloud crawls""" process_watchdog: bool = False - - - storage_watchdog_enable: Optional[int] = None - """A watchdog that serves as a happy medium between killing a browser after each - crawl and allowing the application to still perform quickly. Used as a way to save space - in a limited environment with minimal detriment to speed. This Optional[int] should be the threshold - size of the folder in bytes. - ``` - # Sample values: - 1073741824: 1GB - 20971520: 20MB - for testing purposes - 52428800: 50MB - 73400320: 70MB - 104857600: 100MB - IDEAL for 10+ browsers - ``` + """It is used to create another thread that kills off `GeckoDriver` (or `Xvfb`) + instances that haven't been spawned by OpenWPM. (GeckoDriver is used by + Selenium to control Firefox, and Xvfb provides a "virtual display" so we can simulate having graphics when running on a server). """ - - storage_watchdog_obj = None # DO NOT EDIT THIS LINE - """Stores a handle to the actual watchdog object.""" - - """- It is used to create another thread that kills off `GeckoDriver` (or `Xvfb`) instances that haven't been spawned by OpenWPM. (GeckoDriver is used by -======= - """It is used to create another thread that kills off `GeckoDriver` (or `Xvfb`) instances that haven't been spawned by OpenWPM. 
(GeckoDriver is used by - Selenium to control Firefox and Xvfb a "virtual display" so we simulate having graphics when running on a server).""" + num_browsers: int = 1 _failure_limit: Optional[int] = None """The number of command failures the platform will tolerate before raising a diff --git a/openwpm/deploy_browsers/deploy_firefox.py b/openwpm/deploy_browsers/deploy_firefox.py index 46099f351..e52ec73c7 100755 --- a/openwpm/deploy_browsers/deploy_firefox.py +++ b/openwpm/deploy_browsers/deploy_firefox.py @@ -35,7 +35,9 @@ def deploy_firefox( root_dir = os.path.dirname(__file__) # directory of this file - browser_profile_path = Path(tempfile.mkdtemp(prefix="firefox_profile_", dir=browser_params.tmp_profile_dir)) + browser_profile_path = Path( + tempfile.mkdtemp(prefix="firefox_profile_", dir=browser_params.tmp_profile_dir) + ) status_queue.put(("STATUS", "Profile Created", browser_profile_path)) # Use Options instead of FirefoxProfile to set preferences since the diff --git a/openwpm/task_manager.py b/openwpm/task_manager.py index b5cd601fa..922187f95 100644 --- a/openwpm/task_manager.py +++ b/openwpm/task_manager.py @@ -3,6 +3,7 @@ import pickle import threading import time +from functools import reduce from types import TracebackType from typing import Any, Dict, List, Optional, Set, Type @@ -29,7 +30,7 @@ ) from .utilities.multiprocess_utils import kill_process_and_children from .utilities.platform_utils import get_configuration_string, get_version -from .utilities.storage_watchdog import StorageWatchdog +from .utilities.storage_watchdog import StorageLogger tblib.pickling_support.install() @@ -80,12 +81,10 @@ def __init__( manager_params.source_dump_path = manager_params.data_directory / "sources" - self.manager_params = manager_params - self.browser_params = browser_params + self.manager_params: ManagerParamsInternal = manager_params + self.browser_params: List[BrowserParamsInternal] = browser_params self._logger_kwargs = logger_kwargs - - # Create data 
directories if they do not exist if not os.path.exists(manager_params.screenshot_path): os.makedirs(manager_params.screenshot_path) @@ -131,16 +130,20 @@ def __init__( thread.name = "OpenWPM-watchdog" thread.start() - # Start the StorageWatchdog - if self.manager_params.storage_watchdog_enable: - - storage_watchdog = StorageWatchdog(self.browser_params[0].tmp_profile_dir ,self.manager_params.storage_watchdog_enable) - self.manager_params.storage_watchdog_obj = storage_watchdog - storage_watchdog_thread = threading.Thread(target=storage_watchdog.run, args=()) - storage_watchdog_thread.daemon = True - storage_watchdog_thread.name = "OpenWPM-storage-watchdog" - - storage_watchdog_thread.start() + # Start the StorageLogger if a maximum storage value has been specified for any browser + if reduce( + lambda x, y: x or y, + map(lambda p: p.maximum_profile_size is not None, self.browser_params), + False, + ): + storage_logger = StorageLogger( + self.browser_params[0].tmp_profile_dir, + ) + + storage_logger.daemon = True + storage_logger.name = "OpenWPM-storage-logger" + + storage_logger.start() # Save crawl config information to database openwpm_v, browser_v = get_version() self.storage_controller_handle.save_configuration( diff --git a/openwpm/utilities/storage_watchdog.py b/openwpm/utilities/storage_watchdog.py index cfaa0f1d0..d6ae29e38 100644 --- a/openwpm/utilities/storage_watchdog.py +++ b/openwpm/utilities/storage_watchdog.py @@ -1,141 +1,123 @@ +import logging +import math +import os +import subprocess import time +from pathlib import Path from threading import Thread -import logging, math, time -import subprocess, os -from watchdog.observers import Observer - +from typing import Optional # Nifty little function to prettyfi the output. Takes in a number of bytes and spits out the # corresponding size in the largest unit it is able to convert to. 
-def convert_size(size_bytes): - if size_bytes == 0: - return "0B" - size_name = ("B", "KB", "MB", "GB", "TB", "PB", "EB", "ZB", "YB") - i: int = int(math.floor(math.log(size_bytes, 1024))) - p: float = math.pow(1024, i) - s: float = round(size_bytes / p, 2) - return "%s %s" % (s, size_name[i]) -def total_folder_size(startup=False, debug=False, root_dir="/tmp"): - """_summary_ +def convert_size(size_bytes: int) -> str: + if size_bytes == 0: + return "0B" + size_name = ("B", "KB", "MB", "GB", "TB", "PB", "EB", "ZB", "YB") + i: int = int(math.floor(math.log(size_bytes, 1024))) + p: float = math.pow(1024, i) + s: float = round(size_bytes / p, 2) + return "%s %s" % (s, size_name[i]) - Args: - startup (bool, optional): Runs the function on the total supplied folder. Defaults to False. - debug (bool, optional): Useful for debugging functionality locally. Defaults to False. - root_dir (str, optional): The root directory to check. Defaults to "/tmp". - Returns: - _type_: _description_ +def total_folder_size(startup: bool = False, root_dir: str = "/tmp") -> str: + """Generates a human-readable message about the current size of the directory + + Args: + startup (bool, optional): Runs the function on the total supplied folder. + root_dir (str, optional): The root directory that will be recursively checked. 
""" - - + running_total: int = 0 if not startup: for file in os.listdir(root_dir): if "firefox" in file or ".xpi" in file or "owpm" in file or "Temp" in file: - path = os.path.join(root_dir,file) + path = os.path.join(root_dir, file) try: - running_total += int(subprocess.check_output(['du','-s', '-b', path]).split()[0].decode('utf-8')) + running_total += int( + subprocess.check_output(["du", "-s", "-b", path]) + .split()[0] + .decode("utf-8") + ) except: pass - if debug: - print(f"Currently using: {convert_size(running_total)} of storage on disk...") - return - return (f"Currently using: {convert_size(running_total)} of storage on disk...") - else: - for file in os.listdir(root_dir): - path: str = os.path.join(root_dir,file) - try: - running_total += int(subprocess.check_output(['du','-s', '-b', path], stderr=subprocess.DEVNULL).split()[0].decode('utf-8')) - except: - pass - if debug: - print(f"Readable files in {root_dir} folder take up {convert_size(running_total)} of storage on disk at start time...") - return - return (f"Readable files in {root_dir} folder take up {convert_size(running_total)} of storage on disk at start time...") + return f"Currently using: {convert_size(running_total)} of storage on disk..." + + for file in os.listdir(root_dir): + path = os.path.join(root_dir, file) + try: + running_total += int( + subprocess.check_output( + ["du", "-s", "-b", path], stderr=subprocess.DEVNULL + ) + .split()[0] + .decode("utf-8") + ) + except: + pass -class StorageWatchdog(): - # DIRECTORY_TO_WATCH = "/mnt/04dc803b-5e97-4b16-bdaf-80845c61942d" + return f"Readable files in {root_dir} folder take up {convert_size(running_total)} of storage on disk at start time..." 
- def __init__(self, supplied_dir=None, dirsize=0): - self.MAX_DIRSIZE = dirsize - self.observer = Observer() + +class StorageLogger(Thread): + """Logs the total amount of storage used in the supplied_dir""" + + def __init__(self, supplied_dir: Optional[Path] = None) -> None: + super().__init__() self.dir_to_watch = supplied_dir def run(self) -> None: - # Checks if the default dirsize and directory to watch were configured. If they are still the default, it exits because + logger = logging.getLogger("openwpm") + # Checks whether a directory to watch was configured. + # If it is still the default (None), it exits because # it would essentially work identically to setting the "reset" flag in the command sequence - if self.MAX_DIRSIZE == 0 or self.dir_to_watch is None: + if self.dir_to_watch is None: + logger.info("No dir_to_watch specified. StorageLogger shutting down") return - - logger = logging.getLogger("openwpm") - logger.info("Starting the StorageWatchdog...") + + logger.info("Starting the StorageLogger...") logger.info(total_folder_size(startup=True)) try: while True: - time.sleep(300) # Give storage updates every 5 minutes + time.sleep(300) # Give storage updates every 5 minutes logger.info(total_folder_size()) - except: - self.observer.stop() print("Error") - self.observer.join() - - def periodic_check(self, profile_path, obj): - logger = logging.getLogger("openwpm") - # 1073741824: # 1GB - # 20971520: # 20MB - for testing purposes - # 52428800: # 50MB - # 73400320: # 70MB - # 104857600: 100MB - IDEAL for 10+ browsers - - # Max Size before a restart expressed in bytes - if self.MAX_DIRSIZE == 0: - pass - - READABLE_MAX_DIRSIZE = convert_size(self.MAX_DIRSIZE) - - dirsize = int(subprocess.check_output(['du','-s', '-b', profile_path]).split()[0].decode('utf-8')) - readable_dirsize = convert_size(dirsize) - - if dirsize < self.MAX_DIRSIZE: - - logger.info(f"Current browser profile directory {profile_path} size is less than {READABLE_MAX_DIRSIZE}: 
{readable_dirsize}") - return False - else: - obj.restart_required = True - logger.info(f"{profile_path}: Folder scheduled to be deleted and recover {readable_dirsize} of storage.") - return True - - -class StorageWatchdogThread(Thread): - """ - This is a custom implementation of the Thread subclass from the threading module - that allows for collection of the return value. This was necessary to prevent the main - StorageWatchdog thread from being hemmed up running each browser profile check - in its main thread and instead, spawning separate instances and blocking each browser thread until - the check is complete, ensuring asynchio doesnt get upset. - """ - - def __init__(self, watchdog: StorageWatchdog, argList: list[str]): - """_summary_ - - Args: - watchdog (StorageWatchdog): The main StorageWatchdog Object, running the main thread - argList (list[str]): - argList[0]: The profile_dir - argList[1]: The BrowserManager instance - """ - Thread.__init__(self) - self.ret_value = None - self.watchdog = watchdog - self.argList = argList - - def run(self): - self.ret_value = self.watchdog.periodic_check(self.argList[0], self.argList[1]) - -if __name__ == '__main__': + +def profile_size_exceeds_max_size( + profile_path: Path, + max_dir_size: int, +) -> bool: + logger = logging.getLogger("openwpm") + # 1073741824: # 1GB + # 20971520: # 20MB - for testing purposes + # 52428800: # 50MB + # 73400320: # 70MB + # 104857600: 100MB - IDEAL for 10+ browsers + + readable_max_dir_size = convert_size(max_dir_size) + + dir_size = int( + subprocess.check_output(["du", "-s", "-b", profile_path]) + .split()[0] + .decode("utf-8") + ) + readable_dir_size = convert_size(dir_size) + + if dir_size < max_dir_size: + logger.info( + f"Current browser profile directory {profile_path} size is less than {readable_max_dir_size}: {readable_dir_size}" + ) + return False + else: + logger.info( + f"{profile_path}: Folder scheduled to be deleted and recover {readable_dir_size} of storage." 
+ ) + return True + + +if __name__ == "__main__": print("---Testing the StorageWatchdog folder size function---") - total_folder_size(startup=True, debug=True) + print(total_folder_size(startup=True))