From e8c1476e59407c3cfa62e6af0b7e58e25f0147dd Mon Sep 17 00:00:00 2001 From: AT0myks Date: Thu, 14 Dec 2023 14:44:20 +0100 Subject: [PATCH] Rename a few functions --- README.md | 2 +- reolinkfw/__init__.py | 40 ++++++++++++++++++++-------------------- reolinkfw/__main__.py | 14 +++++++------- 3 files changed, 28 insertions(+), 28 deletions(-) diff --git a/README.md b/README.md index 9be4912..a736a17 100644 --- a/README.md +++ b/README.md @@ -161,7 +161,7 @@ print(get_info(url)) file = "/home/ben/RLC-410-5MP_20_20052300.zip" print(get_info(file)) with ReolinkFirmware.from_file(file) as fw: - fw.extract_pak() + fw.extract() ``` In most cases where a URL is used, it will be a direct link to the file diff --git a/reolinkfw/__init__.py b/reolinkfw/__init__.py index b47834a..ecf0d43 100644 --- a/reolinkfw/__init__.py +++ b/reolinkfw/__init__.py @@ -151,7 +151,7 @@ def get_fs_info(self) -> list[dict[str, str]]: }) return result - async def get_info_from_pak(self) -> dict[str, Any]: + async def get_info(self) -> dict[str, Any]: ha = await asyncio.to_thread(self.sha256) app = self._fs_sections[-1] with self.open(app) as f: @@ -204,7 +204,7 @@ def extract_file_system(self, section: Section, dest: Optional[Path] = None) -> else: raise Exception("Unknown file system") - def extract_pak(self, dest: Optional[Path] = None, force: bool = False) -> None: + def extract(self, dest: Optional[Path] = None, force: bool = False) -> None: dest = (Path.cwd() / "reolink_firmware") if dest is None else dest dest.mkdir(parents=True, exist_ok=force) rootfsdir = [s.name for s in self if s.name in ROOTFS_SECTIONS][0] @@ -227,20 +227,20 @@ async def download(url: StrOrURL) -> Union[bytes, int]: return await resp.read() if resp.status == 200 else resp.status -def extract_paks(zip: Union[StrPath, IO[bytes]]) -> list[tuple[str, ReolinkFirmware]]: - """Return a list of tuples, one for each PAK file found in the ZIP. +def firmwares_from_zip(zip: Union[StrPath, IO[bytes]]) -> list[tuple[str, ReolinkFirmware]]: + """Return a list of tuples, one for each firmware found in the ZIP. - It is the caller's responsibility to close the PAK files. + It is the caller's responsibility to close the firmware files. """ - paks = [] + fws = [] with ZipFile(zip) as myzip: for name in myzip.namelist(): file = myzip.open(name) if is_pak_file(file): - paks.append((file.name, ReolinkFirmware.from_fd(file))) + fws.append((file.name, ReolinkFirmware.from_fd(file))) else: file.close() - return paks + return fws def get_info_from_files(files: Mapping[Files, Optional[bytes]]) -> dict[str, Optional[str]]: @@ -323,19 +323,19 @@ async def direct_download_url(url: str) -> str: return url -async def get_paks(file_or_url: StrPathURL, use_cache: bool = True) -> list[tuple[Optional[str], ReolinkFirmware]]: - """Return PAK files read from an on-disk file or a URL. +async def firmwares_from_file(file_or_url: StrPathURL, use_cache: bool = True) -> list[tuple[Optional[str], ReolinkFirmware]]: + """Return firmwares read from an on-disk file or a URL. The file or resource may be a ZIP or a PAK. On success return a list of 2-tuples where each tuple is of the form - `(pak_name, pak_file)`. When the argument is a URL, `pak_name` may + `(filename, firmware)`. When the argument is a URL, `filename` may be None. If the file is a ZIP the list might be empty. - It is the caller's responsibility to close the PAK files. + It is the caller's responsibility to close the firmware files. """ file_or_url = str(file_or_url) if is_url(file_or_url): if use_cache and has_cache(file_or_url): - return await get_paks(get_cache_file(file_or_url)) + return await firmwares_from_file(get_cache_file(file_or_url)) file_or_url = await direct_download_url(file_or_url) zip_or_pak_bytes = await download(file_or_url) if isinstance(zip_or_pak_bytes, int): @@ -348,13 +348,13 @@ async def get_paks(file_or_url: StrPathURL, use_cache: bool = True) -> list[tupl else: zipfile = io.BytesIO(zip_or_pak_bytes) if is_zipfile(zipfile): - return await asyncio.to_thread(extract_paks, zipfile) + return await asyncio.to_thread(firmwares_from_zip, zipfile) zipfile.close() raise Exception("Not a ZIP or a PAK file") elif is_local_file(file_or_url): file_or_url = Path(file_or_url) if is_zipfile(file_or_url): - return await asyncio.to_thread(extract_paks, file_or_url) + return await asyncio.to_thread(firmwares_from_zip, file_or_url) elif is_pak_file(file_or_url): return [(file_or_url.name, ReolinkFirmware.from_file(file_or_url))] raise Exception("Not a ZIP or a PAK file") @@ -367,12 +367,12 @@ async def get_info(file_or_url: StrPathURL, use_cache: bool = True) -> list[dict The file or resource may be a ZIP or a PAK. """ try: - paks = await get_paks(file_or_url, use_cache) + fws = await firmwares_from_file(file_or_url, use_cache) except Exception as e: return [{"file": file_or_url, "error": str(e)}] - if not paks: + if not fws: return [{"file": file_or_url, "error": "No PAKs found in ZIP file"}] - info = [{**await pakfile.get_info_from_pak(), "file": file_or_url, "pak": pakname} for pakname, pakfile in paks] - for _, pakfile in paks: - pakfile.close() + info = [{**await fw.get_info(), "file": file_or_url, "pak": pakname} for pakname, fw in fws] + for _, fw in fws: + fw.close() return info diff --git a/reolinkfw/__main__.py b/reolinkfw/__main__.py index 68d0f11..1ec668d 100644 --- a/reolinkfw/__main__.py +++ b/reolinkfw/__main__.py @@ -7,7 +7,7 @@ from datetime import datetime from pathlib import Path, PurePath -from reolinkfw import __version__, get_info, get_paks +from reolinkfw import __version__, firmwares_from_file, get_info HW_FIELDS = ("board_type", "detail_machine_type", "board_name") @@ -43,14 +43,14 @@ async def info(args: Namespace) -> None: async def extract(args: Namespace) -> None: - paks = await get_paks(args.file_or_url, not args.no_cache) - if not paks: + fws = await firmwares_from_file(args.file_or_url, not args.no_cache) + if not fws: raise Exception("No PAKs found in ZIP file") dest = Path.cwd() if args.dest is None else args.dest - for pakname, pakfile in paks: - name = pakfile.sha256() if pakname is None else PurePath(pakname).stem - await asyncio.to_thread(pakfile.extract_pak, dest / name, args.force) - pakfile.close() + for pakname, fw in fws: + name = fw.sha256() if pakname is None else PurePath(pakname).stem + await asyncio.to_thread(fw.extract, dest / name, args.force) + fw.close() def main() -> None: