亚洲乱码中文字幕综合,中国熟女仑乱hd,亚洲精品乱拍国产一区二区三区,一本大道卡一卡二卡三乱码全集资源,又粗又黄又硬又爽的免费视频

簡(jiǎn)單理解Python中的事件循環(huán)EventLoop

 更新時(shí)間:2023年10月09日 08:11:30   作者:songcser  
在 python 3中,加入了 asyncio 模塊,來(lái)實(shí)現(xiàn)協(xié)程,其中一個(gè)很重要的概念是事件循環(huán),本文我們就來(lái)自己實(shí)現(xiàn)一個(gè)相對(duì)簡(jiǎn)單的EventLoop,從而了解一下事件循環(huán)是如何進(jìn)行運(yùn)轉(zhuǎn)的吧

簡(jiǎn)介

在 python 3中,加入了 asyncio 模塊,來(lái)實(shí)現(xiàn)協(xié)程,其中一個(gè)很重要的概念是事件循環(huán),整個(gè)異步流程都是事件循環(huán)推動(dòng)的。下面自己實(shí)現(xiàn)一個(gè)相對(duì)簡(jiǎn)單的EventLoop,了解一下事件循環(huán)是如何進(jìn)行運(yùn)轉(zhuǎn)的。

事件循環(huán)

下面看一下整個(gè)流程的實(shí)現(xiàn)過(guò)程

將以下代碼寫(xiě)入 spider_event_loop.py 文件:

# spider_event_loop.py
import time
import os
import socket
from urllib.parse import urlparse
from collections import deque
from selectors import DefaultSelector, EVENT_WRITE, EVENT_READ
# selector = DefaultSelector()
urls = ['https://p9-juejin.byteimg.com/tos-cn-i-k3u1fbpfcp/9bc51bc53f634bf79b5de5c8b9810817~tplv-k3u1fbpfcp-watermark.image',
        'https://p1-juejin.byteimg.com/tos-cn-i-k3u1fbpfcp/2b95ac8571ba403180743495ed56e492~tplv-k3u1fbpfcp-watermark.image',
        'https://p6-juejin.byteimg.com/tos-cn-i-k3u1fbpfcp/459c6c73887a4206a33b13ef23988809~tplv-k3u1fbpfcp-watermark.image',
        'https://p6-juejin.byteimg.com/tos-cn-i-k3u1fbpfcp/80701ade82d345cd8aa0b08fef008fe1~tplv-k3u1fbpfcp-watermark.image',
        'https://p6-juejin.byteimg.com/tos-cn-i-k3u1fbpfcp/99855ed1f79c456f9fdcd83ce5cff4f2~tplv-k3u1fbpfcp-watermark.image',
        'https://p1-juejin.byteimg.com/tos-cn-i-k3u1fbpfcp/0c4bd38c2fe3434587025748d051362e~tplv-k3u1fbpfcp-watermark.image',
        'https://p1-juejin.byteimg.com/tos-cn-i-k3u1fbpfcp/80cd2a1d165b43beb6234d893ce391e2~tplv-k3u1fbpfcp-watermark.image',
        'https://p3-juejin.byteimg.com/tos-cn-i-k3u1fbpfcp/ea18df95f26f4314a0a36d11d4a067d0~tplv-k3u1fbpfcp-watermark.image',
        'https://p1-juejin.byteimg.com/tos-cn-i-k3u1fbpfcp/095c342a796c411cad1800396746bdaf~tplv-k3u1fbpfcp-watermark.image',
        'https://p6-juejin.byteimg.com/tos-cn-i-k3u1fbpfcp/d3769e4d468a4f64a1e2879f94ad742b~tplv-k3u1fbpfcp-watermark.image'
]
# 事件循環(huán)實(shí)現(xiàn)
class EventLoop:
    def __init__(self):
     	# 整個(gè)流程是否
        self._stopped = False
        self._ready = deque()
        self._selector = DefaultSelector()
    # 向 _ready 隊(duì)列中加入回調(diào)方法
    def call_soon(self, callback, *args):
        # 將事件添加到隊(duì)列里
        handle = Handle(callback, self, *args)
        self._ready.append(handle)
    # 套接字注冊(cè)讀事件
    def add_writer(self, fd, callback, *args):
        # 將回調(diào)方法封裝成Handle
        handle = Handle(callback, self, *args)
        try:
            # 檢查有沒(méi)有注冊(cè)過(guò)
            key = self._selector.get_key(fd)
        except KeyError:
            # 沒(méi)有注冊(cè)過(guò),進(jìn)行寫(xiě)事件注冊(cè)
            self._selector.register(fd, EVENT_WRITE, handle)
        else:
            # 注冊(cè)過(guò),添加寫(xiě)事件
            mask = key.events
            self._selector.modify(fd, mask | EVENT_WRITE, handle)
    # 套接字注銷(xiāo)讀事件
    def remove_writer(self, fd):
        try:
            # 檢查有沒(méi)有注冊(cè)過(guò)
            self._selector.get_key(fd)
        except KeyError:
            # 沒(méi)有,直接返回
            return
        # 注銷(xiāo)事件
        self._selector.unregister(fd)
    # 套接字注冊(cè)寫(xiě)事件
    def add_reader(self, fd, callback, *args):
        # 將回調(diào)方法,封裝成Handle
        handle = Handle(callback, self, *args)
        try:
            # 檢查是否注冊(cè)過(guò)
            key = self._selector.get_key(fd)
        except KeyError:
            # 沒(méi)有注冊(cè)過(guò),進(jìn)行讀事件注冊(cè)
            self._selector.register(fd, EVENT_READ, handle)
        else:
            # 注冊(cè)過(guò),添加讀事件
            mask = key.events
            self._selector.modify(fd, mask | EVENT_READ, handle)
    # 套接字注銷(xiāo)寫(xiě)事件
    def remove_reader(self, fd):
        try:
            # 檢查有沒(méi)有注冊(cè)過(guò)
            self._selector.get_key(fd)
        except KeyError:
            # 沒(méi)有,直接返回
            return
        # 注銷(xiāo)事件
        self._selector.unregister(fd)
    # 處理事件
    def _process_events(self, event_list):
        for key, mask in event_list:
            fileobj, handle = key.fileobj, key.data
            if mask & EVENT_READ:
                self.remove_reader(fileobj)
            if mask & EVENT_WRITE:
                self.remove_writer(fileobj)
            # 注冊(cè)的事件發(fā)生,將注冊(cè)時(shí)的handle放入隊(duì)列中
            self._ready.append(handle)
    # 運(yùn)行事件隊(duì)列,執(zhí)行回調(diào)方法
    def run_once(self):
        # 獲取發(fā)生事件的所有文件描述符
        event_list = self._selector.select()
        self._process_events(event_list)
        while self._ready:
            # 將隊(duì)列中的handle取出,執(zhí)行回調(diào)方法
            handle = self._ready.popleft()
            handle.run()
    # 無(wú)限循環(huán),直到stopped
    def run_forever(self):
        while True:
            # 執(zhí)行事件
            self.run_once()
            if self._stopped:
                break
    # 執(zhí)行協(xié)程方法,直到事件循環(huán)執(zhí)行結(jié)束
    def run_until_complete(self, future):
        # 在 future 中添加回調(diào)方法,在 future set_value 的時(shí)候執(zhí)行這個(gè)回調(diào)方法
        future.add_done_callback(self._run_until_complete_cb)
        # 開(kāi)始無(wú)限循環(huán)
        self.run_forever()
        # 返回 future 的值
        return future.value
    # 結(jié)束時(shí)的回調(diào)方法
    def _run_until_complete_cb(self, future):
        # 結(jié)束無(wú)限循環(huán)
        self.close()
    # 關(guān)閉方法
    def close(self):
        self._stopped = True
    # 創(chuàng)建 Future 實(shí)例
    def create_future(self):
        return Future(loop=self)
    # 創(chuàng)建 Task 實(shí)例
    def create_task(self, coro):
        return Task(coro, loop=self)

以上類(lèi)是一個(gè)簡(jiǎn)單是事件循環(huán),事件循環(huán)基本的操作都已經(jīng)包含,注冊(cè)注銷(xiāo)事件,無(wú)限循環(huán)獲取文件描述符的事件,執(zhí)行Handle隊(duì)列,這里是使用的 _ready 隊(duì)列,表示已經(jīng)準(zhǔn)備好的 Handle,由于沒(méi)有延時(shí)事件所以省略了調(diào)度隊(duì)列

# Handle 類(lèi),對(duì)回調(diào)方法做封裝
class Handle:
    def __init__(self, callback, loop, *args):
        self._callback = callback
        self._args = args
    # 執(zhí)行回調(diào)方法
    def run(self):
        self._callback(*self._args)

Handle 類(lèi)對(duì)回調(diào)方法進(jìn)行封裝

# Future 類(lèi)
class Future:
    def __init__(self, loop=None):
        self.value = None
        # 將要執(zhí)行的回調(diào)方法
        self._step_func = []
        # 和事件循環(huán)關(guān)聯(lián)
        if loop:
            self._loop = loop
        else:
            self._loop = get_event_loop()
    def add_done_callback(self, func):
        # 添加回調(diào)方法
        self._step_func.append(func)
    def set_value(self, value):
        # 設(shè)置值
        self.value = value
        for func in self._step_func:
            # 將回調(diào)方法添加到事件循環(huán)的 _ready 隊(duì)列中
            # 在下次事件循環(huán)中執(zhí)行回調(diào)方法
            self._loop.call_soon(func, self)
    # 實(shí)現(xiàn) __iter__ 方法,F(xiàn)uture 類(lèi)的實(shí)例為可迭代對(duì)象
    def __iter__(self):
        # 該語(yǔ)句起到暫停協(xié)程的作用,并返回實(shí)例本身
        yield self
        # 該語(yǔ)句定義的返回值會(huì)賦給 yield from 語(yǔ)句等號(hào)前面的變量
        return self.value

Future類(lèi)和以前的實(shí)現(xiàn),區(qū)分不是很大,只是和 loop 做關(guān)聯(lián),將回調(diào)方法放入事件循環(huán)的隊(duì)列中,依靠事件循環(huán)執(zhí)行回調(diào)方法

# Task 類(lèi),繼承 Future 類(lèi), 也是可迭代的
class Task(Future):
    def __init__(self, coro, loop=None):
        super().__init__(loop=loop)
        self.coro = coro
        # step 方法直接放入事件循環(huán)的隊(duì)列中進(jìn)行執(zhí)行
        # 激活協(xié)程方法運(yùn)行
        self._loop.call_soon(self.step, self)
    def step(self, future):
        try:
            # 向協(xié)程發(fā)送數(shù)據(jù),驅(qū)動(dòng)協(xié)程執(zhí)行
            # 直到遇到 yield ,返回新的 Future實(shí)例
            new_futrue = self.coro.send(future.value)
        except StopIteration as exc:
            # 在協(xié)程方法執(zhí)行到結(jié)束,或者 return 之后,拋出 StopIteration 異常
            # 由于 Task 也是 Future, 在協(xié)程執(zhí)行完之后,最后 Task 執(zhí)行自己的回調(diào)方法
            self.set_value(exc.value)
            return
        # 將 step 加入回調(diào)列表,等待下次驅(qū)動(dòng)執(zhí)行
        # 在 Future 執(zhí)行 set_value 時(shí),又會(huì)執(zhí)行 step 方法,再次驅(qū)動(dòng)協(xié)程執(zhí)行
        new_futrue.add_done_callback(self.step)

Task 類(lèi)繼承 Future 類(lèi),類(lèi)的實(shí)例也是可迭代對(duì)象,在 step 方法會(huì)不斷觸發(fā)協(xié)程繼續(xù)執(zhí)行,在協(xié)程執(zhí)行結(jié)束之后,拋出 StopIteration 異常,最后 Task 類(lèi)再執(zhí)行回調(diào)方法,主要做一些清理工作或者收集結(jié)果。

class AsyncSocket:
    def __init__(self, loop=None):
        # 綁定事件循環(huán)
        if loop:
            self._loop = loop
        else:
            self._loop = get_event_loop()
        self.sock = socket.socket()
        self.sock.setblocking(False)
    # 該方法用于向服務(wù)器發(fā)送連接請(qǐng)求并注冊(cè)監(jiān)聽(tīng)套接字的可寫(xiě)事件
    def connect(self, address):
        # 由 loop 創(chuàng)建 Future 實(shí)例
        f = self._loop.create_future()
        try:
            self.sock.connect(address)
        except BlockingIOError:
            pass
        # 注冊(cè)寫(xiě)事件,在連接成功事件發(fā)生之后,調(diào)用回調(diào)方法
        self._loop.add_writer(self.sock.fileno(), self._connect_cb, f)
        # 暫停執(zhí)行,等待寫(xiě)事件發(fā)生
        yield from f
    # 回調(diào)方法,連接事件發(fā)生
    def _connect_cb(self, future):
        # 設(shè)置 Future 值,并且驅(qū)動(dòng)協(xié)程繼續(xù)執(zhí)行
        future.set_value(None)
    # 向服務(wù)器發(fā)送獲取圖片的請(qǐng)求
    def send(self, data):
        self.sock.send(data)
    # 該方法會(huì)多次執(zhí)行,以獲取服務(wù)器返回的數(shù)據(jù)片段
    def read(self):
        f = self._loop.create_future()
        # 注冊(cè)讀事件,在可讀事件發(fā)生之后,調(diào)用回調(diào)方法
        self._loop.add_reader(self.sock.fileno(), self._read_cb, f, self.sock)
        # 暫停執(zhí)行,等待讀事件發(fā)生
        yield from f
        # 返回最后的值
        return f.value
    # 回調(diào)方法,讀事件發(fā)生
    def _read_cb(self, future, sock):
        # 套接字讀取 4096 字節(jié)的數(shù)據(jù),設(shè)置 Future 值,并且驅(qū)動(dòng)協(xié)程繼續(xù)執(zhí)行
        future.set_value(sock.recv(4096))
    # 讀取所有數(shù)據(jù)
    def read_all(self):
        data = b''
        while True:
            # 不斷讀取 sock 的數(shù)據(jù)
            value = yield from self.read()
            if value:
                data += value
            else:
                return data
    # 關(guān)閉客戶端套接字
    def close(self):
        self.sock.close()

AsyncSocket 類(lèi)是對(duì) Socket 做的簡(jiǎn)單封裝,綁定事件循環(huán) Loop ,讀寫(xiě)事件都是在 Loop 中注冊(cè),由事件循環(huán)來(lái)驅(qū)動(dòng) Socket 的讀寫(xiě)。

# 爬蟲(chóng)類(lèi)
class Crawler:
    def __init__(self, url):
        self._url = url
        self.url = urlparse(url)
        self.response = b''
    def fetch(self):
        self.time = time.time()
        # AsyncSocket 類(lèi)的實(shí)例對(duì)象負(fù)責(zé)完成數(shù)據(jù)獲取的工作
        sock = AsyncSocket()
        # 向服務(wù)器發(fā)送連接請(qǐng)求,協(xié)程會(huì)暫停到嵌套協(xié)程中的某個(gè) yield from 處
        yield from sock.connect((self.url.netloc, 80))
        data = 'GET {0} HTTP/1.1\r\nHost: {1}\r\nConnection: close\r\n\r\n \
                '.format(self.url.path, self.url.netloc)
        # 發(fā)送請(qǐng)求數(shù)據(jù)
        sock.send(data.encode())
        # 讀取全部的數(shù)據(jù)
        self.response = yield from sock.read_all()
        # 關(guān)閉 socket
        sock.close()
        # 將下載的圖片寫(xiě)入文件
        with open('pic/{}'.format(self.url.path[1:].split('/')[-1]), 'wb') as f:
            f.write(self.response.split(b'\r\n\r\n')[1])
        return "URL: {0}, 耗時(shí): {1:.3f}s".format(self._url, time.time() - self.time)

Crawler 完成圖片爬取工作,在 fetch 方法中,AsyncSocket 完成請(qǐng)求連接,發(fā)送數(shù)據(jù),接收數(shù)據(jù)的工作,整個(gè)流程非常的清晰,和同步阻塞模式流程基本相同,但是性能會(huì)有大量提升

# 事件循環(huán),全局變量
_event_loop = None
# 獲取事件循環(huán),這里是要獲取同一個(gè)全局實(shí)例
def get_event_loop():
    global _event_loop
    if _event_loop is None:
        # 生成一個(gè)新的事件循環(huán)實(shí)例
        _event_loop = EventLoop()
    return _event_loop
# 收集所有的 task
def gather(tasks, loop=None):
    # 使用 Future 類(lèi) 收集 所有 tasks 的結(jié)果
    outer = Future(loop=loop)
    # tasks 數(shù)量
    count = len(tasks)
    nfinished = 0
    # 收集結(jié)果
    results = []
    # 回調(diào)方法
    def _gather_cb(f):
        nonlocal nfinished
        # 完成數(shù)量
        nfinished += 1
        # 收集結(jié)果
        results.append(f.value)
        if nfinished == count:
            # 都完成之后,outer 設(shè)置值,同時(shí)執(zhí)行回調(diào)方法
            outer.set_value(results)
    for task in tasks:
        # 所有的 task 都添加回調(diào)方法
        # task 中協(xié)程方法執(zhí)行完成時(shí), 在 step 方法中會(huì)拋出 StopIteration 異常
        # 這時(shí)候 task 會(huì)執(zhí)行 set_value 方法,同時(shí)會(huì)執(zhí)行 _gather_cb 回調(diào)方法
        task.add_done_callback(_gather_cb)
    # 將 Future 實(shí)例返回
    return outer

以上獲取事件循環(huán)單例,所有的事件都是在一個(gè)循環(huán)中執(zhí)行。gather 方法使多個(gè) task 并發(fā)執(zhí)行,并且收集所有 task 的結(jié)果

def main():
    os.system('mkdir -p pic')
    start = time.time()
    loop = get_event_loop()
    tasks = []
    for url in urls:
        # 爬蟲(chóng)實(shí)例
        crawler = Crawler(url)
        # 將 fetch 方法封裝成 task 實(shí)例
        tasks.append(Task(crawler.fetch()))        
    # gather 方法將收集所有的 task 結(jié)果,并且返回 Futrue 實(shí)例,
    # Futrue 實(shí)例在 run_until_complete 方法中添加了 _run_until_complete_cb 回調(diào)方法
    # 在所有的 task 執(zhí)行結(jié)束之后,F(xiàn)uture 實(shí)例執(zhí)行 set_value 方法,同時(shí)執(zhí)行回調(diào)方法 _run_until_complete_cb
    # 在 _run_until_complete_cb 方法中執(zhí)行了 close 方法,無(wú)限循環(huán)結(jié)束,整個(gè)流程結(jié)束
    # 返回 results 的值
    results = loop.run_until_complete(gather(tasks))
    for res in results:
        print(res)
if __name__ == '__main__':
    main()

執(zhí)行 spider_event_loop.py 文件

輸出:

$ python3 spider_event_loop.py
URL: https://p6-juejin.byteimg.com/tos-cn-i-k3u1fbpfcp/459c6c73887a4206a33b13ef23988809~tplv-k3u1fbpfcp-watermark.image, 耗時(shí): 0.055s
URL: https://p9-juejin.byteimg.com/tos-cn-i-k3u1fbpfcp/9bc51bc53f634bf79b5de5c8b9810817~tplv-k3u1fbpfcp-watermark.image, 耗時(shí): 0.101s
URL: https://p1-juejin.byteimg.com/tos-cn-i-k3u1fbpfcp/80cd2a1d165b43beb6234d893ce391e2~tplv-k3u1fbpfcp-watermark.image, 耗時(shí): 0.157s
URL: https://p1-juejin.byteimg.com/tos-cn-i-k3u1fbpfcp/0c4bd38c2fe3434587025748d051362e~tplv-k3u1fbpfcp-watermark.image, 耗時(shí): 0.158s
URL: https://p6-juejin.byteimg.com/tos-cn-i-k3u1fbpfcp/80701ade82d345cd8aa0b08fef008fe1~tplv-k3u1fbpfcp-watermark.image, 耗時(shí): 0.162s
URL: https://p3-juejin.byteimg.com/tos-cn-i-k3u1fbpfcp/ea18df95f26f4314a0a36d11d4a067d0~tplv-k3u1fbpfcp-watermark.image, 耗時(shí): 0.164s
URL: https://p6-juejin.byteimg.com/tos-cn-i-k3u1fbpfcp/d3769e4d468a4f64a1e2879f94ad742b~tplv-k3u1fbpfcp-watermark.image, 耗時(shí): 0.163s
URL: https://p1-juejin.byteimg.com/tos-cn-i-k3u1fbpfcp/095c342a796c411cad1800396746bdaf~tplv-k3u1fbpfcp-watermark.image, 耗時(shí): 0.164s
URL: https://p1-juejin.byteimg.com/tos-cn-i-k3u1fbpfcp/2b95ac8571ba403180743495ed56e492~tplv-k3u1fbpfcp-watermark.image, 耗時(shí): 0.224s
URL: https://p6-juejin.byteimg.com/tos-cn-i-k3u1fbpfcp/99855ed1f79c456f9fdcd83ce5cff4f2~tplv-k3u1fbpfcp-watermark.image, 耗時(shí): 0.492s
總共耗時(shí): 0.495s

使用事件循環(huán),異步執(zhí)行爬取數(shù)據(jù),消耗時(shí)間很少,性能很高。

現(xiàn)在我們把整個(gè)流程梳理一下:

  • 從main方法開(kāi)始執(zhí)行
  • 獲取 EventLoop 實(shí)例,loop 實(shí)例是全局變量
  • 將 urls 中的 url 分別進(jìn)行處理,生成 Crawler 實(shí)例
  • Crawler 的 fetch 方法是協(xié)程,不會(huì)立即執(zhí)行,Task 包裝協(xié)程方法
  • 在 Task 的 __init__ 方法中, 會(huì)將 Task 的 step 方法,加入到 loop 的 隊(duì)列中
  • step 方法會(huì)被封裝成 Handle 方法,加入到 loop 的 _ready 的隊(duì)列中
  • 在所有的 task 處理完成之后,加入到 tasks 列表中
  • 接下來(lái)執(zhí)行 gather 方法,outer 是 Future 實(shí)例, 所有的 task 實(shí)例都添加 _gather_cb 回調(diào)方法
  • 在 _gather_cb 方法中,會(huì)收集 task 的結(jié)果
  • 接下來(lái)執(zhí)行 loop 的 run_until_complete 方法,入?yún)⑹?gather 方法返回的 outer 的 Future 實(shí)例
  • run_until_complete 方法中,outer 實(shí)例加入 _run_until_complete_cb 回調(diào)方法
  • 開(kāi)始執(zhí)行 run_forever 方法,進(jìn)入循環(huán)
  • 執(zhí)行 run_once 方法,獲取所有的發(fā)生時(shí)間的文件描述符,由于現(xiàn)在還沒(méi)有注冊(cè)事件,所以事件列表為空
  • 將 _ready 隊(duì)列中所有的 handle 取出,開(kāi)始執(zhí)行,由于 task 的 __init__ 方法中將 step 放入了隊(duì)列中,所以這里會(huì)執(zhí)行 task 的 step 方法
  • task 中的 coro 是 fetch 協(xié)程,所以在 step 方法中 coro 會(huì)執(zhí)行 send 方法,由于 future 就是 task 實(shí)例,value 還是 None,所以會(huì)激活協(xié)程方法執(zhí)行
  • fetch 方法開(kāi)始真正執(zhí)行,創(chuàng)建 AsyncSocket 實(shí)例 sock
  • 執(zhí)行到 sock.connect((self.url.netloc, 80)) 會(huì)在 connect 方法中 創(chuàng)建新的 future 實(shí)例
  • 進(jìn)行 sock 請(qǐng)求連接,再將網(wǎng)絡(luò)套接字的文件描述符注冊(cè)寫(xiě)事件,在 add_writer 方法中 _connect_cb 封裝成 handle 實(shí)例,在 事件發(fā)生之后會(huì)執(zhí)行 handle 的 run 方法
  • 暫停執(zhí)行,返回 future 實(shí)例
  • 由于是在 task 的 step 方法中激活的協(xié)程執(zhí)行,所以 new_future 就是返回的 future 實(shí)例
  • new_future 將 step 加入到回調(diào)列表中
  • 現(xiàn)在流程都暫停了,等待事件的發(fā)生
  • 在事件循環(huán)中,注冊(cè)的文件描述符的寫(xiě)事件發(fā)生之后,_process_events 方法循環(huán)處理所有的事件,取出事件注冊(cè)時(shí)候的 handle 方法,加入到 _ready 執(zhí)行隊(duì)列里面
  • 再?gòu)?_ready 的隊(duì)列里面取出 handle 方法,此 handle 方法是注冊(cè)時(shí)候封裝的 _connect_cb 回調(diào)方法
  • 執(zhí)行 _connect_cb 方法,參數(shù)是 future,也是返回的 new_future,這時(shí)候再執(zhí)行 future 的 set_value 方法
  • 設(shè)置 future 的值,并且將回調(diào)隊(duì)列里的回調(diào)方法取出,加入到事件循環(huán)的 _ready 隊(duì)列里,由于 new_future 將 step 方法加入了回調(diào)隊(duì)列,所以會(huì)再次執(zhí)行 task 的 step 方法
  • step 又繼續(xù)驅(qū)動(dòng)協(xié)程執(zhí)行,fetch 又開(kāi)始了繼續(xù)執(zhí)行
  • sock 發(fā)送數(shù)據(jù),然后注冊(cè)讀事件,由于讀取數(shù)據(jù)時(shí)一次不會(huì)全部讀完,會(huì)多次注冊(cè)讀事件,讀取全部的數(shù)據(jù)
  • 讀事件和寫(xiě)事件是相同的,在注冊(cè)之后,就返回 future 對(duì)象,添加 step 回調(diào)方法,暫停執(zhí)行。等待事件的發(fā)生
  • 在所有的數(shù)據(jù)讀取之后,fetch 方法會(huì)執(zhí)行結(jié)束,return 當(dāng)前的數(shù)據(jù)
  • 由于 task 實(shí)例的 step 驅(qū)動(dòng)的 fetch 方法,所以 step 方法會(huì)拋出 StopIteration 異常
  • task 繼承的 Future,所以可以執(zhí)行 set_value 方法,異常的值就是 fetch 返回的值
  • 在 gather 方法中,task 添加了 _gather_cb 回調(diào)方法,所以在 set_value 時(shí),會(huì)調(diào)用回調(diào)方法
  • 將 task 的值收集到 results 列表中,等所有的 task 都執(zhí)行結(jié)束之后,outer 實(shí)例開(kāi)始執(zhí)行 set_value
  • outer 添加了 _run_until_complete_cb 回調(diào)方法,所以這里同樣會(huì)執(zhí)行回調(diào)方法,在 _run_until_complete_cb 方法中調(diào)用事件循環(huán)的 close 方法,_stopped 設(shè)置為 True
  • run_forever 退出無(wú)限循環(huán),整個(gè)流程執(zhí)行結(jié)束,將 outer 的值返回

整個(gè)流程梳理完了,Task 不斷驅(qū)動(dòng)協(xié)程執(zhí)行,EventLoop 監(jiān)聽(tīng)事件循環(huán),又不斷驅(qū)動(dòng) Task 執(zhí)行,F(xiàn)uture 在協(xié)程的通道中傳輸數(shù)據(jù),幾個(gè)部分配合合作完成整個(gè)流程。

以上就是簡(jiǎn)單理解Python中的事件循環(huán)EventLoop的詳細(xì)內(nèi)容,更多關(guān)于Python事件循環(huán)的資料請(qǐng)關(guān)注腳本之家其它相關(guān)文章!

相關(guān)文章

  • 淺談python之高階函數(shù)和匿名函數(shù)

    淺談python之高階函數(shù)和匿名函數(shù)

    這篇文章主要介紹了python之高階函數(shù)和匿名函數(shù),文中通過(guò)示例代碼介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友們下面隨著小編來(lái)一起學(xué)習(xí)學(xué)習(xí)吧
    2019-03-03
  • python實(shí)現(xiàn)從ftp服務(wù)器下載文件

    python實(shí)現(xiàn)從ftp服務(wù)器下載文件

    這篇文章主要為大家詳細(xì)介紹了python實(shí)現(xiàn)從ftp服務(wù)器下載文件,文中示例代碼介紹的非常詳細(xì),具有一定的參考價(jià)值,感興趣的小伙伴們可以參考一下
    2020-03-03
  • 讓python同時(shí)兼容python2和python3的8個(gè)技巧分享

    讓python同時(shí)兼容python2和python3的8個(gè)技巧分享

    這篇文章主要介紹了讓python同時(shí)兼容python2和python3的8個(gè)技巧分享,對(duì)代碼稍微做些修改就可以很好的同時(shí)支持python2和python3的,需要的朋友可以參考下
    2014-07-07
  • 根據(jù)tensor的名字獲取變量的值方式

    根據(jù)tensor的名字獲取變量的值方式

    今天小編就為大家分享一篇根據(jù)tensor的名字獲取變量的值方式,具有很好的參考價(jià)值,希望對(duì)大家有所幫助。一起跟隨小編過(guò)來(lái)看看吧
    2020-01-01
  • Python 轉(zhuǎn)移文件至云對(duì)象存儲(chǔ)的方法

    Python 轉(zhuǎn)移文件至云對(duì)象存儲(chǔ)的方法

    對(duì)象存儲(chǔ)(Cloud Object Storage,COS)是一種存儲(chǔ)海量文件的分布式存儲(chǔ)服務(wù),具有高擴(kuò)展性、低成本、可靠安全等優(yōu)點(diǎn)。這篇文章主要介紹了Python 轉(zhuǎn)移文件至云對(duì)象存儲(chǔ),需要的朋友可以參考下
    2021-02-02
  • pycharm激活碼快速激活及使用步驟

    pycharm激活碼快速激活及使用步驟

    這篇文章主要介紹了pycharm激活碼快速激活及使用步驟,本文分步驟給大家介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或工作具有一定的參考借鑒價(jià)值,需要的朋友可以參考下
    2020-03-03
  • 使用Python實(shí)現(xiàn)有趣的鎖屏小工具

    使用Python實(shí)現(xiàn)有趣的鎖屏小工具

    這篇文章主要為大家詳細(xì)介紹了如何使用Python實(shí)現(xiàn)有趣的鎖屏小工具,這樣再也不用擔(dān)心因?yàn)闆](méi)有鎖屏被扣工資啦,打工人快跟隨小編一起學(xué)習(xí)一下吧
    2023-12-12
  • 教你編譯pjsip源碼的方法

    教你編譯pjsip源碼的方法

    通過(guò)本文教大家如何編譯pjsip源碼,本文通過(guò)圖文并茂的形式給大家介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或工作具有一定的參考借鑒價(jià)值,需要的朋友參考下吧
    2021-10-10
  • python制作圖片縮略圖

    python制作圖片縮略圖

    這篇文章主要為大家詳細(xì)介紹了python制作圖片縮略圖的相關(guān)方法,具有一定的參考價(jià)值,感興趣的小伙伴們可以參考一下
    2019-04-04
  • 用Python shell簡(jiǎn)化開(kāi)發(fā)

    用Python shell簡(jiǎn)化開(kāi)發(fā)

    這篇文章給大家詳細(xì)分享了在Python的開(kāi)發(fā)中,如何通過(guò)Python shell簡(jiǎn)化開(kāi)發(fā),有興趣的朋友們可以學(xué)習(xí)下。
    2018-08-08

最新評(píng)論