A 100-Line Async Crawler - A Python Implementation
An elegant crawler needs the following pieces:
- Requester
- Page parser
- Link generator
- Scheduler
Requester
Responsible for sending the HTTP requests.
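A minimal requester can be built on aiohttp; the sketch below is a stripped-down version of the fetch function in the full listing (the one-second timeout is simply the value used there):

import aiohttp

async def fetch(url):
    # Send a GET request and return the response body as text.
    timeout = aiohttp.ClientTimeout(total=1)
    async with aiohttp.ClientSession(timeout=timeout) as session:
        async with session.get(url) as response:
            return await response.text()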
Page parser
Responsible for extracting the links to crawl next from the page.
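With lxml this is essentially one XPath query. A minimal sketch, where extract_links is an illustrative name (the full listing calls it data_analysis):

import lxml.html

def extract_links(html_text):
    # Parse the HTML and return every href found in an <a> tag.
    tree = lxml.html.fromstring(html_text)
    return tree.xpath('//a/@href')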
Link generator
Responsible for turning the extracted links into crawlable URLs and putting them onto the queue.
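A sketch of that step, using urljoin for illustration; the full listing instead normalizes relative paths by hand and filters on file extensions:

import asyncio
from urllib.parse import urljoin

async def enqueue_links(base_url, hrefs, queue):
    # Resolve each href against the current page and queue it for crawling.
    for href in hrefs:
        if not href or href == '#' or 'javascript' in href:
            continue
        await queue.put(urljoin(base_url, href))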
Scheduler
The core component that decides whether a link should be crawled.
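In its simplest form the scheduler is a seen-set plus a crawl budget. The sketch below is only illustrative (should_crawl, seen_urls, and budget are made-up names); the full listing instead combines a global counter, MAX_GET, with a set of content hashes to skip duplicate pages:

seen_urls = set()  # URLs that have already been scheduled
budget = 10000     # maximum number of pages to fetch

def should_crawl(url):
    # Refuse URLs we have seen before and stop once the budget is spent.
    global budget
    if budget <= 0 or url in seen_urls:
        return False
    seen_urls.add(url)
    budget -= 1
    return True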
Async
When multiple requests are in flight at the same time, you have an async crawler.
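With asyncio this means several worker coroutines consuming one shared queue. A self-contained sketch, with placeholder URLs and the actual fetch replaced by a print:

import asyncio

async def worker(worker_id, queue):
    # Each worker pulls URLs off the shared queue; several workers run concurrently.
    while True:
        url = await queue.get()
        print(f'worker {worker_id} would fetch {url}')
        queue.task_done()

async def demo():
    queue = asyncio.Queue()
    for url in ('https://example.com/a', 'https://example.com/b', 'https://example.com/c'):
        await queue.put(url)
    workers = [asyncio.create_task(worker(i, queue)) for i in range(2)]
    await queue.join()  # wait until every queued URL has been handled
    for w in workers:
        w.cancel()

asyncio.run(demo())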
The code
The full code has been uploaded to GitHub: https://github.com/EINDEX/100-line-async-spider
import aiohttp, aiofiles, asyncio, lxml.html, sys
from urllib.parse import urlparse
from pathlib import Path
from hashlib import md5

# Crawl budget, queue capacity, and number of concurrent workers.
MAX_GET, MAX_QUEUE_SIZE, MAX_WORKER = 10000, 100, 20
spider_url_set, spider_content_set = set(), set()
status = {"success": 0, "all": 0, "same": 0, "same_content": 0}

async def fetch(url):
    # Requester: send the request and return the page body as text.
    global MAX_GET
    if MAX_GET < 0:
        return
    async with aiohttp.ClientSession(
            connector=aiohttp.TCPConnector(ssl=False),
            timeout=aiohttp.ClientTimeout(total=1)) as session:
        async with session.get(url) as response:
            return await response.text()

async def savefile(path, data):
    # Save the page under data/, creating parent directories as needed.
    path = Path(path)
    path.parent.mkdir(parents=True, exist_ok=True)
    async with aiofiles.open(path, mode='w') as f:
        await f.write(data)

def data_analysis(data):
    # Page parser: pull every <a href> out of the HTML.
    try:
        html = lxml.html.fromstring(data)
        return html.xpath('//a/@href')
    except Exception:
        return []

def url_analysis(endpoint, urls):
    # Link generator: drop junk links and turn relative paths into absolute URLs.
    for url in urls:
        if not url or url == '#' or 'javascript' in url:
            continue
        elif url.startswith('/'):
            if endpoint.endswith('/'):
                endpoint = endpoint[:-1]
            yield endpoint + url
        elif url.endswith(('.html', '.htm', '.shtml', '/')) or '?' in url:
            yield url

def path_gene(endpoint):
    # Map a URL to a local path, e.g. https://eindex.me/ -> data/me/eindex/index.html
    result = urlparse(endpoint)
    host = "/".join([part for part in result.hostname.split(".") if part][::-1])
    path = result.path + "/index.html" if result.path != "/" else "/index.html"
    return f'data/{host}{path}{result.query.replace("/", "")}'

async def pushurl(url_iter, queue):
    # Put newly discovered URLs onto the queue while the crawl budget remains.
    global MAX_GET
    try:
        for url in url_iter:
            if MAX_GET > 0:
                await queue.put(url)
    except Exception:
        pass

async def spider(name, queue):
    # Worker: fetch a page, skip duplicate content, save it, then queue its links.
    global MAX_GET, spider_content_set, spider_url_set
    while MAX_GET > 0:
        try:
            endpoint = await queue.get()
            data = await fetch(endpoint)
            md5data = md5(data.encode()).hexdigest()
            if md5data in spider_content_set:
                status['same_content'] += 1
                continue
            MAX_GET -= 1
            status['all'] += 1
            spider_content_set.add(md5data)
            await savefile(path_gene(endpoint), data)
            # Schedule link extraction and enqueueing without waiting for it.
            asyncio.gather(
                asyncio.create_task(pushurl(url_analysis(endpoint, data_analysis(data)), queue)),
                return_exceptions=False)
            status['success'] += 1
        except Exception:
            pass

def exit():
    # Cancel every outstanding task and terminate the process.
    print('exit')
    for task in asyncio.all_tasks():
        task.cancel()
    sys.exit(0)

async def main(first_endpoint):
    queue = asyncio.Queue(MAX_QUEUE_SIZE)
    await queue.put(first_endpoint)
    asyncio.gather(
        *[asyncio.create_task(spider(spider_id, queue)) for spider_id in range(MAX_WORKER)],
        return_exceptions=False)
    try:
        while True:
            await asyncio.sleep(1)
            print(queue.qsize(), MAX_GET, status)
            if MAX_GET <= 0 or not queue.qsize():
                exit()
    except BaseException:
        exit()

if __name__ == "__main__":
    # asyncio.run creates the event loop, runs main, and closes the loop on exit.
    asyncio.run(main('https://eindex.me/'))