From cf8b6126b7eb1468e7293e1ce8cb61e2cfdaa266 Mon Sep 17 00:00:00 2001
From: ACA
Date: Sun, 3 Mar 2024 14:05:34 +0100
Subject: [PATCH] core: app, downloader, novel_info, sources: replace os with
 pathlib where possible

---
 lncrawl/core/app.py        | 28 ++++++++++++----------------
 lncrawl/core/downloader.py | 34 +++++++++++++++++-----------------
 lncrawl/core/novel_info.py |  3 +--
 lncrawl/core/sources.py    |  8 ++++----
 4 files changed, 34 insertions(+), 39 deletions(-)

diff --git a/lncrawl/core/app.py b/lncrawl/core/app.py
index 809004725..46a1744a8 100644
--- a/lncrawl/core/app.py
+++ b/lncrawl/core/app.py
@@ -1,6 +1,5 @@
 import atexit
 import logging
-import os
 import shutil
 from pathlib import Path
 from threading import Thread
@@ -149,15 +148,13 @@ def get_novel_info(self):
         )

         source_name = slugify(urlparse(self.crawler.home_url).netloc)
-        self.output_path = os.path.join(
-            C.DEFAULT_OUTPUT_PATH, source_name, self.good_file_name
-        )
+        self.output_path = Path(C.DEFAULT_OUTPUT_PATH) / source_name / self.good_file_name

     # ----------------------------------------------------------------------- #

     def start_download(self):
         """Requires: crawler, chapters, output_path"""
-        if not self.output_path or not os.path.isdir(self.output_path):
+        if not self.output_path or not Path(self.output_path).is_dir():
             raise LNException("Output path is not defined")

         assert self.crawler
@@ -169,7 +166,7 @@ def start_download(self):
         save_metadata(self, True)

         if not self.output_formats.get(OutputFormat.json.value, False):
-            shutil.rmtree(os.path.join(self.output_path, "json"), ignore_errors=True)
+            shutil.rmtree(Path(self.output_path) / "json", ignore_errors=True)

         if self.can_do("logout"):
             self.crawler.logout()
@@ -209,39 +206,38 @@ def compress_books(self, archive_singles=False):
         logger.info("Compressing output...")

         # Get which paths to be archived with their base names
-        path_to_process = []
+        path_to_process: list[tuple[Path, str]] = []
         for fmt in available_formats:
-            root_dir = os.path.join(self.output_path, fmt)
-            if os.path.isdir(root_dir):
+            root_dir: Path = Path(self.output_path) / fmt
+            if root_dir.is_dir():
                 path_to_process.append(
-                    [root_dir, self.good_file_name + " (" + fmt + ")"]
+                    (root_dir, self.good_file_name + " (" + fmt + ")")
                 )

         # Archive files
         self.archived_outputs = []
         for root_dir, output_name in path_to_process:
-            file_list = os.listdir(root_dir)
+            file_list = list(root_dir.glob("*"))
             if len(file_list) == 0:
                 logger.info("It has no files: %s", root_dir)
                 continue

-            archived_file = None
             if (
                 len(file_list) == 1
                 and not archive_singles
-                and not os.path.isdir(os.path.join(root_dir, file_list[0]))
+                and not file_list[0].is_dir()
             ):
                 logger.info("Not archiving single file inside %s" % root_dir)
-                archived_file = os.path.join(root_dir, file_list[0])
+                archived_file = file_list[0].as_posix()
             else:
                 base_path = Path(self.output_path) / output_name
                 logger.info("Compressing %s to %s" % (root_dir, base_path))
                 archived_file = shutil.make_archive(
-                    base_path,
+                    base_path.as_posix(),
                     format="zip",
                     root_dir=root_dir,
                 )
-            logger.info("Compressed: %s", os.path.basename(archived_file))
+            logger.info("Compressed: %s", Path(archived_file).name)

             if archived_file:
                 self.archived_outputs.append(archived_file)

diff --git a/lncrawl/core/downloader.py b/lncrawl/core/downloader.py
index 39bcf18a6..09bea39db 100644
--- a/lncrawl/core/downloader.py
+++ b/lncrawl/core/downloader.py
@@ -3,7 +3,7 @@
 """
 import json
 import logging
-import os
+from pathlib import Path

 from ..models.chapter import Chapter
 from ..utils.imgen import generate_cover_image
@@ -17,13 +17,13 @@ def _chapter_file(
     output_path: str,
     pack_by_volume: bool,
 ):
-    dir_name = os.path.join(output_path, "json")
+    dir_name = Path(output_path) / "json"
     if pack_by_volume:
         vol_name = "Volume " + str(chapter.volume).rjust(2, "0")
-        dir_name = os.path.join(dir_name, vol_name)
+        dir_name = dir_name / vol_name

     chapter_name = str(chapter.id).rjust(5, "0")
-    json_file = os.path.join(dir_name, chapter_name + ".json")
+    json_file = dir_name / (chapter_name + ".json")
     return json_file

@@ -54,8 +54,8 @@ def _save_chapter(app, chapter: Chapter):
         output_path=app.output_path,
         pack_by_volume=app.pack_by_volume,
     )
-    os.makedirs(os.path.dirname(file_name), exist_ok=True)
-    with open(file_name, "w", encoding="utf-8") as fp:
+    file_name.parent.mkdir(parents=True, exist_ok=True)
+    with file_name.open("w", encoding="utf-8") as fp:
         json.dump(chapter, fp, ensure_ascii=False)

@@ -100,22 +100,22 @@ def fetch_chapter_body(app):
     logger.info(f"Processed {len(app.chapters)} chapters [{app.progress} fetched]")


-def _fetch_content_image(app, url, image_file):
+def _fetch_content_image(app, url, image_file: Path):
     from .app import App

     assert isinstance(app, App)
-    if url and not os.path.isfile(image_file):
+    if url and not image_file.is_file():
         try:
             img = app.crawler.download_image(url)
-            os.makedirs(os.path.dirname(image_file), exist_ok=True)
+            image_file.parent.mkdir(parents=True, exist_ok=True)
             if img.mode not in ("L", "RGB", "YCbCr", "RGBX"):
                 if img.mode == "RGBa":
                     #RGBa -> RGB isn't supported so we go through RGBA first
                     img.convert("RGBA").convert("RGB")
                 else:
                     img = img.convert("RGB")
-            img.save(image_file, "JPEG", optimized=True)
+            img.save(image_file.as_posix(), "JPEG", optimized=True)
             img.close()
             logger.debug("Saved image: %s", image_file)
         finally:
@@ -129,7 +129,7 @@ def _fetch_cover_image(app):
     assert app.crawler is not None

     filename = "cover.jpg"
-    cover_file = os.path.join(app.output_path, filename)
+    cover_file = Path(app.output_path) / filename
     if app.crawler.novel_cover:
         try:
             _fetch_content_image(
@@ -141,12 +141,12 @@ def _fetch_cover_image(app):
             if logger.isEnabledFor(logging.DEBUG):
                 logger.exception("Failed to download cover", e)

-    if not os.path.isfile(cover_file):
-        generate_cover_image(cover_file)
+    if not cover_file.is_file():
+        generate_cover_image(cover_file.as_posix())

     app.progress += 1
     app.book_cover = cover_file
-    assert os.path.isfile(app.book_cover), "Failed to download or generate cover image"
+    assert Path(app.book_cover).is_file(), "Failed to download or generate cover image"


 def _discard_failed_images(app, chapter, failed):
@@ -191,7 +191,7 @@ def fetch_chapter_images(app):
     ]

     # download content images
-    image_folder = os.path.join(app.output_path, "images")
+    image_folder = Path(app.output_path) / "images"
     images_to_download = set(
         [
             (filename, url)
@@ -204,7 +204,7 @@ def fetch_chapter_images(app):
             _fetch_content_image,
             app,
             url,
-            os.path.join(image_folder, filename),
+            image_folder / filename,
         )
         for filename, url in images_to_download
     ]
@@ -215,7 +215,7 @@ def fetch_chapter_images(app):
         failed = [
             filename
             for filename, url in images_to_download
-            if not os.path.isfile(os.path.join(image_folder, filename))
+            if not (image_folder / filename).is_file()
         ]
     finally:
         logger.info("Processed %d images [%d failed]" % (app.progress, len(failed)))

diff --git a/lncrawl/core/novel_info.py b/lncrawl/core/novel_info.py
index ef2c542cb..a993ed2d3 100644
--- a/lncrawl/core/novel_info.py
+++ b/lncrawl/core/novel_info.py
@@ -1,5 +1,4 @@
 import math
-import os
 import re
 from pathlib import Path
 from typing import Dict
@@ -109,6 +108,6 @@ def save_metadata(app, completed=False):
         ),
     )

-    os.makedirs(app.output_path, exist_ok=True)
+    Path(app.output_path).mkdir(parents=True, exist_ok=True)
     file_name = Path(app.output_path) / C.META_FILE_NAME
     novel.to_json(file_name, encoding="utf-8", indent=2)

diff --git a/lncrawl/core/sources.py b/lncrawl/core/sources.py
index 79c88d8c3..9c4ea6c9e 100644
--- a/lncrawl/core/sources.py
+++ b/lncrawl/core/sources.py
@@ -83,7 +83,7 @@ def __download_data(url: str):
 __index_fetch_internval_in_seconds = 30 * 60
 __master_index_file_url = "https://raw.githubusercontent.com/dipu-bd/lightnovel-crawler/master/sources/_index.json"

-__user_data_path = Path(os.path.expanduser("~")) / ".lncrawl"
+__user_data_path = Path("~").expanduser() / ".lncrawl"
 __local_data_path = Path(__file__).parent.parent.absolute()
 if not (__local_data_path / "sources").is_dir():
     __local_data_path = __local_data_path.parent
@@ -110,7 +110,7 @@ def __load_current_index():

 def __save_current_index():
     index_file = __user_data_path / "sources" / "_index.json"
-    os.makedirs(index_file.parent, exist_ok=True)
+    index_file.parent.mkdir(parents=True, exist_ok=True)

     logger.debug("Saving current index data to %s", index_file)
     with open(index_file, "w", encoding="utf8") as fp:
@@ -170,12 +170,12 @@ def __save_source_data(source_id, data):
     dst_dir = dst_file.parent
     temp_file = dst_dir / ("." + dst_file.name)

-    os.makedirs(dst_dir, exist_ok=True)
+    dst_dir.mkdir(parents=True, exist_ok=True)
     with open(temp_file, "wb") as fp:
         fp.write(data)

     if dst_file.exists():
-        os.remove(dst_file)
+        dst_file.unlink()
     temp_file.rename(dst_file)

     global __current_index
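
Appendix (not part of the patch): a minimal standalone sketch of the
os -> pathlib equivalences this commit applies. The names demo_dir,
demo_file, and home below are hypothetical, used only for illustration.

    import shutil
    from pathlib import Path

    demo_dir = Path("output") / "json"           # os.path.join(output, "json")
    demo_dir.mkdir(parents=True, exist_ok=True)  # os.makedirs(d, exist_ok=True)

    demo_file = demo_dir / "00001.json"          # os.path.join(d, name)
    print(demo_file.name)                        # os.path.basename(p)
    print(demo_file.parent)                      # os.path.dirname(p)
    print(demo_file.is_file())                   # os.path.isfile(p)
    print(demo_dir.is_dir())                     # os.path.isdir(p)

    # os.listdir(d) becomes d.glob("*"), but glob() yields paths already
    # joined with the directory, so they must not be re-joined onto it.
    print(list(demo_dir.glob("*")))

    home = Path("~").expanduser()                # os.path.expanduser("~")

    if demo_file.exists():
        demo_file.unlink()                       # os.remove(p)

    # APIs the patch still feeds plain strings (shutil.make_archive's
    # base_name, PIL's Image.save) get an explicit .as_posix() conversion.
    shutil.rmtree(demo_dir, ignore_errors=True)

One pitfall when translating os.path.isfile(p): the expression
"not p.exists() and p.is_file()" parses as "(not p.exists()) and p.is_file()",
which is always False; the faithful translation is simply "not p.is_file()",
as used in the cover-image hunk above.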