From eea949cc3b2a4948c79444436ee1acf6d32a7909 Mon Sep 17 00:00:00 2001 From: sb0y Date: Tue, 15 Oct 2024 20:30:08 +0000 Subject: [PATCH] added scheduler saving state, added captions for cached videos --- warp_beacon/scheduler/scheduler.py | 35 ++++++++++++++++++++++++++++-- warp_beacon/telegram/bot.py | 7 +++++- 2 files changed, 39 insertions(+), 3 deletions(-) diff --git a/warp_beacon/scheduler/scheduler.py b/warp_beacon/scheduler/scheduler.py index d4e6fcd..48844e4 100644 --- a/warp_beacon/scheduler/scheduler.py +++ b/warp_beacon/scheduler/scheduler.py @@ -1,4 +1,7 @@ +import os +import time import threading +import json from warp_beacon.jobs import Origin import warp_beacon @@ -6,10 +9,12 @@ import logging class IGScheduler(object): + state_file = "/var/warp_beacon/scheduler_state.json" downloader = None running = True thread = None event = None + state = {"remaining": 3600} def __init__(self, downloader: warp_beacon.scraper.AsyncDownloader) -> None: self.downloader = downloader @@ -18,6 +23,23 @@ def __init__(self, downloader: warp_beacon.scraper.AsyncDownloader) -> None: def __del__(self) -> None: self.stop() + def save_state(self) -> None: + try: + with open(self.state_file, 'w+', encoding="utf-8") as f: + f.write(json.dumps(self.state)) + except Exception as e: + logging.error("Failed to save Scheduler state!") + logging.exception(e) + + def load_state(self) -> None: + try: + if os.path.exists(self.state_file): + with open(self.state_file, 'r', encoding="utf-8") as f: + self.state = json.loads(f.read()) + except Exception as e: + logging.error("Failed to load Scheduler state!") + logging.exception(e) + def start(self) -> None: self.thread = threading.Thread(target=self.do_work) self.thread.start() @@ -55,11 +77,20 @@ def validate_ig_session(self) -> bool: def do_work(self) -> None: logging.info("Scheduler thread started ...") + self.load_state() + timeout = self.state["remaining"] while self.running: try: logging.info("Scheduler waking up") + start_time = time.time() self.validate_ig_session() - self.event.wait(timeout=3600) + self.event.wait(timeout=timeout) + elapsed = time.time() - start_time + self.state["remaining"] = timeout - elapsed + + if self.state["remaining"] <= 0: + self.state["remaining"] = 3600 except Exception as e: logging.error("An error occurred in scheduler thread!") - logging.exception(e) \ No newline at end of file + logging.exception(e) + self.save_state() \ No newline at end of file diff --git a/warp_beacon/telegram/bot.py b/warp_beacon/telegram/bot.py index 3afb4d9..7ab54ee 100644 --- a/warp_beacon/telegram/bot.py +++ b/warp_beacon/telegram/bot.py @@ -170,6 +170,7 @@ def build_tg_args(self, job: UploadJob) -> dict: ) else: args["video"] = job.tg_file_id.replace(":video", '') + args["caption"] = self.build_signature_caption(job) else: if job.placeholder_message_id: args["media"] = InputMediaVideo( @@ -200,6 +201,7 @@ def build_tg_args(self, job: UploadJob) -> dict: ) else: args["photo"] = job.tg_file_id.replace(":image", '') + args["caption"] = self.build_signature_caption(job) else: if job.placeholder_message_id: args["media"] = InputMediaPhoto( @@ -208,6 +210,7 @@ def build_tg_args(self, job: UploadJob) -> dict: ) else: args["photo"] = job.local_media_path + args["caption"] = self.build_signature_caption(job) elif job.media_type == JobType.AUDIO: if job.tg_file_id: if job.placeholder_message_id: @@ -239,10 +242,12 @@ def build_tg_args(self, job: UploadJob) -> dict: if job.tg_file_id: if job.placeholder_message_id: args["media"] = InputMediaAnimation( - media=job.tg_file_id.replace(":animation", '') + media=job.tg_file_id.replace(":animation", ''), + caption=self.build_signature_caption(job) ) else: args["animation"] = job.tg_file_id.replace(":animation", '') + args["caption"] = self.build_signature_caption(job) else: if job.placeholder_message_id: args["media"] = InputMediaAnimation(