SpiderMonitor.py
# coding=utf-8
import requests
import json
from bs4 import BeautifulSoup
from urllib.parse import urlencode
import re
import urllib.request
import datetime
import threading


def threaded(fun):
    """Run the decorated method in its own thread and return the Thread object."""
    def wrapper(*args, **kwargs):
        th = threading.Thread(target=fun, args=args, kwargs=kwargs, name=args[0].name)
        th.start()
        return th
    return wrapper


def extend(fun):
    """Adapt a spider's get_info() to the (results, tasks) protocol used by SpiderMonitor:
    append the spider's output to the shared results list and decrement the open-task counter."""
    def wrapper(spider, results, tasks, *args, **kwargs):
        stime = datetime.datetime.now()
        try:
            ret = fun(spider, *args, **kwargs)
        except Exception:
            # by convention a failed spider reports count == -1
            ret = spider.output(-1)
        print(ret)
        if isinstance(ret, list):
            for result in ret:
                results.append(result)
        else:
            results.append(ret)
        # record the results before marking the task as done, so the monitor
        # never sees tasks == 0 while a result is still missing
        tasks[0] = tasks[0] - 1
        print('\t' + spider.name + ' ended after %d seconds' % (datetime.datetime.now() - stime).seconds)
        exit(0)  # SystemExit only terminates this worker thread
    return wrapper


class RCSBCount:
    def __init__(self, keyword: str, timeout):
        super(RCSBCount, self).__init__()
        self.keyword = keyword
        self.name = 'rcsb'
        self.timeout = timeout

    @threaded
    @extend
    def get_info(self):
        query_text = """
        <orgPdbQuery>
            <queryType>org.pdb.query.simple.AdvancedKeywordQuery</queryType>
            <keywords>%s</keywords>
        </orgPdbQuery>""" % self.keyword
        data = query_text.encode('utf-8')
        headers = {
            'Content-Type': 'application/x-www-form-urlencoded'
        }
        url = 'https://www.rcsb.org/pdb/rest/search'
        self.search_url = 'http://www.rcsb.org/pdb/search/navbarsearch.do?f=&q=%s' % self.keyword
        req = urllib.request.Request(url, data=data, headers=headers)
        result = urllib.request.urlopen(req, timeout=self.timeout).read()
        try:
            # the legacy REST search answers with one PDB id per line, so counting the
            # escaped '\n' sequences in the bytes repr gives the number of hits
            self.count = str(result).count('\\n')
        except Exception:
            self.count = 0
        return self.output(0)

    def output(self, error):
        if not error:
            return {
                'title': 'RCSB',
                'url': self.search_url,
                'count': self.count,
            }
        else:
            return {
                'title': 'RCSB',
                'url': self.search_url,
                'count': error,
            }


class iGEMPartsCount:
    def __init__(self, keyword, timeout):
        super(iGEMPartsCount, self).__init__()
        self.keyword = keyword
        self.name = 'iGEM parts'
        self.timeout = timeout

    @threaded
    @extend
    def get_info(self):
        # The iGEM query service (apparently a 2010 team project) is no longer running, so
        # fuzzy-search results cannot be fetched through an API; scrape the site's own
        # search page instead.
        url = 'http://parts.igem.org/Special:Search?search=%s' % self.keyword
        self.search_url = url
        headers = {
            'accept': 'text/html,application/xhtml+xml,application/xml;q=0.9,image/webp,image/apng,*/*;q=0.8'
        }
        req = urllib.request.Request(url, headers=headers)
        html = urllib.request.urlopen(req, timeout=self.timeout).read()
        soup = BeautifulSoup(html, 'lxml')
        div = soup.find(attrs={'class': 'results-info'})
        try:
            self.count = int(div.find_all('strong')[-1].text)
        except Exception:
            self.count = 0
        return self.output(0)

    def output(self, error):
        if not error:
            return {
                'title': 'iGEM Parts',
                'url': self.search_url,
                'count': self.count,
            }
        else:
            return {
                'title': 'iGEM Parts',
                'url': self.search_url,
                'count': error,
            }


class NLMCount:
    def __init__(self, keyword, timeout):
        super(NLMCount, self).__init__()
        self.keyword = keyword
        self.name = 'NLM'
        self.timeout = timeout

    @threaded
    @extend
    def get_info(self):
        url = 'https://ghr.nlm.nih.gov/search?query=%s&show=xml&count=1' % self.keyword
        xml = urllib.request.urlopen(url, timeout=self.timeout).read()
        soup = BeautifulSoup(xml, 'lxml')
        try:
            self.count = int(soup.find('search_results').attrs['count'])
        except Exception:
            self.count = 0
        return self.output(0)

    def output(self, error):
        if not error:
            return {
                'title': 'NLM',
                'url': 'https://ghr.nlm.nih.gov/search?query=%s' % self.keyword,
                'count': self.count,
            }
        else:
            return {
                'title': 'NLM',
                'url': 'https://ghr.nlm.nih.gov/search?query=%s' % self.keyword,
                'count': error,
            }


class NCBICount:
    def __init__(self, term, timeout):
        self.term = term
        self.name = 'NCBI'
        self.timeout = timeout

    @threaded
    @extend
    def get_info(self):
        # find every NCBI database that has hits for the term
        self.name_dic = {
            'gquery': 'All Databases', 'assembly': 'Assembly', 'biocollections': 'Biocollections',
            'bioproject': 'BioProject', 'biosample': 'BioSample', 'biosystems': 'BioSystems',
            'books': 'Books', 'clinvar': 'ClinVar', 'clone': 'Clone', 'cdd': 'Conserved Domains',
            'gap': 'dbGaP', 'dbvar': 'dbVar', 'nucest': 'EST', 'gene': 'Gene', 'genome': 'Genome',
            'gds': 'GEO DataSets', 'geoprofiles': 'GEO Profiles', 'nucgss': 'GSS', 'gtr': 'GTR',
            'homologene': 'HomoloGene', 'ipg': 'Identical Protein Groups', 'medgen': 'MedGen',
            'mesh': 'MeSH', 'ncbisearch': 'NCBI Web Site', 'nlmcatalog': 'NLM Catalog',
            'nuccore': 'Nucleotide', 'omim': 'OMIM', 'pmc': 'PMC', 'popset': 'PopSet',
            'probe': 'Probe', 'protein': 'Protein', 'proteinclusters': 'Protein Clusters',
            'pcassay': 'PubChem BioAssay', 'pccompound': 'PubChem Compound',
            'pcsubstance': 'PubChem Substance', 'pubmed': 'PubMed', 'pubmedhealth': 'PubMed Health',
            'snp': 'SNP', 'sparcle': 'Sparcle', 'sra': 'SRA', 'structure': 'Structure',
            'taxonomy': 'Taxonomy', 'toolkit': 'ToolKit', 'toolkitall': 'ToolKitAll',
            'toolkitbookgh': 'ToolKitBookgh', 'unigene': 'UniGene'
        }
        url_query = 'https://eutils.ncbi.nlm.nih.gov/gquery' + '?term=' + self.term + '&retmode=xml'
        webdata = requests.get(url=url_query, timeout=self.timeout).text
        soup = BeautifulSoup(webdata, 'lxml')
        self.names = soup.select('dbname')
        self.nums = soup.select('count')
        return self.output(0)

    def output(self, error):
        results = []
        if not error:
            result = {}
            for name, num in zip(self.names, self.nums):
                join_name = self.name_dic[name.get_text()]
                result['title'] = join_name
                try:
                    result['count'] = int(num.get_text())
                except Exception:
                    result['count'] = 0
                result['url'] = 'https://www.ncbi.nlm.nih.gov/' + name.get_text() + '/?term=' + self.term
                # round-trip through JSON to store a copy of the dict before it is reused
                temp_results = json.dumps(result)
                join_result = json.loads(temp_results)
                results.append(join_result)
                result.clear()
            return results
        else:
            for key in self.name_dic.keys():
                results.append(
                    {
                        'title': self.name_dic[key],
                        'count': error,
                        'url': 'https://www.ncbi.nlm.nih.gov/' + key + '/?term=' + self.term
                    }
                )
            return results


class UniProtCount:
    def __init__(self, keyword: str, timeout):
        self.keyword = keyword
        self.name = 'UniProt'
        self.timeout = timeout

    @threaded
    @extend
    def get_info(self):
        query_string = urlencode({'query': self.keyword})
        self.query_url = 'https://www.uniprot.org/uniprot/?%s&sort=score' % query_string
        response = requests.get(self.query_url, timeout=self.timeout)
        bs = BeautifulSoup(response.content, features='html.parser')
        try:
            # the result count is the first number embedded in the sidebar script
            self.count = int(re.findall(r"\d+", bs.find('div', class_='main-aside').find('script').text)[0])
        except Exception:
            self.count = 0
        return self.output(0)

    def output(self, error):
        if not error:
            return {
                'title': 'UniProt',
                'url': self.query_url,
                'count': self.count,
            }
        else:
            return {
                'title': 'UniProt',
                'url': self.query_url,
                'count': error,
            }


class TaxonomyCount:
    def __init__(self, keyword: str, timeout):
        self.keyword = keyword
        self.name = 'Taxonomy'
        self.timeout = timeout

    @threaded
    @extend
    def get_info(self):
        query_string = urlencode({'query': self.keyword})
        self.query_url = 'https://www.uniprot.org/taxonomy/?%s&sort=score' % query_string
        response = requests.get(self.query_url, timeout=self.timeout)
        bs = BeautifulSoup(response.content, features='html.parser')
        try:
            self.count = int(re.findall(r"\d+", bs.find('div', class_='main-aside').find('script').text)[0])
        except Exception:
            self.count = 0
        return self.output(0)

    def output(self, error):
        if not error:
            return {
                'title': 'Taxonomy',
                'url': self.query_url,
                'count': self.count
            }
        else:
            return {
                'title': 'Taxonomy',
                'url': self.query_url,
                'count': error
            }


condition = threading.Condition()


class SpiderMonitor:
    delay = 5
    timeout_msg = 'Timed out after %d seconds.' % delay
    name = 'SpiderMonitor'
    all_spiders = [iGEMPartsCount, RCSBCount, NLMCount, NCBICount, UniProtCount, TaxonomyCount]

    def __init__(self):
        self.results = []
        self.tasks = [0]
        self.thread_pool = []

    def __crawl(self):
        # each spider's get_info must be wrapped by the 'threaded' and 'extend' decorators
        self.tasks[0] = len(self.all_spiders)
        for spider in self.all_spiders:
            th = spider(self.keyword, self.timeout).get_info(self.results, self.tasks)
            self.thread_pool.append(th)

    @threaded
    def __check_finished(self):
        stime = datetime.datetime.now()
        while True:
            if (datetime.datetime.now() - stime).seconds >= self.timeout:
                exit(0)  # stop watching once the overall timeout has passed
            if self.tasks[0] == 0:
                if condition.acquire():
                    condition.notify()
                    condition.release()
                exit(0)

    def __not_timeout(self):
        # True if notified (all spiders done), False if the wait timed out
        if condition.acquire():
            return condition.wait(timeout=self.delay)

    def __run(self):
        attempts = 0
        self.stime = datetime.datetime.now()
        self.timeout = self.attempt_times * self.delay
        self.__crawl()
        current_output = 0
        self.thread_pool.append(self.__check_finished())
        while attempts < self.attempt_times:
            attempts = attempts + 1
            if self.__not_timeout() and not self.tasks[0]:
                yield self.results[current_output:len(self.results)]
                break
            else:
                print(self.timeout_msg)
                next_index = len(self.results)
                yield self.results[current_output:len(self.results)]
                current_output = next_index

    # Main API. How it works: one thread is started per spider so the sites are crawled in
    # parallel, plus one extra thread (__check_finished) that watches whether every spider has
    # finished. The main thread waits for __check_finished to signal completion, or for a
    # timeout; on either event it yields the results gathered so far. If the signal was sent
    # (i.e. all work is done) it stops immediately, otherwise it waits again, repeating at most
    # attempt_times before giving up. The spiders themselves all exit within roughly twice the
    # timeout, because the timeout argument of the request APIs bounds connection setup and data
    # transfer separately rather than the whole request. When a spider hits an exception it
    # still returns a result; by convention that result's count is -1.
    def spiders(self, keyword='', attempt_times=5):
        self.keyword = keyword
        self.attempt_times = attempt_times
        for result in self.__run():
            yield result
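

# A minimal sketch of how an additional spider could plug into SpiderMonitor, following the
# threaded/extend protocol defined above. The class name, URL, and parsing below are
# hypothetical and illustrative only; the class is not registered in SpiderMonitor.all_spiders.
class ExampleCount:
    def __init__(self, keyword, timeout):
        self.keyword = keyword
        self.name = 'Example'  # used as the thread name by @threaded
        self.timeout = timeout

    @threaded
    @extend
    def get_info(self):
        # hypothetical endpoint standing in for a real search page
        self.search_url = 'https://example.org/search?q=%s' % self.keyword
        try:
            html = requests.get(self.search_url, timeout=self.timeout).text
            # hypothetical parsing: take the first number on the page as the hit count
            self.count = int(re.findall(r"\d+", html)[0])
        except Exception:
            self.count = 0
        return self.output(0)

    def output(self, error):
        # same shape as the other spiders: a dict with title, url, and count
        return {
            'title': 'Example',
            'url': self.search_url,
            'count': self.count if not error else error,
        }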


if __name__ == '__main__':
    # Example call: the API yields an iterable of result batches.
    for i in SpiderMonitor().spiders(keyword='p53', attempt_times=3):
        print(i)
    print('Main stream ended.')