python?基于aiohttp的異步爬蟲實戰(zhàn)詳解
引言
鋼鐵知識庫,一個學習python爬蟲、數(shù)據(jù)分析的知識庫。人生苦短,快用python。
之前我們使用requests庫爬取某個站點的時候,每發(fā)出一個請求,程序必須等待網(wǎng)站返回響應才能接著運行,而在整個爬蟲過程中,整個爬蟲程序是一直在等待的,實際上沒有做任何事情。
像這種占用磁盤/內(nèi)存IO、網(wǎng)絡IO的任務,大部分時間是CPU在等待的操作,就叫IO密集型任務。對于這種情況有沒有優(yōu)化方案呢,當然有,那就是使用aiohttp庫實現(xiàn)異步爬蟲。
aiohttp是什么
我們在使用requests請求時,只能等一個請求先出去再回來,才會發(fā)送下一個請求。明顯效率不高阿,這時候如果換成異步請求的方式,就不會有這個等待。一個請求發(fā)出去,不管這個請求什么時間響應,程序通過await掛起協(xié)程對象后直接進行下一個請求。
解決方法就是通過 aiohttp + asyncio,什么是aiohttp?一個基于 asyncio 的異步 HTTP 網(wǎng)絡模塊,可用于實現(xiàn)異步爬蟲,速度明顯快于 requests 的同步爬蟲。
requests和aiohttp區(qū)別
區(qū)別就是一個同步一個是異步。話不多說直接上代碼看效果。
安裝aiohttp
pip install aiohttp
- requests同步示例:
#!/usr/bin/env python # -*- coding: utf-8 -*- # author: 鋼鐵知識庫 import time import requests # 同步請求 def main(): start = time.time() for i in range(5): res = requests.get('http://httpbin.org/delay/2') print(f'當前時間:{datetime.datetime.now()}, status_code = {res.status_code}') print(f'requests同步耗時:{time.time() - start}') if __name__ == '__main__': main() ''' 當前時間:2022-09-05 15:44:51.991685, status_code = 200 當前時間:2022-09-05 15:44:54.528918, status_code = 200 當前時間:2022-09-05 15:44:57.057373, status_code = 200 當前時間:2022-09-05 15:44:59.643119, status_code = 200 當前時間:2022-09-05 15:45:02.167362, status_code = 200 requests同步耗時:12.785893440246582 '''
可以看到5次請求總共用12.7秒,再來看同樣的請求異步多少時間。
- aiohttp異步示例:
#!/usr/bin/env python # file: day6-9同步和異步.py # author: 鋼鐵知識庫 import asyncio import time import aiohttp async def async_http(): # 聲明一個支持異步的上下文管理器 async with aiohttp.ClientSession() as session: res = await session.get('http://httpbin.org/delay/2') print(f'當前時間:{datetime.datetime.now()}, status_code = {res.status}') tasks = [async_http() for _ in range(5)] start = time.time() # Python 3.7 及以后,不需要顯式聲明事件循環(huán),可以使用 asyncio.run()來代替最后的啟動操作 asyncio.run(asyncio.wait(tasks)) print(f'aiohttp異步耗時:{time.time() - start}') ''' 當前時間:2022-09-05 15:42:32.363966, status_code = 200 當前時間:2022-09-05 15:42:32.366957, status_code = 200 當前時間:2022-09-05 15:42:32.374973, status_code = 200 當前時間:2022-09-05 15:42:32.384909, status_code = 200 當前時間:2022-09-05 15:42:32.390318, status_code = 200 aiohttp異步耗時:2.5826876163482666 '''
兩次對比可以看到執(zhí)行過程,時間一個是順序執(zhí)行,一個是同時執(zhí)行。這就是同步和異步的區(qū)別。
aiohttp使用介紹
接下來我們會詳細介紹aiohttp庫的用法和爬取實戰(zhàn)。aiohttp 是一個支持異步請求的庫,它和 asyncio 配合使用,可以使我們非常方便地實現(xiàn)異步請求操作。asyncio模塊,其內(nèi)部實現(xiàn)了對TCP、UDP、SSL協(xié)議的異步操作,但是對于HTTP請求,就需要aiohttp實現(xiàn)了。
aiohttp分為兩部分,一部分是Client,一部分是Server。下面來說說aiohttp客戶端部分的用法。
基本實例
先寫一個簡單的案例
#!/usr/bin/env python # -*- coding: utf-8 -*- # @Author : 鋼鐵知識庫 import asyncio import aiohttp async def get_api(session, url): # 聲明一個支持異步的上下文管理器 async with session.get(url) as response: return await response.text(), response.status async def main(): async with aiohttp.ClientSession() as session: html, status = await get_api(session, 'http://httpbin.org/delay/2') print(f'html: {html[:50]}') print(f'status : {status}') if __name__ == '__main__': # Python 3.7 及以后,不需要顯式聲明事件循環(huán),可以使用 asyncio.run(main())來代替最后的啟動操作 asyncio.get_event_loop().run_until_complete(main()) ''' html: { "args": {}, "data": "", "files": {}, status : 200 Process finished with exit code 0 '''
aiohttp請求的方法和之前有明顯區(qū)別,主要包括如下幾點:
- 除了導入aiohttp庫,還必須引入asyncio庫,因為要實現(xiàn)異步,需要啟動協(xié)程。
- 異步的方法定義不同,前面都要統(tǒng)一加async來修飾。
- with as用于聲明上下文管理器,幫我們自動分配和釋放資源,加上async代碼支持異步。
- 對于返回協(xié)程對象的操作,前面需要加await來修飾。response.text()返回的是協(xié)程對象。
- 最后運行啟用循環(huán)事件
注意:Python3.7及以后的版本中,可以使用asyncio.run(main())代替最后的啟動操作。
URL參數(shù)設置
對于URL參數(shù)的設置,我們可以借助params設置,傳入一個字典即可,實例如下:
#!/usr/bin/env python # -*- coding: utf-8 -*- # @Author : 鋼鐵知識庫 import aiohttp import asyncio async def main(): params = {'name': '鋼鐵知識庫', 'age': 23} async with aiohttp.ClientSession() as session: async with session.get('https://www.httpbin.org/get', params=params) as res: print(await res.json()) if __name__ == '__main__': asyncio.get_event_loop().run_until_complete(main()) ''' {'args': {'age': '23', 'name': '鋼鐵知識庫'}, 'headers': {'Accept': '*/*', 'Accept-Encoding': 'gzip, deflate', 'Host': 'www.httpbin.org', 'User-Agent': 'Python/3.8 aiohttp/3.8.1', 'X-Amzn-Trace-Id': 'Root=1-63162e34-1acf7bde7a6d801368494c72'}, 'origin': '122.55.11.188', 'url': 'https://www.httpbin.org/get?name=鋼鐵知識庫&age=23'} '''
可以看到實際請求的URL后面帶了后綴,這就是params的內(nèi)容。
請求類型
除了get請求,aiohttp還支持其它請求類型,如POST、PUT、DELETE等,和requests使用方式類似。
session.post('http://httpbin.org/post', data=b'data') session.put('http://httpbin.org/put', data=b'data') session.delete('http://httpbin.org/delete') session.head('http://httpbin.org/get') session.options('http://httpbin.org/get') session.patch('http://httpbin.org/patch', data=b'data')
要使用這些方法,只需要把對應的方法和參數(shù)替換一下。用法和get類似就不再舉例。
響應的幾個方法
對于響應來說,我們可以用如下方法分別獲取其中的響應情況。狀態(tài)碼、響應頭、響應體、響應體二進制內(nèi)容、響應體JSON結(jié)果,實例如下:
#!/usr/bin/env python # @Author : 鋼鐵知識庫 import aiohttp import asyncio async def main(): data = {'name': '鋼鐵知識庫', 'age': 23} async with aiohttp.ClientSession() as session: async with session.post('https://www.httpbin.org/post', data=data) as response: print('status:', response.status) # 狀態(tài)碼 print('headers:', response.headers) # 響應頭 print('body:', await response.text()) # 響應體 print('bytes:', await response.read()) # 響應體二進制內(nèi)容 print('json:', await response.json()) # 響應體json數(shù)據(jù) if __name__ == '__main__': asyncio.get_event_loop().run_until_complete(main())
''' status: 200 headers: <CIMultiDictProxy('Date': 'Tue, 06 Sep 2022 00:18:36 GMT', 'Content-Type': 'application/json', 'Content-Length': '534', 'Connection': 'keep-alive', 'Server': 'gunicorn/19.9.0', 'Access-Control-Allow-Origin': '*', 'Access-Control-Allow-Credentials': 'true')> body: { "args": {}, "data": "", "files": {}, "form": { "age": "23", "name": "\u94a2\u94c1\u77e5\u8bc6\u5e93" }, "headers": { "Accept": "*/*", "Accept-Encoding": "gzip, deflate", "Content-Length": "57", "Content-Type": "application/x-www-form-urlencoded", "Host": "www.httpbin.org", "User-Agent": "Python/3.8 aiohttp/3.8.1", "X-Amzn-Trace-Id": "Root=1-631691dc-6aa1b2b85045a1a0481d06e1" }, "json": null, "origin": "122.55.11.188", "url": "https://www.httpbin.org/post" } bytes: b'{\n "args": {}, \n "data": "", \n "files": {}, \n "form": {\n "age": "23", \n "name": "\\u94a2\\u94c1\\u77e5\\u8bc6\\u5e93"\n }, \n "headers": {\n "Accept": "*/*", \n "Accept-Encoding": "gzip, deflate", \n "Content-Length": "57", \n "Content-Type": "application/x-www-form-urlencoded", \n "Host": "www.httpbin.org", \n "User-Agent": "Python/3.8 aiohttp/3.8.1", \n "X-Amzn-Trace-Id": "Root=1-631691dc-6aa1b2b85045a1a0481d06e1"\n }, \n "json": null, \n "origin": "122.5.132.196", \n "url": "https://www.httpbin.org/post"\n}\n' json: {'args': {}, 'data': '', 'files': {}, 'form': {'age': '23', 'name': '鋼鐵知識庫'}, 'headers': {'Accept': '*/*', 'Accept-Encoding': 'gzip, deflate', 'Content-Length': '57', 'Content-Type': 'application/x-www-form-urlencoded', 'Host': 'www.httpbin.org', 'User-Agent': 'Python/3.8 aiohttp/3.8.1', 'X-Amzn-Trace-Id': 'Root=1-631691dc-6aa1b2b85045a1a0481d06e1'}, 'json': None, 'origin': '122.55.11.188', 'url': 'https://www.httpbin.org/post'} '''
可以看到有些字段前面需要加await,因為其返回的是一個協(xié)程對象(如async修飾的方法),那么前面就要加await。
超時設置
我們可以借助ClientTimeout
對象設置超時,例如要設置1秒的超時時間,可以這么實現(xiàn):
#!/usr/bin/env python # @Author : 鋼鐵知識庫 import aiohttp import asyncio async def main(): # 設置 1 秒的超時 timeout = aiohttp.ClientTimeout(total=1) data = {'name': '鋼鐵知識庫', 'age': 23} async with aiohttp.ClientSession(timeout=timeout) as session: async with session.get('https://www.httpbin.org/delay/2', data=data) as response: print('status:', response.status) # 狀態(tài)碼 if __name__ == '__main__': asyncio.get_event_loop().run_until_complete(main()) ''' Traceback (most recent call last): ####中間省略#### raise asyncio.TimeoutError from None asyncio.exceptions.TimeoutError '''
這里設置了超時1秒請求延時2秒,發(fā)現(xiàn)拋出異常asyncio.TimeoutError
,如果正常則響應200。
并發(fā)限制
aiohttp可以支持非常高的并發(fā)量,但面對高并發(fā)網(wǎng)站可能會承受不住,隨時有掛掉的危險,這時需要對并發(fā)進行一些控制?,F(xiàn)在我們借助asyncio 的Semaphore來控制并發(fā)量,實例如下:
#!/usr/bin/env python # -*- coding: utf-8 -*- # @Author : 鋼鐵知識庫 import asyncio from datetime import datetime import aiohttp # 聲明最大并發(fā)量 semaphore = asyncio.Semaphore(2) async def get_api(): async with semaphore: print(f'scrapting...{datetime.now()}') async with session.get('https://www.baidu.com') as response: await asyncio.sleep(2) # print(f'當前時間:{datetime.now()}, {response.status}') async def main(): global session session = aiohttp.ClientSession() tasks = [asyncio.ensure_future(get_api()) for _ in range(1000)] await asyncio.gather(*tasks) await session.close() if __name__ == '__main__': asyncio.get_event_loop().run_until_complete(main()) ''' scrapting...2022-09-07 08:11:14.190000 scrapting...2022-09-07 08:11:14.292000 scrapting...2022-09-07 08:11:16.482000 scrapting...2022-09-07 08:11:16.504000 scrapting...2022-09-07 08:11:18.520000 scrapting...2022-09-07 08:11:18.521000 '''
在main方法里,我們聲明了1000個task,如果沒有通過Semaphore進行并發(fā)限制,那這1000放到gather方法后會被同時執(zhí)行,并發(fā)量相當大。有了信號量的控制之后,同時運行的task數(shù)量就會被控制,這樣就能給aiohttp限制速度了。
aiohttp異步爬取實戰(zhàn)
接下來我們通過異步方式練手一個小說爬蟲,需求如下:
需求頁面:https://dushu.baidu.com/pc/detail?gid=4308080950
目錄接口:https://dushu.baidu.com/api/pc/getCatalog?data={"book_id":"4308080950"}
詳情接口:
https://dushu.baidu.com/api/pc/getChapterContent?data={"book_id":"4295122774","cid":"4295122774|116332"}
關鍵參數(shù):book_id
:小說ID、cid
:章節(jié)id
采集要求:使用協(xié)程方式寫入,數(shù)據(jù)存放進mongo
需求分析:點開需求頁面,通過F12抓包可以發(fā)現(xiàn)兩個接口。一個目錄接口,一個詳情接口。
首先第一步先請求目錄接口拿到cid章節(jié)id,然后將cid傳遞給詳情接口拿到小說數(shù)據(jù),最后存入mongo即可。
話不多說,直接上代碼:
#!/usr/bin/env python # -*- coding: utf-8 -*- # @Author : 鋼鐵知識庫 # 不合適就是不合適,真正合適的,你不會有半點猶豫。 import asyncio import json,re import logging import aiohttp import requests from utils.conn_db import ConnDb # 日志格式 logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s: %(message)s') # 章節(jié)目錄api b_id = '4308080950' url = 'https://dushu.baidu.com/api/pc/getCatalog?data={"book_id":"'+b_id+'"}' headers = { "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) " "Chrome/104.0.0.0 Safari/537.36" } # 并發(fā)聲明 semaphore = asyncio.Semaphore(5) async def download(title,b_id, cid): data = { "book_id": b_id, "cid": f'{b_id}|{cid}', } data = json.dumps(data) detail_url = 'https://dushu.baidu.com/api/pc/getChapterContent?data={}'.format(data) async with semaphore: async with aiohttp.ClientSession(headers=headers) as session: async with session.get(detail_url) as response: res = await response.json() content = { 'title': title, 'content': res['data']['novel']['content'] } # print(title) await save_data(content) async def save_data(data): if data: client = ConnDb().conn_motor_mongo() db = client.baidu_novel collection = db.novel logging.info('saving data %s', data) await collection.update_one( {'title': data.get('title')}, {'$set': data}, upsert=True ) async def main(): res = requests.get(url, headers=headers) tasks = [] for re in res.json()['data']['novel']['items']: # 拿到某小說目錄cid title = re['title'] cid = re['cid'] tasks.append(download(title, b_id, cid)) # 將請求放到列表里,再通過gather執(zhí)行并發(fā) await asyncio.gather(*tasks) if __name__ == '__main__': asyncio.run(main())
至此,我們就使用aiohttp完成了對小說章節(jié)的爬取。
要實現(xiàn)異步處理,得先要有掛起操作,當一個任務需要等待 IO 結(jié)果的時候,可以掛起當前任務,轉(zhuǎn)而去執(zhí)行其他任務,這樣才能充分利用好資源,要實現(xiàn)異步,需要了解 await 的用法,使用 await 可以將耗時等待的操作掛起,讓出控制權(quán)。當協(xié)程執(zhí)行的時候遇到 await,時間循環(huán)就會將本協(xié)程掛起,轉(zhuǎn)而去執(zhí)行別的協(xié)程,直到其他的協(xié)程掛起或執(zhí)行完畢。
await 后面的對象必須是如下格式之一:
- A native coroutine object returned from a native coroutine function,一個原生 coroutine 對象。
- A generator-based coroutine object returned from a function decorated with types.coroutine,一個由 types.coroutine 修飾的生成器,這個生成器可以返回 coroutine 對象。
- An object with an await method returning an iterator,一個包含 await 方法的對象返回的一個迭代器。
總結(jié)
以上就是借助協(xié)程async和異步aiohttp兩個主要模塊完成異步爬蟲的內(nèi)容,
aiohttp 以異步方式爬取網(wǎng)站的耗時遠小于 requests 同步方式,以上列舉的例子希望對你有幫助。
注意,線程和協(xié)程是兩個概念,后面找機會我們再聊聊進程和線程、線程和協(xié)程的關系
更多關于python aiohttp異步爬蟲的資料請關注腳本之家其它相關文章!
相關文章
關于jieba.cut與jieba.lcut的區(qū)別及說明
這篇文章主要介紹了關于jieba.cut與jieba.lcut的區(qū)別及說明,具有很好的參考價值,希望對大家有所幫助。如有錯誤或未考慮完全的地方,望不吝賜教2023-05-05