-
Notifications
You must be signed in to change notification settings - Fork 1
/
utils.py
123 lines (102 loc) · 3.37 KB
/
utils.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
import json
import re
import sys
import unicodedata
from threading import Thread
if sys.version_info[0] == 3:
from urllib import request as urllib
else:
import urllib2 as urllib
def platten_nested_list(l):
ret = []
for v in l:
if isinstance(v, list):
if len(v) == 1:
if isinstance(v[0], list):
ret.append(platten_nested_list(v[0]))
else:
ret.append(v[0])
elif len(v) > 1:
ret.append(platten_nested_list(v))
else:
ret.append(v)
return ret
def run_threads(func, args_list):
results = {}
procs = []
for i, args in enumerate(args_list):
proc = Thread(
target=lambda idx, args: results.setdefault(idx, func(*args)),
args=(i, args))
proc.start()
procs.append(proc)
for proc in procs:
proc.join()
return [results[i] for i in range(len(args_list))]
def make_depth_two(l):
ret = []
for v in l:
if isinstance(v, list):
if len(v) > 0:
if not isinstance(v[0], list):
ret.append(v)
else:
ret.extend(make_depth_two(v))
else:
ret.append([v])
return ret
def build_dic(l):
appeared = set()
ret = []
for v in l:
if v[0] in appeared:
continue
appeared.add(v[0])
dic = dict(label=v[0],
name=v[1],
market=v[2],
code=v[4])
ret.append(dic)
return ret
def data_to_dic(data):
return build_dic(make_depth_two(platten_nested_list(data)))
def get_json(url):
headers = {
'authority': 'ac.finance.naver.com',
'cache-control': 'max-age=0',
'save-data': 'on',
'upgrade-insecure-requests': '1',
'user-agent': 'Mozilla/5.0 (Macintosh; Intel Mac OS X 10_14_4) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/73.0.3683.86 Safari/537.36',
'accept': 'text/html,application/xhtml+xml,application/xml;q=0.9,image/webp,image/apng,*/*;q=0.8,application/signed-exchange;v=b3',
'accept-encoding': 'gzip, deflate, br',
'accept-language': 'en-US,en;q=0.9,ko;q=0.8',
'cookie': 'NNB=CGZMQERKMOMVY; npic=kwUMpAjWeZW05ke3zXghQaDMHe2qBml8aNjihV38fBbO7Mu+2Z2oDQPMO13W6kfpCA==; nx_ssl=2; naver_stock_codeList=041190%7C',
}
request = urllib.Request(url, headers=headers)
response = urllib.urlopen(request)
content = response.read()
try:
data = json.loads(content.decode('utf-8'))
except Exception:
data = json.loads(content.decode('euc-kr'))
return data
def parse_url(url):
# p = '(?:http.*://)?(?P<host>[^:/ ]+).?(?P<port>[0-9]*)(?P<uri>.*)'
p = '((?P<schema>https?)://)?(?P<host>[^:/ ]+).?(?P<port>[0-9]*)(?P<uri>[^\?]*)\??(?P<params>.*)'
m = re.search(p, str(url))
schema = m.group('schema')
host = m.group('host')
port = m.group('port')
uri = m.group('uri')
params = m.group('params')
return schema, host, port, uri, params
def format_num(num, decimal=0):
return ('{:,.%sf}' % decimal).format(float(num))
def encode(query):
return urllib.quote(unicodedata.normalize('NFC', query).encode('euc-kr'))
def get_query(argv):
try:
query = u'%s' % ' '.join(argv)
return encode(query)
except Exception:
return ''