Python中asyncio庫實現(xiàn)異步編程的示例
Python 的 asyncio 庫是標準庫的一部分,引入于 Python 3.4(通過 PEP 3156),用于實現(xiàn)異步編程。它提供了一種基于事件循環(huán)(Event Loop)的并發(fā)機制,適合處理 I/O 密集型任務(wù)(如網(wǎng)絡(luò)請求、文件操作、數(shù)據(jù)庫查詢等)。asyncio 通過協(xié)程(coroutines)、任務(wù)(tasks)和事件循環(huán),允許程序在等待 I/O 操作時執(zhí)行其他任務(wù),從而提高效率。
以下是對 asyncio 庫的詳細說明和常見用法。
1. asyncio 庫的作用
- 異步編程:支持非阻塞的并發(fā)執(zhí)行,適合 I/O 密集型任務(wù)。
- 事件循環(huán):管理協(xié)程和任務(wù)的調(diào)度,協(xié)調(diào)異步操作。
- 高性能:相比線程或進程,協(xié)程的開銷更低,適合高并發(fā)場景。
- 生態(tài)支持:與許多庫(如
aiohttp、aiomysql)集成,支持異步網(wǎng)絡(luò)請求、數(shù)據(jù)庫操作等。
2. 核心概念
- 協(xié)程(Coroutine):使用
async def定義的函數(shù),代表可暫停和恢復的異步操作。 - 事件循環(huán)(Event Loop):
asyncio的核心,負責調(diào)度協(xié)程和處理 I/O 事件。 - 任務(wù)(Task):協(xié)程的封裝,用于在事件循環(huán)中并發(fā)運行。
- Future:表示尚未完成的操作,協(xié)程通常通過 Future 管理結(jié)果。
- Awaitable:可以被
await的對象,包括協(xié)程、任務(wù)和 Future。
3. 基本用法
以下是 asyncio 的基本用法,展示如何定義和運行協(xié)程。
3.1 定義和運行協(xié)程
import asyncio
# 定義協(xié)程
async def say_hello():
print("Hello")
await asyncio.sleep(1) # 模擬異步 I/O 操作
print("World")
# 運行協(xié)程
async def main():
await asyncio.gather(say_hello(), say_hello()) # 并發(fā)運行多個協(xié)程
# 執(zhí)行事件循環(huán)
if __name__ == "__main__":
asyncio.run(main())
輸出:
Hello
Hello
World
World
說明:
async def定義協(xié)程,await表示暫停點,允許事件循環(huán)調(diào)度其他任務(wù)。asyncio.sleep(1)模擬異步 I/O(如網(wǎng)絡(luò)請求),不會阻塞事件循環(huán)。asyncio.gather()并發(fā)運行多個協(xié)程。asyncio.run()是運行異步程序的推薦入口,自動創(chuàng)建和關(guān)閉事件循環(huán)。
3.2 事件循環(huán)
事件循環(huán)是 asyncio 的核心,負責調(diào)度協(xié)程和處理回調(diào)。以下是手動操作事件循環(huán)的示例:
import asyncio
async def task():
print("Task started")
await asyncio.sleep(1)
print("Task finished")
loop = asyncio.get_event_loop() # 獲取事件循環(huán)
try:
loop.run_until_complete(task()) # 運行協(xié)程直到完成
finally:
loop.close() # 關(guān)閉事件循環(huán)
說明:
asyncio.get_event_loop()獲取默認事件循環(huán)。loop.run_until_complete()運行單個協(xié)程或 Future。- 通常推薦使用
asyncio.run(),因為它更安全且自動管理循環(huán)的生命周期。
4. 常用功能
asyncio 提供了豐富的 API,以下是常見功能和用法。
4.1 并發(fā)運行多個協(xié)程
使用 asyncio.gather() 或 asyncio.create_task() 實現(xiàn)并發(fā):
import asyncio
async def task1():
print("Task 1 started")
await asyncio.sleep(2)
print("Task 1 finished")
async def task2():
print("Task 2 started")
await asyncio.sleep(1)
print("Task 2 finished")
async def main():
# 使用 gather 并發(fā)運行
await asyncio.gather(task1(), task2())
# 或者使用 create_task
t1 = asyncio.create_task(task1())
t2 = asyncio.create_task(task2())
await t1
await t2
asyncio.run(main())
輸出:
Task 1 started
Task 2 started
Task 2 finished
Task 1 finished
說明:
asyncio.gather()等待所有協(xié)程完成,返回結(jié)果列表。asyncio.create_task()將協(xié)程包裝為任務(wù),立即調(diào)度運行。
4.2 超時控制
使用 asyncio.wait_for() 為協(xié)程設(shè)置超時:
import asyncio
async def long_task():
await asyncio.sleep(5)
print("Task completed")
async def main():
try:
await asyncio.wait_for(long_task(), timeout=2) # 2 秒超時
except asyncio.TimeoutError:
print("Task timed out")
asyncio.run(main())
輸出:
Task timed out
4.3 異步迭代
asyncio 支持異步迭代器和異步上下文管理器:
import asyncio
async def async_generator():
for i in range(3):
await asyncio.sleep(1)
yield i
async def main():
async for value in async_generator():
print(f"Received: {value}")
asyncio.run(main())
輸出:
Received: 0
Received: 1
Received: 2
4.4 異步上下文管理器
使用 async with 管理資源:
import asyncio
from contextlib import asynccontextmanager
@asynccontextmanager
async def resource():
print("Resource acquired")
try:
yield
finally:
print("Resource released")
async def main():
async with resource():
print("Using resource")
await asyncio.sleep(1)
asyncio.run(main())
輸出:
Resource acquired
Using resource
Resource released
5. 與外部庫集成
asyncio 常與異步庫結(jié)合使用,例如:
aiohttp:異步 HTTP 客戶端/服務(wù)器。
import aiohttp
import asyncio
async def fetch_url(url):
async with aiohttp.ClientSession() as session:
async with session.get(url) as response:
return await response.text()
async def main():
html = await fetch_url("https://example.com")
print(html[:100])
asyncio.run(main())
aiomysql:異步 MySQL 數(shù)據(jù)庫操作。
aiofiles:異步文件讀寫。
6. 高級功能
6.1 任務(wù)取消
可以取消正在運行的任務(wù):
import asyncio
async def long_task():
try:
print("Task started")
await asyncio.sleep(10)
print("Task finished")
except asyncio.CancelledError:
print("Task was cancelled")
raise
async def main():
task = asyncio.create_task(long_task())
await asyncio.sleep(1)
task.cancel() # 取消任務(wù)
try:
await task
except asyncio.CancelledError:
print("Main caught cancellation")
asyncio.run(main())
輸出:
Task started
Task was cancelled
Main caught cancellation
6.2 同步與異步混合
使用 loop.run_in_executor() 將同步代碼(阻塞操作)運行在線程池或進程池中:
import asyncio
import time
def blocking_task():
time.sleep(1) # 模擬阻塞操作
return "Done"
async def main():
loop = asyncio.get_running_loop()
result = await loop.run_in_executor(None, blocking_task) # 在默認線程池運行
print(result)
asyncio.run(main())
輸出:
Done
6.3 自定義事件循環(huán)
可以自定義事件循環(huán)策略,例如使用 uvloop(高性能事件循環(huán)):
import asyncio import uvloop asyncio.set_event_loop_policy(uvloop.EventLoopPolicy()) # 后續(xù)代碼使用 uvloop 作為事件循環(huán)
安裝 uvloop:
pip install uvloop
7. 實際應(yīng)用場景
- 網(wǎng)絡(luò)編程:異步爬蟲、Web 服務(wù)器(如 FastAPI、Sanic)。
- 數(shù)據(jù)庫操作:異步查詢數(shù)據(jù)庫(如 aiomysql、asyncpg)。
- 實時應(yīng)用:聊天 服務(wù)器、WebSocket 應(yīng)用。
- 并發(fā)任務(wù):批量處理網(wǎng)絡(luò)請求或文件操作。
- 微服務(wù):異步處理 HTTP 請求或消息隊列。
8. 注意事項
- 單線程模型:
asyncio在單線程中運行,依賴事件循環(huán)調(diào)度,無法利用多核 CPU(需要結(jié)合multiprocessing)。 - 避免阻塞代碼:在異步代碼中調(diào)用同步阻塞函數(shù)(如
time.sleep、requests.get)會阻塞事件循環(huán),需使用異步替代或run_in_executor。 - 協(xié)程必須 await:協(xié)程對象必須通過
await或任務(wù)調(diào)度運行,否則不會執(zhí)行。 - 線程安全:事件循環(huán)不是線程安全的,避免在多線程中共享同一循環(huán)。
- 調(diào)試困難:異步代碼可能因調(diào)度順序?qū)е聫碗s bug,建議使用日志或調(diào)試工具(如
asyncio.run(debug=True))。 - Python 版本:
- Python 3.7+ 引入
asyncio.run()和上下文管理器改進。 - Python 3.8+ 優(yōu)化了調(diào)試支持。
- Python 3.11+ 提供任務(wù)組(
asyncio.TaskGroup)等新功能。
- Python 3.7+ 引入
9. 綜合示例
以下是一個綜合示例,展示異步爬蟲的實現(xiàn):
import asyncio
import aiohttp
async def fetch_url(session, url):
async with session.get(url) as response:
return await response.text()
async def main():
urls = [
"https://example.com",
"https://python.org",
"https://github.com"
]
async with aiohttp.ClientSession() as session:
tasks = [fetch_url(session, url) for url in urls]
results = await asyncio.gather(*tasks, return_exceptions=True)
for url, result in zip(urls, results):
if isinstance(result, Exception):
print(f"Failed to fetch {url}: {result}")
else:
print(f"Fetched {url}: {len(result)} bytes")
if __name__ == "__main__":
asyncio.run(main())
輸出示例:
Fetched https://example.com: 1256 bytes
Fetched https://python.org: 50342 bytes
Fetched https://github.com: 123456 bytes
說明:
- 使用
aiohttp.ClientSession管理 HTTP 會話。 asyncio.gather并發(fā)請求多個 URL。return_exceptions=True防止單個失敗影響其他任務(wù)。
到此這篇關(guān)于Python中asyncio庫實現(xiàn)異步編程的示例的文章就介紹到這了,更多相關(guān)Python asyncio異步編程內(nèi)容請搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持腳本之家!
- Python使用asyncio實現(xiàn)異步操作的示例
- Python中asyncio的多種用法舉例(異步同步)
- Python使用asyncio處理異步編程的代碼示例
- Python使用asyncio包實現(xiàn)異步編程方式
- Python異步庫asyncio、aiohttp詳解
- python協(xié)程異步IO中asyncio的使用
- Python使用asyncio標準庫對異步IO的支持
- Python協(xié)程異步爬取數(shù)據(jù)(asyncio+aiohttp)實例
- Python使用asyncio異步時的常見問題總結(jié)
- Python asyncio異步編程常見問題小結(jié)
- Python asyncio異步編程簡單實現(xiàn)示例
相關(guān)文章
使用Python的Bottle框架寫一個簡單的服務(wù)接口的示例
這篇文章主要介紹了使用Python的Bottle框架寫一個簡單的服務(wù)接口的示例,基于Linux系統(tǒng)環(huán)境,需要的朋友可以參考下2015-08-08
python socket網(wǎng)絡(luò)編程步驟詳解(socket套接字使用)
這篇文章主要介紹了什么是套接字、PYTHON套接字模塊,提供一個簡單的python socket編程,大家參考使用2013-12-12
python機器學習Sklearn實戰(zhàn)adaboost算法示例詳解
這篇文章主要為大家介紹了python機器學習Sklearn實戰(zhàn)adaboost算法的示例詳解,有需要的朋友可以借鑒參考下,希望能夠有所幫助,祝大家多多進步早日升職加薪2021-11-11
基于python中staticmethod和classmethod的區(qū)別(詳解)
下面小編就為大家?guī)硪黄趐ython中staticmethod和classmethod的區(qū)別(詳解)。小編覺得挺不錯的,現(xiàn)在就分享給大家,也給大家做個參考。一起跟隨小編過來看看吧2017-10-10

