import logging
import time
from initialization import initialize
from settings import get_setting, get_all_settings
from content_checkers.overseerr import get_wanted_from_overseerr
from content_checkers.collected import get_wanted_from_collected
from content_checkers.trakt import get_wanted_from_trakt_lists, get_wanted_from_trakt_watchlist
from metadata.metadata import process_metadata, refresh_release_dates
from content_checkers.mdb_list import get_wanted_from_mdblists
from database import add_collected_items, add_wanted_items
from not_wanted_magnets import task_purge_not_wanted_magnets_file
import traceback
from datetime import datetime, timedelta
from database import get_db_connection
import asyncio
from utilities.plex_functions import run_get_collected_from_plex, run_get_recent_from_plex

queue_logger = logging.getLogger('queue_logger')

program_runner = None

class ProgramRunner:
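    """Singleton that owns the main loop: it schedules queue processing,
    content source checks, and periodic maintenance tasks, each on its own
    interval as defined in task_intervals.
    """
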
    _instance = None

    def __new__(cls):
        if cls._instance is None:
            cls._instance = super(ProgramRunner, cls).__new__(cls)
            cls._instance._initialized = False
        return cls._instance

    def __init__(self):
        if self._initialized:
            return
        self._initialized = True
        self.running = False
        from queue_manager import QueueManager
        self.queue_manager = QueueManager()
        self.tick_counter = 0

        # Interval (in seconds) between runs of each task
        self.task_intervals = {
            'wanted': 5,
            'scraping': 5,
            'adding': 5,
            'checking': 300,
            'sleeping': 900,
            'unreleased': 3600,
            'blacklisted': 3600,
            'pending_uncached': 3600,
            'task_plex_full_scan': 3600,
            'task_debug_log': 60,
            'task_refresh_release_dates': 3600,
            'task_purge_not_wanted_magnets_file': 604800,
            'task_generate_airtime_report': 3600,
        }
        self.start_time = time.time()
        self.last_run_times = {task: self.start_time for task in self.task_intervals}
        self.enabled_tasks = {
            'wanted',
            'scraping',
            'adding',
            'checking',
            'sleeping',
            'unreleased',
            'blacklisted',
            'pending_uncached',
            'task_plex_full_scan',
            'task_debug_log',
            'task_refresh_release_dates',
            'task_generate_airtime_report',
        }

        # Cached content sources, populated lazily by get_content_sources()
        self.content_sources = None

    def get_content_sources(self, force_refresh=False):
        """Return the configured content sources, caching the result until
        force_refresh is requested or the cache is invalidated."""
        if self.content_sources is None or force_refresh:
            settings = get_all_settings()
            self.content_sources = settings.get('Content Sources', {})
            debug_settings = settings.get('Debug', {})
            custom_check_periods = debug_settings.get('content_source_check_period', {})
            default_intervals = {
                'Overseerr': 900,
                'MDBList': 900,
                'Collected': 86400,
                'Trakt Watchlist': 900,
                'Trakt Lists': 900
            }

            for source, data in self.content_sources.items():
                if isinstance(data, str):
                    data = {'enabled': data.lower() == 'true'}
                    self.content_sources[source] = data  # persist the normalized form back into the cache
                if not isinstance(data, dict):
                    logging.error(f"Unexpected data type for content source {source}: {type(data)}")
                    continue

                source_type = source.split('_')[0]

                # Use the custom check period if present, otherwise fall back to the default for the source type
                custom_interval = custom_check_periods.get(source)
                if custom_interval is not None:
                    data['interval'] = int(custom_interval) * 60  # Convert minutes to seconds
                else:
                    data['interval'] = int(data.get('interval', default_intervals.get(source_type, 3600)))

                task_name = f'task_{source}_wanted'
                self.task_intervals[task_name] = data['interval']
                self.last_run_times[task_name] = self.start_time

                if isinstance(data.get('enabled'), str):
                    data['enabled'] = data['enabled'].lower() == 'true'
                if data.get('enabled', False):
                    self.enabled_tasks.add(task_name)

            # Log the intervals only once, when the cache is (re)built
            logging.info("Content source intervals:")
            for task, interval in self.task_intervals.items():
                if task.startswith('task_') and task.endswith('_wanted'):
                    logging.info(f"{task}: {interval} seconds")

        return self.content_sources

    def should_run_task(self, task_name):
        if task_name not in self.enabled_tasks:
            return False
        current_time = time.time()
        if current_time - self.last_run_times[task_name] >= self.task_intervals[task_name]:
            self.last_run_times[task_name] = current_time
            return True
        return False

    def process_queues(self):
        self.queue_manager.update_all_queues()

        for queue_name in ['wanted', 'scraping', 'adding', 'checking', 'sleeping', 'unreleased', 'blacklisted', 'pending_uncached']:
            if self.should_run_task(queue_name):
                self.safe_process_queue(queue_name)

        if self.should_run_task('task_plex_full_scan'):
            self.task_plex_full_scan()
        if self.should_run_task('task_refresh_release_dates'):
            self.task_refresh_release_dates()
        if self.should_run_task('task_debug_log'):
            self.task_debug_log()
        if self.should_run_task('task_generate_airtime_report'):
            self.task_generate_airtime_report()

        # Process content source tasks (uses the cached content sources)
        for source, data in self.get_content_sources().items():
            task_name = f'task_{source}_wanted'
            if self.should_run_task(task_name):
                self.process_content_source(source, data)

    def safe_process_queue(self, queue_name: str):
        try:
            logging.debug(f"Starting to process {queue_name} queue")

            # Get the appropriate process method on the queue manager
            process_method = getattr(self.queue_manager, f'process_{queue_name}')

            # Call the process method and capture any return value
            result = process_method()

            # Return the result, if any
            return result
        except AttributeError as e:
            logging.error(f"Error: No process method found for {queue_name} queue. Error: {str(e)}")
        except Exception as e:
            logging.error(f"Error processing {queue_name} queue: {str(e)}")
            logging.error(f"Traceback: {traceback.format_exc()}")
        return None

    def task_plex_full_scan(self):
        get_and_add_all_collected_from_plex()

    def process_content_source(self, source, data):
        source_type = source.split('_')[0]
        versions = data.get('versions', {})

        logging.debug(f"Processing content source: {source} (type: {source_type})")

        wanted_content = []
        if source_type == 'Overseerr':
            wanted_content = get_wanted_from_overseerr()
        elif source_type == 'MDBList':
            mdblist_urls = data.get('urls', '').split(',')
            for mdblist_url in mdblist_urls:
                mdblist_url = mdblist_url.strip()
                wanted_content.extend(get_wanted_from_mdblists(mdblist_url, versions))
        elif source_type == 'Trakt Watchlist':
            wanted_content = get_wanted_from_trakt_watchlist()
        elif source_type == 'Trakt Lists':
            trakt_lists = data.get('trakt_lists', '').split(',')
            for trakt_list in trakt_lists:
                trakt_list = trakt_list.strip()
                wanted_content.extend(get_wanted_from_trakt_lists(trakt_list, versions))
        elif source_type == 'Collected':
            wanted_content = get_wanted_from_collected()
        else:
            logging.warning(f"Unknown source type: {source_type}")
            return

        logging.debug(f"Retrieved wanted content from {source}: {len(wanted_content)} items")

        if wanted_content:
            total_items = 0
            if isinstance(wanted_content, list) and len(wanted_content) > 0 and isinstance(wanted_content[0], tuple):
                # Handle a list of (items, versions) tuples
                for items, item_versions in wanted_content:
                    processed_items = process_metadata(items)
                    if processed_items:
                        all_items = processed_items.get('movies', []) + processed_items.get('episodes', [])
                        add_wanted_items(all_items, item_versions or versions)
                        total_items += len(all_items)
            else:
                # Handle a single flat list of items
                processed_items = process_metadata(wanted_content)
                if processed_items:
                    all_items = processed_items.get('movies', []) + processed_items.get('episodes', [])
                    add_wanted_items(all_items, versions)
                    total_items += len(all_items)

            logging.info(f"Added {total_items} wanted items from {source}")
        else:
            logging.warning(f"No wanted content retrieved from {source}")

    def get_wanted_content(self, source_type, data):
        versions = data.get('versions', {})
        logging.debug(f"Getting wanted content for {source_type} with versions: {versions}")

        if source_type == 'Overseerr':
            content = get_wanted_from_overseerr()
            return [(content, versions)] if content else []
        elif source_type == 'MDBList':
            mdblist_urls = data.get('urls', '').split(',')
            result = []
            for url in mdblist_urls:
                content = get_wanted_from_mdblists(url.strip(), versions)
                if isinstance(content, list) and len(content) > 0 and isinstance(content[0], tuple):
                    result.extend(content)
                else:
                    result.append((content, versions))
            return result
        elif source_type == 'Collected':
            content = get_wanted_from_collected()
            return [(content, versions)] if content else []
        elif source_type == 'Trakt Watchlist':
            content = get_wanted_from_trakt_watchlist()
            return [(content, versions)] if content else []
        elif source_type == 'Trakt Lists':
            trakt_lists = data.get('trakt_lists', '').split(',')
            result = []
            for url in trakt_lists:
                content = get_wanted_from_trakt_lists(url.strip(), versions)
                if isinstance(content, list) and len(content) > 0 and isinstance(content[0], tuple):
                    result.extend(content)
                else:
                    result.append((content, versions))
            return result
        else:
            logging.warning(f"Unknown source type: {source_type}")
            return []

    def task_refresh_release_dates(self):
        refresh_release_dates()

    def task_purge_not_wanted_magnets_file(self):
        task_purge_not_wanted_magnets_file()

    def task_generate_airtime_report(self):
        generate_airtime_report()

    def task_debug_log(self):
        current_time = time.time()
        debug_info = []
        for task, interval in self.task_intervals.items():
            if interval > 60:  # Only log tasks that run less frequently than every minute
                time_until_next_run = interval - (current_time - self.last_run_times[task])
                minutes, seconds = divmod(int(time_until_next_run), 60)
                hours, minutes = divmod(minutes, 60)
                debug_info.append(f"{task}: {hours:02d}:{minutes:02d}:{seconds:02d}")

        logging.info("Time until next task run:\n" + "\n".join(debug_info))

    def run_initialization(self):
        logging.info("Running initialization...")
        skip_initial_plex_update = get_setting('Debug', 'skip_initial_plex_update', False)
        disable_initialization = get_setting('Debug', 'disable_initialization', '')
        if not disable_initialization:
            initialize(skip_initial_plex_update)
            logging.info("Initialization complete")

    def start(self):
        if not self.running:
            self.running = True
            self.run()

    def stop(self):
        self.running = False

    def is_running(self):
        return self.running

    def run(self):
        self.run_initialization()
        while self.running:
            self.process_queues()
            time.sleep(1)  # Main loop runs every second

    def invalidate_content_sources_cache(self):
        self.content_sources = None

def process_overseerr_webhook(data):
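    """Handle an Overseerr webhook payload: for a media request, process its
    metadata and add the resulting items as wanted, using the versions
    configured for the Overseerr content source."""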
    notification_type = data.get('notification_type')

    if notification_type == 'TEST_NOTIFICATION':
        logging.info("Received test notification from Overseerr")
        return

    media = data.get('media')
    if not media:
        logging.warning("Received webhook without media information")
        return

    media_type = media.get('media_type')
    tmdb_id = media.get('tmdbId')

    if not media_type or not tmdb_id:
        logging.error("Invalid webhook data: missing media_type or tmdbId")
        return

    wanted_item = {
        'tmdb_id': tmdb_id,
        'media_type': media_type
    }

    wanted_content = [wanted_item]
    wanted_content_processed = process_metadata(wanted_content)

    if wanted_content_processed:
        # Get the versions configured for the Overseerr source from settings
        content_sources = ProgramRunner().get_content_sources(force_refresh=True)
        overseerr_settings = next((data for source, data in content_sources.items() if source.startswith('Overseerr')), {})
        versions = overseerr_settings.get('versions', {})

        all_items = wanted_content_processed.get('movies', []) + wanted_content_processed.get('episodes', []) + wanted_content_processed.get('anime', [])
        add_wanted_items(all_items, versions)
        logging.info(f"Processed and added wanted item from webhook: {wanted_item}")

def generate_airtime_report():
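    """Log a report showing when each Wanted/Unreleased item becomes eligible
    for scraping, based on its release date, airtime, and the configured
    airtime offsets."""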
logging.info("Generating airtime report for wanted and unreleased items...")
conn = get_db_connection()
cursor = conn.cursor()
# Fetch all wanted and unreleased items
cursor.execute("""
SELECT id, title, type, release_date, airtime, state
FROM media_items
WHERE state IN ('Wanted', 'Unreleased')
ORDER BY release_date, airtime
""")
items = cursor.fetchall()
current_datetime = datetime.now()
report = []
logging.info(f"Movie airtime offset: {get_setting('Queue', 'movie_airtime_offset', '19')}")
logging.info(f"Episode airtime offset: {get_setting('Queue', 'episode_airtime_offset', '0')}")
for item in items:
item_id, title, item_type, release_date, airtime, state = item
if not release_date or release_date.lower() == "unknown":
report.append(f"{title} ({item_type}): Unknown release date")
continue
try:
release_date = datetime.strptime(release_date, '%Y-%m-%d').date()
except ValueError:
report.append(f"{title} ({item_type}): Invalid release date format")
continue
if item_type == 'movie':
airtime_offset = (float(get_setting("Queue", "movie_airtime_offset", "19"))*60)
airtime = datetime.strptime("00:00", '%H:%M').time()
elif item_type == 'episode':
airtime_offset = (float(get_setting("Queue", "episode_airtime_offset", "0"))*60)
airtime = datetime.strptime(airtime or "00:00", '%H:%M').time()
else:
airtime_offset = 0
airtime = datetime.now().time()
release_datetime = datetime.combine(release_date, airtime)
scrape_datetime = release_datetime + timedelta(minutes=airtime_offset)
time_until_scrape = scrape_datetime - current_datetime
if time_until_scrape > timedelta(0):
report.append(f"{title} ({item_type}): Start scraping at {scrape_datetime}, in {time_until_scrape}")
else:
report.append(f"{title} ({item_type}): Ready to scrape (Release date: {release_date}, Current state: {state})")
conn.close()
# Log the report
logging.info("Airtime Report:\n" + "\n".join(report))
def get_and_add_all_collected_from_plex():
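    """Fetch the full Plex library and add every movie and episode found to the collected items."""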
logging.info("Getting all collected content from Plex")
collected_content = asyncio.run(run_get_collected_from_plex())
if collected_content:
logging.info(f"Retrieved {len(collected_content['movies'])} movies and {len(collected_content['episodes'])} episodes from Plex")
add_collected_items(collected_content['movies'] + collected_content['episodes'])
else:
logging.error("Failed to retrieve content from Plex")
def get_and_add_recent_collected_from_plex():
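    """Fetch recently added Plex content and add it to the collected items (marked as recent)."""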
logging.info("Getting recently added content from Plex")
collected_content = asyncio.run(run_get_recent_from_plex())
if collected_content:
logging.info(f"Retrieved {len(collected_content['movies'])} movies and {len(collected_content['episodes'])} episodes from Plex")
add_collected_items(collected_content['movies'] + collected_content['episodes'], recent=True)
else:
logging.error("Failed to retrieve content from Plex")
def run_program():
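    """Create (or reuse) the global ProgramRunner instance and return it.

    The runner is not started here; call start() on the returned instance to
    enter the blocking main loop."""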
    global program_runner
    logging.info("Program started")

    if program_runner is None or not program_runner.is_running():
        program_runner = ProgramRunner()
        # program_runner.start()  # Calling start() here would run the blocking main loop directly
    else:
        logging.info("Program is already running")

    return program_runner

if __name__ == "__main__":
    run_program()
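
# Illustrative usage from another module (assumes this file is importable as
# run_program; how the runner is actually driven depends on the surrounding
# application):
#
#     from run_program import run_program
#
#     runner = run_program()  # creates or reuses the singleton ProgramRunner
#     runner.start()          # blocks, processing queues once per second
#     ...
#     runner.stop()           # call from elsewhere to end the main loop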