collect_data_webcrawl.py
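
"""Web-crawling helpers for collecting enzyme data.

These functions scrape the EC-number table from BRENDA, query the UniProt
REST API for entries matching an EC number or a free-text query, and download
the corresponding FASTA sequences.
"""
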
import re
import os
import json
import requests
import pandas as pd
from tqdm import tqdm
from bs4 import BeautifulSoup
from requests.adapters import HTTPAdapter, Retry

ENZYME_TYPES = ['protease', 'amylase', 'lipase', 'mannanase', 'cellulase', 'pectinase', 'others']

def crawl_brenda_ecnumber_table(url, save_path='ecnumber_brenda.csv'):
    # set request headers
    brenda_headers = {
        'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36',
        'Accept-Language': 'en-US,en;q=0.9',
        'Accept-Encoding': 'gzip, deflate, br',
        'Connection': 'keep-alive',
    }
    # fetch the page
    response = requests.get(url, headers=brenda_headers)
    response.raise_for_status()  # check if the request succeeded
    # parse the HTML
    soup = BeautifulSoup(response.content, 'html.parser')
    # get the first <table> and its rows
    table = soup.find('table')
    rows = table.find_all('tr')
    # build the column headers; the 'Show details' column is replaced by two link columns
    headers = [header.get_text(strip=True) for header in rows[0].find_all('th')] + ['Long details', 'Short details']
    headers.remove('Show details')
    base_url = 'https://www.brenda-enzymes.org'
    # collect the table data
    data = []
    for row in rows[1:]:
        cells = row.find_all('td')
        row_data = []
        for cell in cells:
            # cells containing <a> tags hold relative links to the detail pages
            links = cell.find_all('a')
            if links:
                long_url, short_url = [
                    base_url + (href.get('href').replace('.', '', 1) if href.get('href')[0] == '.' else href.get('href'))
                    for href in links
                ]
                row_data.extend([long_url, short_url])
            else:
                row_data.append(cell.get_text(strip=True))
        data.append(row_data)
    # convert to a pandas DataFrame
    df = pd.DataFrame(data, columns=headers)
    print(df.head())
    # save as a csv file
    df.to_csv(save_path, index=False)
    print(f'Saved EC numbers from BRENDA to {save_path}')
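
# Illustrative usage (sketch): the listing-page URL is an assumption, but the
# output path matches what collate_target_enzyme() reads below.
# crawl_brenda_ecnumber_table('https://www.brenda-enzymes.org/all_enzymes.php',
#                             save_path='data/ecnumber_brenda.csv')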


def crawl_brenda_table_by_ec(ec_num, organism, table='temperature'):
    pH_options = ['pH_Optimum', 'pH_Range', 'pH_Stability']
    pH_num = [45, 46, 47]
    temperature_options = ['Temperature_Optimum', 'Temperature_Range', 'Temperature_Stability']
    temperature_num = [41, 42, 43]
    tables = pH_options if table == 'pH' else temperature_options
    table_num = pH_num if table == 'pH' else temperature_num
    data = {}
    for table_name in tables:
        url = f'https://www.brenda-enzymes.org/all_enzymes.php?ecno={ec_num}&table={table_name}#TAB'
        # set request headers
        brenda_headers = {
            'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36',
            'Accept-Language': 'en-US,en;q=0.9',
            'Accept-Encoding': 'gzip, deflate, br',
            'Connection': 'keep-alive',
        }
        # fetch the page
        response = requests.get(url, headers=brenda_headers)
        response.raise_for_status()  # check if the request succeeded
        # parse the HTML
        soup = BeautifulSoup(response.content, 'html.parser')
        rows = soup.find(id=f'tab{table_num[tables.index(table_name)]}')
        if rows is None:  # 2024/08/03 LUN check if information exists
            continue
        for row in rows.find_all('div'):
            cells = row.find_all('div', class_='cell')
            if len(cells) > 1 and cells[1].text.strip() in organism:
                value = cells[0].text.strip()  # extract the value from the first cell
                data[table_name] = value
                break
    return data
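
# Illustrative usage (sketch); the EC number and organism list are placeholders:
# crawl_brenda_table_by_ec('3.4.21.62', organism=['Bacillus subtilis'], table='temperature')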


re_next_link = re.compile(r'<(.+)>; rel="next"')
retries = Retry(total=5, backoff_factor=0.25, status_forcelist=[500, 502, 503, 504])
session = requests.Session()
session.mount("https://", HTTPAdapter(max_retries=retries))


def get_next_link(headers):
    # UniProt paginates search results; the next page URL is given in the 'Link' response header
    if "Link" in headers:
        match = re_next_link.match(headers["Link"])
        if match:
            return match.group(1)
    return None


def get_batch(batch_url):
    # yield one page of results at a time, following the 'Link' header until exhausted
    while batch_url:
        response = session.get(batch_url)
        response.raise_for_status()
        total = response.headers["x-total-results"]
        yield response, total
        batch_url = get_next_link(response.headers)


def crawl_uniprot_by_ecnumber(ec_num, enzyme_type='protease', download=True,
                              save_path='', file_name='',
                              fields=['accession', 'id', 'protein_name', 'gene_names', 'organism_name', 'length', 'cc_function', 'cc_biotechnology', 'cc_interaction'],
                              headers=['Entry', 'Entry Name', 'Protein names', 'Gene Names', 'Organism', 'Length', 'Function [CC]', 'Biotechnological use', 'Interacts with'],
                              return_format='tsv',
                              reviewed='true'):
    # example query URLs:
    # url = 'https://rest.uniprot.org/uniprotkb/search?fields=accession%2Ccc_interaction&format=tsv&query=(ec:1.1.1.44)%20AND%20%28reviewed%3Atrue%29&size=500'
    # url = 'https://rest.uniprot.org/uniprotkb/search?fields=comment_count,feature_count,length,structure_3d,annotation_score,protein_existence,lit_pubmed_id,accession,organism_name,protein_name,gene_names,reviewed,keyword,id%2Ccc_interaction&format=tsv&query=(ec:1.1.1.44)%20AND%20(reviewed%3Atrue)&size=500'
    url = f'https://rest.uniprot.org/uniprotkb/search?fields={",".join(fields)}&format={return_format}&query=(ec:{ec_num})%20AND%20(reviewed%3A{reviewed})&size=500'
    data = []
    ec_num_str = '_'.join(ec_num.split('.'))
    reviewed_num = 0 if reviewed == 'false' else 1
    if return_format == 'tsv':
        for batch, total in get_batch(url):
            for line in tqdm(batch.text.splitlines()[1:]):
                data.append(line.split('\t'))
        df = pd.DataFrame(data, columns=headers)
        file_name = f'{enzyme_type}_ec{ec_num_str}_reviewed{reviewed_num}_{len(df)}.csv' if file_name == '' else file_name
        save_path = os.path.join(save_path, file_name)
        if download and len(data) > 0:
            df.to_csv(save_path, index=False)
            print(f'Save file for ec:{ec_num} with reviewed={reviewed} to path: {save_path}')
    else:
        for batch, total in get_batch(url):
            for line in tqdm(batch.json()['results']):
                data.append(line)
        file_name = f'{enzyme_type}_ec{ec_num_str}_reviewed{reviewed_num}_{len(data)}.json' if file_name == '' else file_name
        save_path = os.path.join(save_path, file_name)
        if download and len(data) > 0:
            with open(save_path, 'w', encoding='utf-8') as json_file:
                json.dump(data, json_file, ensure_ascii=False, indent=4)
            print(f'Save file for ec:{ec_num} with reviewed={reviewed} to path: {save_path}')
    return data


def collate_target_enzyme():
    def contains_english_chars(input_str):
        # regular expression to match English letters
        pattern = re.compile('[a-zA-Z]')
        # check if the input string contains English letters
        return bool(pattern.search(input_str))

    all_data = pd.read_csv('data/ecnumber_brenda.csv')
    if not all_data.empty:
        for index, row in all_data.iterrows():
            for enzyme in ENZYME_TYPES:
                if enzyme in row['Recommended Name'].lower():
                    # only process rows whose EC number contains no alphabetic characters
                    if not contains_english_chars(row['EC Number']):
                        crawl_uniprot_by_ecnumber(ec_num=row['EC Number'], enzyme_type=enzyme, reviewed='true', save_path=f'data/brenda/{enzyme}/reviewed/')
                        crawl_uniprot_by_ecnumber(ec_num=row['EC Number'], enzyme_type=enzyme, reviewed='false', save_path=f'data/brenda/{enzyme}/unreviewed/')
    else:
        print("There is no EC-number data to use for the digestive enzymes!")
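
# Note: collate_target_enzyme() writes into data/brenda/<enzyme>/reviewed/ and
# data/brenda/<enzyme>/unreviewed/; those directories are assumed to exist beforehand.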


def crawl_uniprot_by_query(query, download=True,
                           save_path='', file_name='',
                           fields=['ec', 'accession', 'id', 'protein_name', 'gene_names', 'organism_name', 'length', 'cc_function', 'cc_biotechnology', 'cc_interaction'],
                           headers=['EC number', 'Entry', 'Entry Name', 'Protein names', 'Gene Names', 'Organism', 'Length', 'Function [CC]', 'Biotechnological use', 'Interacts with'],
                           return_format='tsv',
                           reviewed='true'):
    url = f'https://rest.uniprot.org/uniprotkb/search?fields={",".join(fields)}&format={return_format}&query=({query})%20AND%20(reviewed%3A{reviewed})&size=500'
    data = []
    reviewed_num = 0 if reviewed == 'false' else 1
    if return_format == 'tsv':
        for batch, total in get_batch(url):
            for line in batch.text.splitlines()[1:]:
                data.append(line.split('\t'))  # LUN 2024/07/30 check if there's blank in string
        df = pd.DataFrame(data, columns=headers)
        file_name = f'{query}_reviewed{reviewed_num}_{len(df)}.csv' if file_name == '' else file_name
        save_path = os.path.join(save_path, file_name)
        if download and len(data) > 0:
            df.to_csv(save_path, index=False)
            # print(f'Save file for query:{query} with reviewed={reviewed} to path: {save_path}')
    else:
        for batch, total in get_batch(url):
            for line in batch.json()['results']:
                data.append(line)
        file_name = f'{query}_reviewed{reviewed_num}_{len(data)}.json' if file_name == '' else file_name
        save_path = os.path.join(save_path, file_name)
        if download and len(data) > 0:
            with open(save_path, 'w', encoding='utf-8') as json_file:
                json.dump(data, json_file, ensure_ascii=False, indent=4)
            # print(f'Save file for query:{query} with reviewed={reviewed} to path: {save_path}')
    return data


def download_fasta(query, save_path='', file_name='', compressed='false', reviewed='true'):
    url = f'https://rest.uniprot.org/uniprotkb/stream?compressed={compressed}&format=fasta&query=({query})%20AND%20(reviewed%3A{reviewed})'
    response = requests.get(url)
    if response.status_code != 200:
        print(f"Error: Unable to fetch data. HTTP Status Code: {response.status_code}")
        return
    all_fastas = response.text
    fasta_list = re.split(r'\n(?=>)', all_fastas)
    reviewed_num = 0 if reviewed == 'false' else 1
    file_name = f'{query}_reviewed{reviewed_num}_{len(fasta_list)}.fasta' if file_name == '' else file_name
    save_path = os.path.join(save_path, file_name)

    def save_to_fasta(sequences, filename):
        with open(filename, 'w') as f:
            for seq in sequences:
                f.write(f'{seq}\n')

    # print(f'Save .fasta file for query:{query} with reviewed={reviewed} to path: {save_path}')
    save_to_fasta(fasta_list, save_path)
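
# Illustrative usage (sketch), reusing a query and directory from the __main__ block below:
# download_fasta('protease', save_path='data/uniprot/reviewed/', reviewed='true')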


if __name__ == '__main__':
    for enzyme_type in ENZYME_TYPES:
        crawl_uniprot_by_query(query=enzyme_type, reviewed='true', save_path='data/uniprot/reviewed/')
    # the amount of unreviewed data is too large!
    for enzyme_type in ENZYME_TYPES:
        crawl_uniprot_by_query(query=enzyme_type, reviewed='false', save_path='data/uniprot/unreviewed/')