-
Notifications
You must be signed in to change notification settings - Fork 0
/
main.py
124 lines (106 loc) · 4.14 KB
/
main.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
import sys
import json
import asyncio
import aiohttp
import logging
from bs4 import BeautifulSoup, SoupStrainer
from parse import parse_content, parse_options
from utils import find_numerical, encode, decode, get_year
from NoSuchTagException import NoSuchTagException
# Setting up the logger with a custom format
logging.basicConfig(format='%(levelname)s: %(message)s', level=logging.DEBUG)
# Successfully parsed questions, keyed by page URL.
data = {}
# Pages that could not be (fully) parsed; dumped separately for manual review.
skipped_pages = []
# Questions older than this exam year are ignored.
oldest_year_allowed = 2017
# CSS selectors for the question body and its tag list on a Question2Answer page.
content_path, tag_path = ".qa-q-view-content", ".qa-q-view-tags"
"""
The issues:
1. Finding question along with images in the question
2. Finding the tags (Easiest)
3. Finding options and separating them from the question
4. Finding the correct answer (Manually)
"""
# For only parsing the div tags with given classes
# Source: https://beautiful-soup-4.readthedocs.io/en/latest/index.html#soupstrainer
div_tags = SoupStrainer(class_=["qa-q-view-content", "qa-q-view-tags"])
def _skip_page(url: str, year, tags: list, gzipped_html) -> None:
    """Record a page that could not be fully parsed (kept for manual review)."""
    skipped_pages.append({
        "url": url,
        "year": year,
        "tags": tags,
        "html.gz": gzipped_html
    })

async def parse_page(url: str, html: str) -> None:
    """Parse one question page's HTML into the module-level `data` dict.

    Pages that are descriptive, too old, un-dated, option-less (when options
    were expected), or contain unrecognized HTML tags are appended to
    `skipped_pages` instead.

    Args:
        url: The page URL; used as the key into `data`.
        html: Raw HTML of the page.
    """
    soup = BeautifulSoup(html, "lxml", parse_only=div_tags)
    div = soup.select_one(content_path)
    tags = [tag.get_text() for tag in soup.select_one(tag_path).select("li > a")]
    # The first tag containing "gate" encodes the exam year.
    year = None
    for tag in tags:
        if "gate" not in tag: continue
        year = get_year(tag)
        break
    gzipped_html = encode(str(div))
    # Ignoring descriptive and questions before `oldest_year_allowed` (Storing them for later)
    if "descriptive" in tags or year is None or year < oldest_year_allowed:
        _skip_page(url, year, tags, gzipped_html)
        return
    data[url] = {
        "tags": tags,
        "year": year,
        # Incase some questions were malformed, use the raw content
        # to manually fix the them later
        "raw_gzip": gzipped_html
    }
    content = div.select_one(".qa-q-view-content > div:nth-child(2)")
    # In the view content div, usually the last ol tag contains the options
    last_ord_list = content.select("ol:nth-last-child(1)")
    opts = None
    if last_ord_list:
        # Remove the options from the content
        opts = await parse_options(last_ord_list[0].extract())
    # If we didn't find any options and the question wasn't tagged as numerical,
    # the page is malformed — save it for later.
    # BUG FIX: the original tested `find_numerical(tags)` (no `not`), which
    # skipped exactly the numerical questions — the ones that legitimately
    # have no options — and kept the broken option-less MCQs.
    # (Assumes find_numerical(tags) is truthy for numerical questions — TODO confirm.)
    if opts is None and not find_numerical(tags):
        _skip_page(url, year, tags, gzipped_html)
        del data[url]
        return
    data[url]["options"] = opts
    try:
        data[url]["content"] = await parse_content(content)
    except NoSuchTagException as tag_ex:
        logging.info(f"Skipping {url} because of unrecognized HTML tag: {tag_ex.tag}")
        # If we failed to parse the content due to some unknow tag then skip the page
        _skip_page(url, year, tags, gzipped_html)
        del data[url]
async def get_page(session: "aiohttp.ClientSession", url: str) -> None:
    """Fetch `url` with the given aiohttp session and parse it via parse_page.

    Skips the request entirely for questions asked before
    `oldest_year_allowed` (year inferred from the URL), and skips parsing
    on non-200 responses.
    """
    # Not requesting html of questions that were asked before specified year
    year = get_year(url)
    if year is None or year < oldest_year_allowed:
        return
    async with session.get(url) as response:
        # Check the status before reading the body — the original read the
        # full body first, wasting the download on error responses.
        if response.status != 200:
            logging.error(f"Response {response.status} while fetching {url}")
            return
        html = await response.text()
    await parse_page(url, html)
async def main(FILE_NAME: str) -> None:
    """Fetch and parse every URL listed in `<FILE_NAME>.txt`, then dump the
    results to `<FILE_NAME>.json` and skipped pages to `<FILE_NAME>.skipped.json`.

    Only lines starting with "http" in the input file are treated as URLs.
    """
    async with aiohttp.ClientSession() as session:
        with open(FILE_NAME + ".txt", "r") as fp:
            tasks = [get_page(session, line.strip()) for line in fp if line.startswith("http")]
            await asyncio.gather(*tasks)
    logging.info(f"Found {len(data)} questions. Dumping to JSON")
    # Use context managers so the output files are flushed and closed —
    # the original `json.dump(..., open(...))` leaked both handles.
    with open(f"{FILE_NAME}.json", "w") as out_fp:
        json.dump(data, out_fp, indent=4)
    with open(f"{FILE_NAME}.skipped.json", "w") as skipped_fp:
        json.dump(skipped_pages, skipped_fp, indent=4)
if __name__ == "__main__":
    # Expect exactly one argument: the input file name (with or without .txt).
    if len(sys.argv) < 2:
        print("Usage: python main.py <file_name>")
        # sys.exit is the correct way to exit a script; the bare `exit()`
        # builtin is an interactive-shell helper supplied by the site module.
        sys.exit(1)
    file_name = sys.argv[1].replace(".txt", "")
    asyncio.run(main(file_name))