-
Notifications
You must be signed in to change notification settings - Fork 0
/
helpers.py
100 lines (77 loc) · 2.74 KB
/
helpers.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
import requests
from time import sleep
import pandas as pd
import re
import time
from tqdm.auto import tqdm
import argparse
from collections.abc import MutableMapping
from filecache import filecache
import os
def file_exists(pattern):
    """Tell whether some entry of the current directory has ``pattern`` in its name.

    The check is a plain substring test against each name returned by
    ``os.listdir()``; no wildcard or regular-expression matching is done.

    Args:
        pattern: Substring to look for in the directory entry names.

    Returns:
        ``True`` if at least one entry name contains ``pattern``, else ``False``.
    """
    for entry in os.listdir():
        if pattern in entry:
            return True
    return False
@filecache
def petition_slug_to_id(slug):
    """Obtain a petition ID for a petition slug.

    Downloads ``https://www.change.org/p/{slug}/c`` and searches the returned
    HTML for the numeric petition ID. The known patterns are tried in order
    and the first one that matches wins; a message is printed for every
    pattern that was tried and did not match.

    NOTE(review): the function is wrapped in ``@filecache``, so results are
    presumably cached between calls -- confirm against the filecache docs.

    Args:
        slug: Petition slug, i.e. the part of the URL that follows ``/p/``.

    Returns:
        The petition ID as an ``int``.

    Raises:
        AssertionError: If the HTTP response is not OK; the response body is
            used as the message.
        ValueError: If none of the patterns matches the page.
    """
    data = requests.get(f'https://www.change.org/p/{slug}/c')
    if not data.ok:
        # Raised explicitly instead of via ``assert`` so the check is not
        # stripped under ``python -O``; the exception type is kept so that
        # existing callers catching AssertionError keep working.
        raise AssertionError(data.text)
    html = data.text

    regexps = ['"petition":{"id":"([0-9]+)"', 'data-petition_id="([0-9]+)"']
    id_ = None
    for regexp in regexps:
        match = re.search(regexp, html)
        if match is None:
            print(f"Regexp {regexp} failed")
            continue
        id_ = int(match.group(1))
        # Later patterns are fallbacks only: stop at the first hit so they
        # neither override the result nor get reported as failures.
        break

    if id_ is None:
        raise ValueError(f"Can't obtain petition ID for slug {slug}")
    print(f"Petition slug {slug} resolved to id {id_}")
    return id_
def flatten_dict(dct, delimeter="__"):
    """Collapse a nested dictionary into a single-level one.

    Each value that is not itself a ``dict`` ends up under a key built by
    joining the keys on the path to it with ``delimeter``; top-level values
    keep their key unchanged. Nested dicts that are empty contribute nothing.

    NOTE: this module defines another ``flatten_dict`` further down; being the
    later definition, that one is what the module-level name is bound to.

    Args:
        dct: Dictionary to flatten; must be a ``dict`` (checked with ``assert``).
        delimeter: String placed between the key parts of nested entries.

    Returns:
        A new flat ``dict``.
    """
    def walk(node):
        assert isinstance(node, dict), node
        for name, value in node.items():
            if not isinstance(value, dict):
                yield name, value
                continue
            # Prefix every entry found below this key with the key itself.
            for tail, leaf in walk(value):
                yield name + delimeter + tail, leaf

    return dict(walk(dct))
def limit(g, do_tqdm=True, nmax=10):
    """Limit entries from a generator.

    Yields at most ``nmax`` items from ``g`` and then stops without pulling
    any further item from it, so a source that does work per item (or per
    batch of items) is not asked for data that would be thrown away.

    Args:
        g: Iterable to take items from.
        do_tqdm: If true, show a ``tqdm`` progress bar sized to ``nmax``;
            if false, iterate silently.
        nmax: Maximum number of items to yield. Values below zero are
            treated as zero.

    Yields:
        The first ``nmax`` items of ``g``, in order.
    """
    # Imported here so the function does not depend on the module's import block.
    from itertools import islice

    count = max(nmax, 0)
    # islice stops after ``count`` items without fetching the next one.
    capped = islice(g, count)
    if do_tqdm:
        capped = tqdm(capped, total=count)
    yield from capped
def request_generator(f, f_delay=None, offset=0, batch_size=10, **kwargs):
    """Stream the elements returned by repeated, paged calls to ``f``.

    ``f`` is called as ``f(offset=..., limit=batch_size, **kwargs)``, starting
    at ``offset``. The elements of each returned batch are yielded one by one,
    the offset is advanced by the batch length, and ``f_delay`` (if given) is
    called before the next request. Iteration ends at the first empty batch.

    Args:
        f: Callable accepting ``offset`` and ``limit`` keyword arguments and
            returning a sized iterable of elements.
        f_delay: Optional zero-argument callable run after each non-empty batch.
        offset: Offset passed to the first call of ``f``.
        batch_size: Value passed to ``f`` as ``limit``.
        **kwargs: Extra keyword arguments forwarded to every call of ``f``.

    Yields:
        The elements of each batch, in order.

    Raises:
        Exception: Whatever ``f`` raises, re-raised after the failing offset
            and the error have been printed.
    """
    def fetch_batch(position):
        # Report where the download broke, then let the error propagate.
        try:
            return f(offset=position, limit=batch_size, **kwargs)
        except Exception as e:
            print(f'Download failed at offset {position}')
            print(e)
            raise e

    position = offset
    while True:
        batch = fetch_batch(position)
        if len(batch) == 0:
            return
        for entry in batch:
            yield entry
        position += len(batch)
        if f_delay is not None:
            f_delay()
def flatten_dict(d: MutableMapping, parent_key: str = '', sep: str ='.') -> MutableMapping:
    """Flatten a nested mapping into a single-level dictionary.

    Values that are ``MutableMapping`` instances are descended into; every
    other value is stored under the keys on its path joined with ``sep``.
    A non-empty ``parent_key`` is used as a prefix for all resulting keys.

    Adapted from
    https://www.freecodecamp.org/news/how-to-flatten-a-dictionary-in-python-in-4-different-ways/

    Args:
        d: Mapping to flatten.
        parent_key: Prefix for the generated keys; no prefix when empty.
        sep: Separator placed between key parts.

    Returns:
        A new flat ``dict``.
    """
    def pairs(mapping, prefix):
        for key, value in mapping.items():
            path = prefix + sep + key if prefix else key
            if isinstance(value, MutableMapping):
                yield from pairs(value, path)
            else:
                yield path, value

    return dict(pairs(d, parent_key))