-
Notifications
You must be signed in to change notification settings - Fork 0
/
utils.py
114 lines (87 loc) · 3.8 KB
/
utils.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
import datetime
import time
import json
import smtplib
try:
from urllib.request import urlopen, Request
except ImportError:
from urllib2 import urlopen, Request
def get_facebook_page_url(base_url, extras=''):
    """Return *base_url* with the standard post ``fields`` query appended.

    The query asks for the message, link, creation time, type, name and id
    of each post, plus summary-only (``limit(0)``) comment and reaction
    counts and the share count.

    Args:
        base_url: URL prefix the ``&fields=...`` query is appended to.
        extras: Text placed directly after the last requested field; it is
            inserted verbatim, so callers supply their own leading comma.

    Returns:
        The concatenated URL string.
    """
    # Reactions parameters: see http://stackoverflow.com/a/37239851
    field_query = (
        "&fields=message,link,created_time,type,name,id,"
        "comments.limit(0).summary(true),shares,"
        "reactions.limit(0).summary(true)"
    )
    return base_url + f"{field_query}{extras}"
def get_page_response(url, retry_lim=10, retry_delay=5):
    """Fetch *url*, retrying on failure, and return the raw response body.

    Args:
        url: Address to request.
        retry_lim: Maximum number of attempts before giving up.
        retry_delay: Seconds to wait between two consecutive attempts.

    Returns:
        The body returned by ``response.read()`` of the first response whose
        status code is 200, or ``None`` when every attempt failed (or when
        ``retry_lim`` is not positive).

    Raises:
        ValueError: If *url* cannot be turned into a ``Request`` at all;
            this happens before the retry loop and is not retried.
    """
    req = Request(url)
    for attempt in range(retry_lim):
        try:
            # The context manager closes the connection on every path,
            # including a non-200 status and an error raised while reading.
            with urlopen(req) as response:
                if response.getcode() == 200:
                    return response.read()
        except Exception as e:
            # Deliberately broad: this is a best-effort fetch and any
            # failure is reported and retried rather than propagated.
            print(f"{datetime.datetime.now()} - Error for URL {url}\n" +
                  f"{e}\nRetrying{attempt+1}/{retry_lim}")
        # Pause before the next attempt (failed request or non-200 status),
        # but not after the last one, where waiting serves no purpose.
        if attempt + 1 < retry_lim:
            time.sleep(retry_delay)
    return None
def get_reactions_from_status(base_url):
    """Collect per-status reaction counts, one request per reaction type.

    For each of like, love, wow, haha, sad and angry (in that order) a
    ``reactions.type(...)`` summary query is appended to *base_url* and
    fetched with ``get_page_response``.

    Args:
        base_url: URL prefix the per-type ``&fields=...`` query is
            appended to.

    Returns:
        A dict mapping each status id to a tuple of ``total_count`` values,
        appended in the order the reaction types were fetched. A reaction
        type whose request yields no response is skipped entirely, so it
        adds no element to any tuple.
    """
    counts_by_status = {}
    for kind in ('like', 'love', 'wow', 'haha', 'sad', 'angry'):
        query = (f"&fields=reactions.type({kind.upper()})" +
                 ".limit(0).summary(total_count)")
        payload = get_page_response(base_url + query)
        if not payload:
            continue
        # Building a set collapses identical (id, count) pairs.
        unique_pairs = {
            (entry['id'], entry['reactions']['summary']['total_count'])
            for entry in json.loads(payload)['data']
        }
        for status_id, total in unique_pairs:
            counts_by_status[status_id] = (
                counts_by_status.get(status_id, ()) + (total,))
    return counts_by_status
def get_data_from_status(status, utc_offset_hours=9):
    """Flatten one status dictionary into a tuple of plain values.

    Args:
        status: Dictionary describing a single status. ``id``, ``type`` and
            ``created_time`` are read unconditionally; every other key is
            optional and falls back to ``''`` or ``0``.
        utc_offset_hours: Hours added to the UTC ``created_time`` to obtain
            the published time. Defaults to 9 (Tokyo time).

    Returns:
        A 10-tuple ``(status_id, status_message, status_author, link_name,
        status_type, status_link, status_published, num_reactions,
        num_comments, num_shares)`` where ``status_published`` is a
        ``'%Y-%m-%d %H:%M:%S'`` string.

    Raises:
        KeyError: If ``id``, ``type`` or ``created_time`` is missing.
        ValueError: If ``created_time`` is not in the
            ``YYYY-MM-DDTHH:MM:SS+0000`` form.
    """
    # Top-level items can be read by key; optional ones get a default.
    status_id = status['id']
    status_type = status['type']
    status_message = status.get('message', '')
    link_name = status.get('name', '')
    status_link = status.get('link', '')

    # The timestamp arrives in UTC (literal "+0000" suffix); shift it by the
    # requested offset and render it in a form statistical tools read easily.
    published_utc = datetime.datetime.strptime(
        status['created_time'], '%Y-%m-%dT%H:%M:%S+0000')
    published_local = published_utc + datetime.timedelta(
        hours=utc_offset_hours)
    status_published = published_local.strftime('%Y-%m-%d %H:%M:%S')

    # Nested items require chaining dictionary keys, and only when the
    # outer key is present at all.
    status_author = status['from']['name'] if 'from' in status else ''
    num_reactions = (status['reactions']['summary']['total_count']
                     if 'reactions' in status else 0)
    num_comments = (status['comments']['summary']['total_count']
                    if 'comments' in status else 0)
    num_shares = status['shares']['count'] if 'shares' in status else 0

    return (status_id, status_message, status_author, link_name, status_type,
            status_link, status_published, num_reactions, num_comments,
            num_shares)
def send_email(username, password, target, body, header='Your script is done!'):
    """Send a plain-text notification e-mail through Gmail's SMTP server.

    This is best-effort: any failure is printed and swallowed so that a
    notification problem never interrupts the calling script.

    Args:
        username: Account used both to log in and as the sender address.
        password: Password for *username*.
        target: Recipient address, or an iterable of recipient addresses.
        body: Message text; converted with ``str()``.
        header: Subject line; converted with ``str()``.

    Returns:
        None. Prints ``'Email sent.'`` on success, or a failure notice
        followed by the exception otherwise.
    """
    # Imported locally because only this function needs it.
    from email.message import EmailMessage

    try:
        # Building a real message object (instead of a hand-written
        # "Subject: ..." string) encodes non-ASCII text properly, adds the
        # From/To headers, and rejects a subject containing line breaks.
        msg = EmailMessage()
        msg['Subject'] = str(header)
        msg['From'] = username
        msg['To'] = target if isinstance(target, str) else ', '.join(target)
        msg.set_content(str(body))

        with smtplib.SMTP('smtp.gmail.com', 587) as s:
            s.ehlo()
            s.starttls()
            # Identify again after the connection has switched to TLS.
            s.ehlo()
            s.login(username, password)
            # Envelope sender/recipients are passed explicitly so they stay
            # exactly what the caller supplied.
            s.send_message(msg, from_addr=username, to_addrs=target)
    except Exception as e:
        print('Failed to send a notification e-mail. ')
        print(e)
    else:
        print('Email sent.')