-
Notifications
You must be signed in to change notification settings - Fork 0
/
worker.py
81 lines (57 loc) · 1.97 KB
/
worker.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
import json
import logging
import pickle
import time
from concurrent.futures import ThreadPoolExecutor as PoolExecutor
from datetime import datetime
from os.path import dirname
import requests
import schedule
import sentry_sdk
from flask import render_template
from redis import StrictRedis
from requests import Session
from sentry_sdk import capture_exception, push_scope
from tqdm import tqdm
from config import config
from main import app
from sources import sources
# Sentry error reporting is opt-in: it is only initialised when the
# configuration provides a DSN.
if 'SENTRY_DSN' in config:
    sentry_sdk.init(dsn=config['SENTRY_DSN'])

# Thread pool shared by every source fetch; uses the default worker count.
EXECUTOR = PoolExecutor()

# Module-level logger, lowered to DEBUG so this module's records are not
# filtered out at the logger level.
logger = logging.getLogger(__name__)
logger.setLevel(logging.DEBUG)

# Directory containing this module (os.path.dirname of __file__).
HERE = dirname(__file__)

# Redis client built from the configured URL; the update job writes the
# fetched data, per-source statuses and timestamp here.
redis = StrictRedis.from_url(config["REDIS_URL"])
def update_data():
    """Fetch data from every source and cache the result in Redis.

    Writes three keys:

    * ``data`` -- pickled list of items, newest first (see ``get_data``);
    * ``statuses`` -- JSON object mapping each source name to ``True`` if it
      succeeded and ``False`` if it raised;
    * ``last_updated`` -- ISO-8601 timestamp (naive local time).

    A failing source does not abort the update. Its exception is reported
    to Sentry, tagged with the source name.
    """
    start = time.monotonic()  # monotonic: elapsed time immune to clock jumps
    logger.info("Starting data update")

    data, errors = get_data()
    statuses = {source: not error for source, error in errors.items()}

    # Store payload, statuses and timestamp in a single pipeline (a MULTI/EXEC
    # transaction by default in redis-py). Readers then never see fresh data
    # paired with a stale ``last_updated``, even if error reporting below fails.
    (
        redis.pipeline()
        .set("data", pickle.dumps(data))
        .set('statuses', json.dumps(statuses))
        .set('last_updated', datetime.now().isoformat())
        .execute()
    )

    # Report each failed source separately so Sentry can group by source.
    for source, error in errors.items():
        if not error:
            continue
        with push_scope() as scope:
            scope.set_tag('source', source)
            capture_exception(error, scope)

    logger.info("Update took %s seconds", time.monotonic() - start)
def reliable(errors):
    """Yield items from every source, recording failures instead of raising.

    Each source is called with its own ``requests.Session``. All sources are
    submitted to ``EXECUTOR`` up front so they fetch concurrently. Their
    results are then consumed in ``sources`` order.

    :param errors: dict keyed by source ``__name__``. When a source raises,
        the exception is stored under that source's name (mutated in place).
    :yields: every item from each source's returned iterable. If a source
        fails part-way through iteration, items already yielded are kept.
    """
    # Build real lists (not lazy generators) so every source is submitted
    # before we block on the first result; otherwise fetching would be serial.
    sessions = [Session() for _ in sources]
    futures = [
        EXECUTOR.submit(source, session)
        for source, session in zip(sources, sessions)
    ]
    for source, session, future in zip(sources, sessions, futures):
        try:
            yield from future.result()
        except Exception as e:
            logger.exception('failed to retrieve data for %s', source.__name__)
            errors[source.__name__] = e
        finally:
            # Close only after this source's future has completed and its
            # output has been fully consumed, so nothing is still using it.
            session.close()
def get_data():
    """Collect items from all sources, newest first.

    Returns a ``(data, errors)`` pair:

    * ``data`` is a list of items sorted by their ``found_on`` attribute in
      descending order;
    * ``errors`` maps each source's ``__name__`` to the exception it raised,
      or ``None`` if it succeeded.
    """
    # Every source starts out as "no error"; reliable() overwrites failures.
    errors = dict.fromkeys((source.__name__ for source in sources), None)
    # sorted() drains the generator completely, so `errors` is final by the
    # time we return. tqdm only reports progress as items stream in.
    items = sorted(
        tqdm(reliable(errors)),
        key=lambda entry: entry.found_on,
        reverse=True,
    )
    return items, errors
if __name__ == "__main__":
    # Running the module directly performs one refresh and exits.
    update_data()