# get_package_list.py
# Forked from cwilliams5/claim-free-steam-packages.
import contextlib
import random
from datetime import timedelta
import joblib
import requests
from joblib import Parallel, delayed
from requests_cache import CachedSession
from tqdm import tqdm
@contextlib.contextmanager
def tqdm_joblib(tqdm_object):
    """Route joblib batch-completion events into the given tqdm progress bar.

    While the context is active, ``joblib.parallel.BatchCompletionCallBack``
    is swapped for a subclass that advances ``tqdm_object`` by the size of
    each completed batch and then delegates to the stock callback. On exit
    (normal or via an exception) the original callback class is put back and
    the progress bar is closed.

    Yields:
        The same ``tqdm_object`` that was passed in.
    """
    original_callback = joblib.parallel.BatchCompletionCallBack

    class _ProgressReportingCallback(original_callback):
        def __call__(self, *args, **kwargs):
            # Advance the bar first, then let joblib do its usual bookkeeping.
            tqdm_object.update(n=self.batch_size)
            return super().__call__(*args, **kwargs)

    joblib.parallel.BatchCompletionCallBack = _ProgressReportingCallback
    try:
        yield tqdm_object
    finally:
        # Always undo the monkey-patch so later Parallel runs are unaffected.
        joblib.parallel.BatchCompletionCallBack = original_callback
        tqdm_object.close()
def _free_app_or_none(payload, game):
    """Return ``game`` if ``payload`` describes a released free app, else None.

    ``payload`` is the decoded JSON body of an appdetails response. Any shape
    other than the expected nested dicts is reported as a faulty response and
    yields None instead of raising.
    """
    entry = payload.get(str(game)) if isinstance(payload, dict) else None
    if not isinstance(entry, dict):
        print('Got faulty response %s' % payload)
        return None
    if entry.get('success') is False:
        # The store does not know this app id.
        return None

    data = entry.get('data')
    if not isinstance(data, dict):
        print('Got faulty response %s' % payload)
        return None

    release = data.get('release_date')
    if isinstance(release, dict) and release.get('coming_soon') is True:
        # Not yet released.
        return None

    is_free = data.get('is_free')
    if is_free is True:
        return game
    if is_free is False:
        return None

    print('Got faulty response %s' % payload)
    return None


def checkGame(game, max_retries=5):
    """Query the Steam store for one app and report whether it is free.

    The appdetails endpoint is requested through a cached session (responses
    are cached for a random 20-24 hours) and through the configured SOCKS5
    proxy. Request exceptions, non-200 status codes and ``null`` JSON bodies
    are retried up to ``max_retries`` times.

    Args:
        game: Integer Steam app id.
        max_retries: Number of retries after the first attempt. Bounded so a
            persistent failure cannot recurse or loop forever.

    Returns:
        ``game`` when the app is released and free; None when it is paid,
        unreleased, unknown, the body is not valid JSON, the payload has an
        unexpected shape, or all attempts failed.
    """
    url = ('https://store.steampowered.com/api/appdetails/?appids=' +
           str(game) + '&cc=EE&l=english&v=1')
    proxies = {
        'http': 'socks5h://p.webshare.io:9999',
        'https': 'socks5h://p.webshare.io:9999'
    }

    # The context manager closes the session (and its cache backend) on exit;
    # one session is reused for every attempt instead of one per retry.
    with CachedSession(
        'steam_cache',
        use_cache_dir=True,
        cache_control=False,
        allowable_methods=['GET'],
        allowable_codes=[200],
        match_headers=False,
        stale_if_error=False,
    ) as session:
        bypass_cache = False
        for _ in range(max_retries + 1):
            try:
                if bypass_cache:
                    # A 200 response with a null body may already be cached;
                    # skip the cache so the retry reaches the server.
                    with session.cache_disabled():
                        res = session.get(url=url, proxies=proxies)
                else:
                    res = session.get(
                        url=url,
                        proxies=proxies,
                        expire_after=timedelta(
                            hours=random.randint(20, 24)))
            except Exception as e:
                # Deliberately broad: proxy, transport and cache-backend
                # errors are all treated as transient and retried.
                print('\nGot exception while trying to send request %s' % e)
                continue

            if res.status_code != 200:
                print('\nGot wrong status code %d' % res.status_code)
                continue

            try:
                payload = res.json()
            except ValueError:
                # Body is not JSON; the original treats this as "not free".
                return None

            if payload is None:
                print('\nGot invalid response %d' % game)
                bypass_cache = True
                continue

            return _free_app_or_none(payload, game)

    print('\nGiving up on app %d after %d attempts' % (game, max_retries + 1))
    return None
# Download the complete Steam app list. A timeout keeps the script from
# hanging forever on a stalled connection, and raise_for_status() surfaces an
# HTTP error directly instead of as a confusing JSON/KeyError further down.
res = requests.get(
    url='http://api.steampowered.com/ISteamApps/GetAppList/v2',
    timeout=60)
res.raise_for_status()
res = res.json()

apps = [app['appid'] for app in res['applist']['apps']]
# Process the app ids in random order.
random.shuffle(apps)
print('Received %d apps' % len(apps))

# Check every app with 10 parallel workers, reporting progress through tqdm.
with tqdm_joblib(
        tqdm(desc='Requesting app statuses', total=len(apps),
             mininterval=10)):
    results = Parallel(n_jobs=10)(delayed(checkGame)(i) for i in apps)

# checkGame returns the app id for free apps and None otherwise; keep the ids
# and write them as one comma-separated line.
output = ','.join(str(game) for game in results if game is not None)

with open('package_list.txt', 'w') as f:
    f.write(output)

# Heuristic sanity check: a result shorter than 10000 characters is reported
# as a failure.
if len(output) > 10000:
    print("\nRan successfully")
else:
    print("\nSomething went wrong")