亚洲乱码中文字幕综合,中国熟女仑乱hd,亚洲精品乱拍国产一区二区三区,一本大道卡一卡二卡三乱码全集资源,又粗又黄又硬又爽的免费视频

詳解python中asyncio模塊

 更新時(shí)間:2018年03月03日 10:15:29   作者:scrounger  
本篇文章給大家詳細(xì)分析了python中重要的asyncio模塊相關(guān)知識(shí)點(diǎn),有興趣的朋友可以學(xué)習(xí)參考下。

一直對(duì)asyncio這個(gè)庫比較感興趣,畢竟這是官網(wǎng)也非常推薦的一個(gè)實(shí)現(xiàn)高并發(fā)的一個(gè)模塊,python也是在python 3.4中引入了協(xié)程的概念。也通過這次整理更加深刻理解這個(gè)模塊的使用

asyncio 是干什么的?

異步網(wǎng)絡(luò)操作并發(fā)協(xié)程

python3.0時(shí)代,標(biāo)準(zhǔn)庫里的異步網(wǎng)絡(luò)模塊:select(非常底層) python3.0時(shí)代,第三方異步網(wǎng)絡(luò)庫:Tornado python3.4時(shí)代,asyncio:支持TCP,子進(jìn)程

現(xiàn)在的asyncio,有了很多的模塊已經(jīng)在支持:aiohttp,aiodns,aioredis等等 https://github.com/aio-libs 這里列出了已經(jīng)支持的內(nèi)容,并在持續(xù)更新

當(dāng)然到目前為止實(shí)現(xiàn)協(xié)程的不僅僅只有asyncio,tornado和gevent都實(shí)現(xiàn)了類似功能

關(guān)于asyncio的一些關(guān)鍵字的說明:

event_loop 事件循環(huán):程序開啟一個(gè)無限循環(huán),把一些函數(shù)注冊到事件循環(huán)上,當(dāng)滿足事件發(fā)生的時(shí)候,調(diào)用相應(yīng)的協(xié)程函數(shù)

coroutine 協(xié)程:協(xié)程對(duì)象,指一個(gè)使用async關(guān)鍵字定義的函數(shù),它的調(diào)用不會(huì)立即執(zhí)行函數(shù),而是會(huì)返回一個(gè)協(xié)程對(duì)象。協(xié)程對(duì)象需要注冊到事件循環(huán),由事件循環(huán)調(diào)用。

task 任務(wù):一個(gè)協(xié)程對(duì)象就是一個(gè)原生可以掛起的函數(shù),任務(wù)則是對(duì)協(xié)程進(jìn)一步封裝,其中包含了任務(wù)的各種狀態(tài)

future: 代表將來執(zhí)行或沒有執(zhí)行的任務(wù)的結(jié)果。它和task上沒有本質(zhì)上的區(qū)別

async/await 關(guān)鍵字:python3.5用于定義協(xié)程的關(guān)鍵字,async定義一個(gè)協(xié)程,await用于掛起阻塞的異步調(diào)用接口。

看了上面這些關(guān)鍵字,你可能扭頭就走了,其實(shí)一開始了解和研究asyncio這個(gè)模塊有種抵觸,自己也不知道為啥,這也導(dǎo)致很長一段時(shí)間,這個(gè)模塊自己也基本就沒有關(guān)注和使用,但是隨著工作上用python遇到各種性能問題的時(shí)候,自己告訴自己還是要好好學(xué)習(xí)學(xué)習(xí)這個(gè)模塊。

定義一個(gè)協(xié)程

import time
import asyncio


now = lambda : time.time()


async def do_some_work(x):
  print("waiting:", x)

start = now()
# 這里是一個(gè)協(xié)程對(duì)象,這個(gè)時(shí)候do_some_work函數(shù)并沒有執(zhí)行
coroutine = do_some_work(2)
print(coroutine)
# 創(chuàng)建一個(gè)事件loop
loop = asyncio.get_event_loop()
# 將協(xié)程加入到事件循環(huán)loop
loop.run_until_complete(coroutine)

print("Time:",now()-start)

在上面帶中我們通過async關(guān)鍵字定義一個(gè)協(xié)程(coroutine),當(dāng)然協(xié)程不能直接運(yùn)行,需要將協(xié)程加入到事件循環(huán)loop中

asyncio.get_event_loop:創(chuàng)建一個(gè)事件循環(huán),然后使用run_until_complete將協(xié)程注冊到事件循環(huán),并啟動(dòng)事件循環(huán)

創(chuàng)建一個(gè)task

協(xié)程對(duì)象不能直接運(yùn)行,在注冊事件循環(huán)的時(shí)候,其實(shí)是run_until_complete方法將協(xié)程包裝成為了一個(gè)任務(wù)(task)對(duì)象. task對(duì)象是Future類的子類,保存了協(xié)程運(yùn)行后的狀態(tài),用于未來獲取協(xié)程的結(jié)果

import asyncio
import time


now = lambda: time.time()


async def do_some_work(x):
  print("waiting:", x)

start = now()

coroutine = do_some_work(2)
loop = asyncio.get_event_loop()
task = loop.create_task(coroutine)
print(task)
loop.run_until_complete(task)
print(task)
print("Time:",now()-start)

結(jié)果為:

<Task pending coro=<do_some_work() running at /app/py_code/study_asyncio/simple_ex2.py:13>>
waiting: 2
<Task finished coro=<do_some_work() done, defined at /app/py_code/study_asyncio/simple_ex2.py:13> result=None>
Time: 0.0003514289855957031

創(chuàng)建task后,在task加入事件循環(huán)之前為pending狀態(tài),當(dāng)完成后,狀態(tài)為finished

關(guān)于上面通過loop.create_task(coroutine)創(chuàng)建task,同樣的可以通過 asyncio.ensure_future(coroutine)創(chuàng)建task

關(guān)于這兩個(gè)命令的官網(wǎng)解釋: https://docs.python.org/3/library/asyncio-task.html

asyncio.ensure_future(coro_or_future, *, loop=None)¶
Schedule the execution of a coroutine object: wrap it in a future. Return a Task object.

If the argument is a Future, it is returned directly.

https://docs.python.org/3/library/asyncio-eventloop.html

AbstractEventLoop.create_task(coro)
Schedule the execution of a coroutine object: wrap it in a future. Return a Task object.

Third-party event loops can use their own subclass of Task for interoperability. In this case, the result type is a subclass of Task.

This method was added in Python 3.4.2. Use the async() function to support also older Python versions.

綁定回調(diào)

綁定回調(diào),在task執(zhí)行完成的時(shí)候可以獲取執(zhí)行的結(jié)果,回調(diào)的最后一個(gè)參數(shù)是future對(duì)象,通過該對(duì)象可以獲取協(xié)程返回值。

import time
import asyncio


now = lambda : time.time()


async def do_some_work(x):
  print("waiting:",x)
  return "Done after {}s".format(x)


def callback(future):
  print("callback:",future.result())


start = now()
coroutine = do_some_work(2)
loop = asyncio.get_event_loop()
task = asyncio.ensure_future(coroutine)
print(task)
task.add_done_callback(callback)
print(task)
loop.run_until_complete(task)

print("Time:", now()-start)

結(jié)果為:

<Task pending coro=<do_some_work() running at /app/py_code/study_asyncio/simple_ex3.py:13>>
<Task pending coro=<do_some_work() running at /app/py_code/study_asyncio/simple_ex3.py:13> cb=[callback() at /app/py_code/study_asyncio/simple_ex3.py:18]>
waiting: 2
callback: Done after 2s
Time: 0.00039196014404296875

通過add_done_callback方法給task任務(wù)添加回調(diào)函數(shù),當(dāng)task(也可以說是coroutine)執(zhí)行完成的時(shí)候,就會(huì)調(diào)用回調(diào)函數(shù)。并通過參數(shù)future獲取協(xié)程執(zhí)行的結(jié)果。這里我們創(chuàng)建 的task和回調(diào)里的future對(duì)象實(shí)際上是同一個(gè)對(duì)象

阻塞和await

使用async可以定義協(xié)程對(duì)象,使用await可以針對(duì)耗時(shí)的操作進(jìn)行掛起,就像生成器里的yield一樣,函數(shù)讓出控制權(quán)。協(xié)程遇到await,事件循環(huán)將會(huì)掛起該協(xié)程,執(zhí)行別的協(xié)程,直到其他的協(xié)程也掛起或者執(zhí)行完畢,再進(jìn)行下一個(gè)協(xié)程的執(zhí)行

耗時(shí)的操作一般是一些IO操作,例如網(wǎng)絡(luò)請求,文件讀取等。我們使用asyncio.sleep函數(shù)來模擬IO操作。協(xié)程的目的也是讓這些IO操作異步化。

import asyncio
import time



now = lambda :time.time()

async def do_some_work(x):
  print("waiting:",x)
  # await 后面就是調(diào)用耗時(shí)的操作
  await asyncio.sleep(x)
  return "Done after {}s".format(x)


start = now()

coroutine = do_some_work(2)
loop = asyncio.get_event_loop()
task = asyncio.ensure_future(coroutine)
loop.run_until_complete(task)

print("Task ret:", task.result())
print("Time:", now() - start)

在await asyncio.sleep(x),因?yàn)檫@里sleep了,模擬了阻塞或者耗時(shí)操作,這個(gè)時(shí)候就會(huì)讓出控制權(quán)。 即當(dāng)遇到阻塞調(diào)用的函數(shù)的時(shí)候,使用await方法將協(xié)程的控制權(quán)讓出,以便loop調(diào)用其他的協(xié)程。

并發(fā)和并行

并發(fā)指的是同時(shí)具有多個(gè)活動(dòng)的系統(tǒng)

并行值得是用并發(fā)來使一個(gè)系統(tǒng)運(yùn)行的更快。并行可以在操作系統(tǒng)的多個(gè)抽象層次進(jìn)行運(yùn)用

所以并發(fā)通常是指有多個(gè)任務(wù)需要同時(shí)進(jìn)行,并行則是同一個(gè)時(shí)刻有多個(gè)任務(wù)執(zhí)行

下面這個(gè)例子非常形象:

并發(fā)情況下是一個(gè)老師在同一時(shí)間段輔助不同的人功課。并行則是好幾個(gè)老師分別同時(shí)輔助多個(gè)學(xué)生功課。簡而言之就是一個(gè)人同時(shí)吃三個(gè)饅頭還是三個(gè)人同時(shí)分別吃一個(gè)的情況,吃一個(gè)饅頭算一個(gè)任務(wù)

import asyncio
import time


now = lambda :time.time()


async def do_some_work(x):
  print("Waiting:",x)
  await asyncio.sleep(x)
  return "Done after {}s".format(x)

start = now()

coroutine1 = do_some_work(1)
coroutine2 = do_some_work(2)
coroutine3 = do_some_work(4)

tasks = [
  asyncio.ensure_future(coroutine1),
  asyncio.ensure_future(coroutine2),
  asyncio.ensure_future(coroutine3)
]

loop = asyncio.get_event_loop()
loop.run_until_complete(asyncio.wait(tasks))

for task in tasks:
  print("Task ret:",task.result())

print("Time:",now()-start)

運(yùn)行結(jié)果:

Waiting: 1
Waiting: 2
Waiting: 4
Task ret: Done after 1s
Task ret: Done after 2s
Task ret: Done after 4s
Time: 4.004154920578003

總時(shí)間為4s左右。4s的阻塞時(shí)間,足夠前面兩個(gè)協(xié)程執(zhí)行完畢。如果是同步順序的任務(wù),那么至少需要7s。此時(shí)我們使用了aysncio實(shí)現(xiàn)了并發(fā)。asyncio.wait(tasks) 也可以使用 asyncio.gather(*tasks) ,前者接受一個(gè)task列表,后者接收一堆task。

關(guān)于asyncio.gather和asyncio.wait官網(wǎng)的說明:

https://docs.python.org/3/library/asyncio-task.html

Return a future aggregating results from the given coroutine objects or futures.

All futures must share the same event loop. If all the tasks are done successfully, the returned future's result is the list of results (in the order of the original sequence, not necessarily the order of results arrival). If return_exceptions is true, exceptions in the tasks are treated the same as successful results, and gathered in the result list; otherwise, the first raised exception will be immediately propagated to the returned future.

https://docs.python.org/3/library/asyncio-task.html

Wait for the Futures and coroutine objects given by the sequence futures to complete. Coroutines will be wrapped in Tasks. Returns two sets of Future: (done, pending).

The sequence futures must not be empty.

timeout can be used to control the maximum number of seconds to wait before returning. timeout can be an int or float. If timeout is not specified or None, there is no limit to the wait time.

return_when indicates when this function should return.

協(xié)程嵌套

使用async可以定義協(xié)程,協(xié)程用于耗時(shí)的io操作,我們也可以封裝更多的io操作過程,這樣就實(shí)現(xiàn)了嵌套的協(xié)程,即一個(gè)協(xié)程中await了另外一個(gè)協(xié)程,如此連接起來。

import asyncio
import time


now = lambda: time.time()

async def do_some_work(x):
  print("waiting:",x)
  await asyncio.sleep(x)
  return "Done after {}s".format(x)

async def main():
  coroutine1 = do_some_work(1)
  coroutine2 = do_some_work(2)
  coroutine3 = do_some_work(4)
  tasks = [
    asyncio.ensure_future(coroutine1),
    asyncio.ensure_future(coroutine2),
    asyncio.ensure_future(coroutine3)
  ]

  dones, pendings = await asyncio.wait(tasks)
  for task in dones:
    print("Task ret:", task.result())

  # results = await asyncio.gather(*tasks)
  # for result in results:
  #   print("Task ret:",result)


start = now()

loop = asyncio.get_event_loop()
loop.run_until_complete(main())
print("Time:", now()-start)

如果我們把上面代碼中的:

  dones, pendings = await asyncio.wait(tasks)
  for task in dones:
    print("Task ret:", task.result())

替換為:

  results = await asyncio.gather(*tasks)
  for result in results:
    print("Task ret:",result)

這樣得到的就是一個(gè)結(jié)果的列表

不在main協(xié)程函數(shù)里處理結(jié)果,直接返回await的內(nèi)容,那么最外層的run_until_complete將會(huì)返回main協(xié)程的結(jié)果。 將上述的代碼更改為:

import asyncio
import time


now = lambda: time.time()

async def do_some_work(x):
  print("waiting:",x)
  await asyncio.sleep(x)
  return "Done after {}s".format(x)

async def main():
  coroutine1 = do_some_work(1)
  coroutine2 = do_some_work(2)
  coroutine3 = do_some_work(4)
  tasks = [
    asyncio.ensure_future(coroutine1),
    asyncio.ensure_future(coroutine2),
    asyncio.ensure_future(coroutine3)
  ]
  return await asyncio.gather(*tasks)

start = now()

loop = asyncio.get_event_loop()
results = loop.run_until_complete(main())
for result in results:
  print("Task ret:",result)

print("Time:", now()-start)

或者返回使用asyncio.wait方式掛起協(xié)程。

將代碼更改為:

import asyncio
import time


now = lambda: time.time()

async def do_some_work(x):
  print("waiting:",x)
  await asyncio.sleep(x)
  return "Done after {}s".format(x)

async def main():
  coroutine1 = do_some_work(1)
  coroutine2 = do_some_work(2)
  coroutine3 = do_some_work(4)
  tasks = [
    asyncio.ensure_future(coroutine1),
    asyncio.ensure_future(coroutine2),
    asyncio.ensure_future(coroutine3)
  ]
  return await asyncio.wait(tasks)

start = now()

loop = asyncio.get_event_loop()
done,pending = loop.run_until_complete(main())
for task in done:
  print("Task ret:",task.result())

print("Time:", now()-start)

也可以使用asyncio的as_completed方法

import asyncio
import time


now = lambda: time.time()

async def do_some_work(x):
  print("waiting:",x)
  await asyncio.sleep(x)
  return "Done after {}s".format(x)

async def main():
  coroutine1 = do_some_work(1)
  coroutine2 = do_some_work(2)
  coroutine3 = do_some_work(4)
  tasks = [
    asyncio.ensure_future(coroutine1),
    asyncio.ensure_future(coroutine2),
    asyncio.ensure_future(coroutine3)
  ]
  for task in asyncio.as_completed(tasks):
    result = await task
    print("Task ret: {}".format(result))

start = now()

loop = asyncio.get_event_loop()
loop.run_until_complete(main())
print("Time:", now()-start)

從上面也可以看出,協(xié)程的調(diào)用和組合非常靈活,主要體現(xiàn)在對(duì)于結(jié)果的處理:如何返回,如何掛起

協(xié)程的停止

future對(duì)象有幾個(gè)狀態(tài):

Pending Running Done Cacelled

創(chuàng)建future的時(shí)候,task為pending,事件循環(huán)調(diào)用執(zhí)行的時(shí)候當(dāng)然就是running,調(diào)用完畢自然就是done,如果需要停止事件循環(huán),就需要先把task取消??梢允褂胊syncio.Task獲取事件循環(huán)的task

import asyncio
import time


now = lambda :time.time()


async def do_some_work(x):
  print("Waiting:",x)
  await asyncio.sleep(x)
  return "Done after {}s".format(x)

coroutine1 =do_some_work(1)
coroutine2 =do_some_work(2)
coroutine3 =do_some_work(2)

tasks = [
  asyncio.ensure_future(coroutine1),
  asyncio.ensure_future(coroutine2),
  asyncio.ensure_future(coroutine3),
]

start = now()

loop = asyncio.get_event_loop()
try:
  loop.run_until_complete(asyncio.wait(tasks))
except KeyboardInterrupt as e:
  print(asyncio.Task.all_tasks())
  for task in asyncio.Task.all_tasks():
    print(task.cancel())
  loop.stop()
  loop.run_forever()
finally:
  loop.close()

print("Time:",now()-start)

啟動(dòng)事件循環(huán)之后,馬上ctrl+c,會(huì)觸發(fā)run_until_complete的執(zhí)行異常 KeyBorardInterrupt。然后通過循環(huán)asyncio.Task取消future??梢钥吹捷敵鋈缦拢?/p>

Waiting: 1
Waiting: 2
Waiting: 2
^C{<Task finished coro=<do_some_work() done, defined at /app/py_code/study_asyncio/simple_ex10.py:13> result='Done after 1s'>, <Task pending coro=<do_some_work() running at /app/py_code/study_asyncio/simple_ex10.py:15> wait_for=<Future pending cb=[Task._wakeup()]> cb=[_wait.<locals>._on_completion() at /usr/local/lib/python3.5/asyncio/tasks.py:428]>, <Task pending coro=<do_some_work() running at /app/py_code/study_asyncio/simple_ex10.py:15> wait_for=<Future pending cb=[Task._wakeup()]> cb=[_wait.<locals>._on_completion() at /usr/local/lib/python3.5/asyncio/tasks.py:428]>, <Task pending coro=<wait() running at /usr/local/lib/python3.5/asyncio/tasks.py:361> wait_for=<Future pending cb=[Task._wakeup()]>>}
False
True
True
True
Time: 1.0707225799560547

True表示cannel成功,loop stop之后還需要再次開啟事件循環(huán),最后在close,不然還會(huì)拋出異常

循環(huán)task,逐個(gè)cancel是一種方案,可是正如上面我們把task的列表封裝在main函數(shù)中,main函數(shù)外進(jìn)行事件循環(huán)的調(diào)用。這個(gè)時(shí)候,main相當(dāng)于最外出的一個(gè)task,那么處理包裝的main函數(shù)即可。

不同線程的事件循環(huán)

很多時(shí)候,我們的事件循環(huán)用于注冊協(xié)程,而有的協(xié)程需要?jiǎng)討B(tài)的添加到事件循環(huán)中。一個(gè)簡單的方式就是使用多線程。當(dāng)前線程創(chuàng)建一個(gè)事件循環(huán),然后在新建一個(gè)線程,在新線程中啟動(dòng)事件循環(huán)。當(dāng)前線程不會(huì)被block。

import asyncio
from threading import Thread
import time

now = lambda :time.time()

def start_loop(loop):
  asyncio.set_event_loop(loop)
  loop.run_forever()

def more_work(x):
  print('More work {}'.format(x))
  time.sleep(x)
  print('Finished more work {}'.format(x))

start = now()
new_loop = asyncio.new_event_loop()
t = Thread(target=start_loop, args=(new_loop,))
t.start()
print('TIME: {}'.format(time.time() - start))

new_loop.call_soon_threadsafe(more_work, 6)
new_loop.call_soon_threadsafe(more_work, 3)

啟動(dòng)上述代碼之后,當(dāng)前線程不會(huì)被block,新線程中會(huì)按照順序執(zhí)行call_soon_threadsafe方法注冊的more_work方法, 后者因?yàn)閠ime.sleep操作是同步阻塞的,因此運(yùn)行完畢more_work需要大致6 + 3

新線程協(xié)程

import asyncio
import time
from threading import Thread

now = lambda :time.time()


def start_loop(loop):
  asyncio.set_event_loop(loop)
  loop.run_forever()

async def do_some_work(x):
  print('Waiting {}'.format(x))
  await asyncio.sleep(x)
  print('Done after {}s'.format(x))

def more_work(x):
  print('More work {}'.format(x))
  time.sleep(x)
  print('Finished more work {}'.format(x))

start = now()
new_loop = asyncio.new_event_loop()
t = Thread(target=start_loop, args=(new_loop,))
t.start()
print('TIME: {}'.format(time.time() - start))

asyncio.run_coroutine_threadsafe(do_some_work(6), new_loop)
asyncio.run_coroutine_threadsafe(do_some_work(4), new_loop)

上述的例子,主線程中創(chuàng)建一個(gè)new_loop,然后在另外的子線程中開啟一個(gè)無限事件循環(huán)。 主線程通過run_coroutine_threadsafe新注冊協(xié)程對(duì)象。這樣就能在子線程中進(jìn)行事件循環(huán)的并發(fā)操作,同時(shí)主線程又不會(huì)被block。一共執(zhí)行的時(shí)間大概在6s左右。

相關(guān)文章

最新評(píng)論