亚洲乱码中文字幕综合,中国熟女仑乱hd,亚洲精品乱拍国产一区二区三区,一本大道卡一卡二卡三乱码全集资源,又粗又黄又硬又爽的免费视频

Python 使用生成器代替線程的方法

 更新時(shí)間:2020年08月04日 11:54:47   作者:David Beazley  
這篇文章主要介紹了Python 使用生成器代替線程的方法,文中講解非常細(xì)致,代碼幫助大家更好的理解和學(xué)習(xí),感興趣的朋友可以了解下

問題

你想使用生成器(協(xié)程)替代系統(tǒng)線程來實(shí)現(xiàn)并發(fā)。這個(gè)有時(shí)又被稱為用戶級線程或綠色線程。

解決方案

要使用生成器實(shí)現(xiàn)自己的并發(fā),你首先要對生成器函數(shù)和 yield 語句有深刻理解。 yield 語句會讓一個(gè)生成器掛起它的執(zhí)行,這樣就可以編寫一個(gè)調(diào)度器, 將生成器當(dāng)做某種“任務(wù)”并使用任務(wù)協(xié)作切換來替換它們的執(zhí)行。 要演示這種思想,考慮下面兩個(gè)使用簡單的 yield 語句的生成器函數(shù):

# Two simple generator functions
def countdown(n):
  while n > 0:
    print('T-minus', n)
    yield
    n -= 1
  print('Blastoff!')

def countup(n):
  x = 0
  while x < n:
    print('Counting up', x)
    yield
    x += 1

這些函數(shù)在內(nèi)部使用yield語句,下面是一個(gè)實(shí)現(xiàn)了簡單任務(wù)調(diào)度器的代碼:

from collections import deque

class TaskScheduler:
  def __init__(self):
    self._task_queue = deque()

  def new_task(self, task):
    '''
    Admit a newly started task to the scheduler
    '''
    self._task_queue.append(task)

  def run(self):
    '''
    Run until there are no more tasks
    '''
    while self._task_queue:
      task = self._task_queue.popleft()
      try:
        # Run until the next yield statement
        next(task)
        self._task_queue.append(task)
      except StopIteration:
        # Generator is no longer executing
        pass

# Example use
sched = TaskScheduler()
sched.new_task(countdown(10))
sched.new_task(countdown(5))
sched.new_task(countup(15))
sched.run()

TaskScheduler 類在一個(gè)循環(huán)中運(yùn)行生成器集合——每個(gè)都運(yùn)行到碰到y(tǒng)ield語句為止。 運(yùn)行這個(gè)例子,輸出如下:

T-minus 10
T-minus 5
Counting up 0
T-minus 9
T-minus 4
Counting up 1
T-minus 8
T-minus 3
Counting up 2
T-minus 7
T-minus 2
...

到此為止,我們實(shí)際上已經(jīng)實(shí)現(xiàn)了一個(gè)“操作系統(tǒng)”的最小核心部分。 生成器函數(shù)就是任務(wù),而yield語句是任務(wù)掛起的信號。 調(diào)度器循環(huán)檢查任務(wù)列表直到?jīng)]有任務(wù)要執(zhí)行為止。

實(shí)際上,你可能想要使用生成器來實(shí)現(xiàn)簡單的并發(fā)。 那么,在實(shí)現(xiàn)actor或網(wǎng)絡(luò)服務(wù)器的時(shí)候你可以使用生成器來替代線程的使用。

下面的代碼演示了使用生成器來實(shí)現(xiàn)一個(gè)不依賴線程的actor:

from collections import deque

class ActorScheduler:
  def __init__(self):
    self._actors = {}     # Mapping of names to actors
    self._msg_queue = deque()  # Message queue

  def new_actor(self, name, actor):
    '''
    Admit a newly started actor to the scheduler and give it a name
    '''
    self._msg_queue.append((actor,None))
    self._actors[name] = actor

  def send(self, name, msg):
    '''
    Send a message to a named actor
    '''
    actor = self._actors.get(name)
    if actor:
      self._msg_queue.append((actor,msg))

  def run(self):
    '''
    Run as long as there are pending messages.
    '''
    while self._msg_queue:
      actor, msg = self._msg_queue.popleft()
      try:
         actor.send(msg)
      except StopIteration:
         pass

# Example use
if __name__ == '__main__':
  def printer():
    while True:
      msg = yield
      print('Got:', msg)

  def counter(sched):
    while True:
      # Receive the current count
      n = yield
      if n == 0:
        break
      # Send to the printer task
      sched.send('printer', n)
      # Send the next count to the counter task (recursive)
      sched.send('counter', n-1)

  sched = ActorScheduler()
  # Create the initial actors
  sched.new_actor('printer', printer())
  sched.new_actor('counter', counter(sched))

  # Send an initial message to the counter to initiate
  sched.send('counter', 10000)
  sched.run()

完全弄懂這段代碼需要更深入的學(xué)習(xí),但是關(guān)鍵點(diǎn)在于收集消息的隊(duì)列。 本質(zhì)上,調(diào)度器在有需要發(fā)送的消息時(shí)會一直運(yùn)行著。 計(jì)數(shù)生成器會給自己發(fā)送消息并在一個(gè)遞歸循環(huán)中結(jié)束。

下面是一個(gè)更加高級的例子,演示了使用生成器來實(shí)現(xiàn)一個(gè)并發(fā)網(wǎng)絡(luò)應(yīng)用程序:

from collections import deque
from select import select

# This class represents a generic yield event in the scheduler
class YieldEvent:
  def handle_yield(self, sched, task):
    pass

  def handle_resume(self, sched, task):
    pass

# Task Scheduler
class Scheduler:
  def __init__(self):
    self._numtasks = 0    # Total num of tasks
    self._ready = deque()  # Tasks ready to run
    self._read_waiting = {} # Tasks waiting to read
    self._write_waiting = {} # Tasks waiting to write

  # Poll for I/O events and restart waiting tasks
  def _iopoll(self):
    rset,wset,eset = select(self._read_waiting,
                self._write_waiting,[])
    for r in rset:
      evt, task = self._read_waiting.pop(r)
      evt.handle_resume(self, task)
    for w in wset:
      evt, task = self._write_waiting.pop(w)
      evt.handle_resume(self, task)

  def new(self,task):
    '''
    Add a newly started task to the scheduler
    '''
    self._ready.append((task, None))
    self._numtasks += 1

  def add_ready(self, task, msg=None):
    '''
    Append an already started task to the ready queue.
    msg is what to send into the task when it resumes.
    '''
    self._ready.append((task, msg))

  # Add a task to the reading set
  def _read_wait(self, fileno, evt, task):
    self._read_waiting[fileno] = (evt, task)

  # Add a task to the write set
  def _write_wait(self, fileno, evt, task):
    self._write_waiting[fileno] = (evt, task)

  def run(self):
    '''
    Run the task scheduler until there are no tasks
    '''
    while self._numtasks:
       if not self._ready:
         self._iopoll()
       task, msg = self._ready.popleft()
       try:
         # Run the coroutine to the next yield
         r = task.send(msg)
         if isinstance(r, YieldEvent):
           r.handle_yield(self, task)
         else:
           raise RuntimeError('unrecognized yield event')
       except StopIteration:
         self._numtasks -= 1

# Example implementation of coroutine-based socket I/O
class ReadSocket(YieldEvent):
  def __init__(self, sock, nbytes):
    self.sock = sock
    self.nbytes = nbytes
  def handle_yield(self, sched, task):
    sched._read_wait(self.sock.fileno(), self, task)
  def handle_resume(self, sched, task):
    data = self.sock.recv(self.nbytes)
    sched.add_ready(task, data)

class WriteSocket(YieldEvent):
  def __init__(self, sock, data):
    self.sock = sock
    self.data = data

  def handle_yield(self, sched, task):
    sched._write_wait(self.sock.fileno(), self, task)

  def handle_resume(self, sched, task):
    nsent = self.sock.send(self.data)
    sched.add_ready(task, nsent)

class AcceptSocket(YieldEvent):
  def __init__(self, sock):
    self.sock = sock

  def handle_yield(self, sched, task):
    sched._read_wait(self.sock.fileno(), self, task)

  def handle_resume(self, sched, task):
    r = self.sock.accept()
    sched.add_ready(task, r)

# Wrapper around a socket object for use with yield
class Socket(object):
  def __init__(self, sock):
    self._sock = sock

  def recv(self, maxbytes):
    return ReadSocket(self._sock, maxbytes)

  def send(self, data):
    return WriteSocket(self._sock, data)

  def accept(self):
    return AcceptSocket(self._sock)

  def __getattr__(self, name):
    return getattr(self._sock, name)

if __name__ == '__main__':
  from socket import socket, AF_INET, SOCK_STREAM
  import time

  # Example of a function involving generators. This should
  # be called using line = yield from readline(sock)
  def readline(sock):
    chars = []
    while True:
      c = yield sock.recv(1)
      if not c:
        break
      chars.append(c)
      if c == b'\n':
        break
    return b''.join(chars)

  # Echo server using generators
  class EchoServer:
    def __init__(self,addr,sched):
      self.sched = sched
      sched.new(self.server_loop(addr))

    def server_loop(self,addr):
      s = Socket(socket(AF_INET,SOCK_STREAM))

      s.bind(addr)
      s.listen(5)
      while True:
        c,a = yield s.accept()
        print('Got connection from ', a)
        self.sched.new(self.client_handler(Socket(c)))

    def client_handler(self,client):
      while True:
        line = yield from readline(client)
        if not line:
          break
        line = b'GOT:' + line
        while line:
          nsent = yield client.send(line)
          line = line[nsent:]
      client.close()
      print('Client closed')

  sched = Scheduler()
  EchoServer(('',16000),sched)
  sched.run()

這段代碼有點(diǎn)復(fù)雜。不過,它實(shí)現(xiàn)了一個(gè)小型的操作系統(tǒng)。 有一個(gè)就緒的任務(wù)隊(duì)列,并且還有因I/O休眠的任務(wù)等待區(qū)域。 還有很多調(diào)度器負(fù)責(zé)在就緒隊(duì)列和I/O等待區(qū)域之間移動(dòng)任務(wù)。

討論

在構(gòu)建基于生成器的并發(fā)框架時(shí),通常會使用更常見的yield形式:

def some_generator():
  ...
  result = yield data
  ...

使用這種形式的yield語句的函數(shù)通常被稱為“協(xié)程”。 通過調(diào)度器,yield語句在一個(gè)循環(huán)中被處理,如下:

f = some_generator()

# Initial result. Is None to start since nothing has been computed
result = None
while True:
  try:
    data = f.send(result)
    result = ... do some calculation ...
  except StopIteration:
    break

這里的邏輯稍微有點(diǎn)復(fù)雜。不過,被傳給 send() 的值定義了在yield語句醒來時(shí)的返回值。 因此,如果一個(gè)yield準(zhǔn)備在對之前yield數(shù)據(jù)的回應(yīng)中返回結(jié)果時(shí),會在下一次 send() 操作返回。 如果一個(gè)生成器函數(shù)剛開始運(yùn)行,發(fā)送一個(gè)None值會讓它排在第一個(gè)yield語句前面。

除了發(fā)送值外,還可以在一個(gè)生成器上面執(zhí)行一個(gè) close() 方法。 它會導(dǎo)致在執(zhí)行yield語句時(shí)拋出一個(gè) GeneratorExit 異常,從而終止執(zhí)行。 如果進(jìn)一步設(shè)計(jì),一個(gè)生成器可以捕獲這個(gè)異常并執(zhí)行清理操作。 同樣還可以使用生成器的 throw() 方法在yield語句執(zhí)行時(shí)生成一個(gè)任意的執(zhí)行指令。 一個(gè)任務(wù)調(diào)度器可利用它來在運(yùn)行的生成器中處理錯(cuò)誤。

最后一個(gè)例子中使用的 yield from 語句被用來實(shí)現(xiàn)協(xié)程,可以被其它生成器作為子程序或過程來調(diào)用。 本質(zhì)上就是將控制權(quán)透明的傳輸給新的函數(shù)。 不像普通的生成器,一個(gè)使用 yield from 被調(diào)用的函數(shù)可以返回一個(gè)作為 yield from 語句結(jié)果的值。 關(guān)于 yield from 的更多信息可以在 PEP 380 中找到。

最后,如果使用生成器編程,要提醒你的是它還是有很多缺點(diǎn)的。 特別是,你得不到任何線程可以提供的好處。例如,如果你執(zhí)行CPU依賴或I/O阻塞程序, 它會將整個(gè)任務(wù)掛起直到操作完成。為了解決這個(gè)問題, 你只能選擇將操作委派給另外一個(gè)可以獨(dú)立運(yùn)行的線程或進(jìn)程。 另外一個(gè)限制是大部分Python庫并不能很好的兼容基于生成器的線程。 如果你選擇這個(gè)方案,你會發(fā)現(xiàn)你需要自己改寫很多標(biāo)準(zhǔn)庫函數(shù)。 作為本節(jié)提到的協(xié)程和相關(guān)技術(shù)的一個(gè)基礎(chǔ)背景,可以查看 PEP 342 和 “協(xié)程和并發(fā)的一門有趣課程

PEP 3156 同樣有一個(gè)關(guān)于使用協(xié)程的異步I/O模型。 特別的,你不可能自己去實(shí)現(xiàn)一個(gè)底層的協(xié)程調(diào)度器。 不過,關(guān)于協(xié)程的思想是很多流行庫的基礎(chǔ), 包括 gevent, greenlet, Stackless Python 以及其他類似工程。

以上就是Python 使用生成器代替線程的方法的詳細(xì)內(nèi)容,更多關(guān)于Python 生成器代替線程的資料請關(guān)注腳本之家其它相關(guān)文章!

相關(guān)文章

  • 淺談Django的緩存機(jī)制

    淺談Django的緩存機(jī)制

    這篇文章主要介紹了淺談Django的緩存機(jī)制,小編覺得挺不錯(cuò)的,現(xiàn)在分享給大家,也給大家做個(gè)參考。一起跟隨小編過來看看吧
    2018-08-08
  • pyspark 隨機(jī)森林的實(shí)現(xiàn)

    pyspark 隨機(jī)森林的實(shí)現(xiàn)

    這篇文章主要介紹了pyspark 隨機(jī)森林的實(shí)現(xiàn),文中通過示例代碼介紹的非常詳細(xì),對大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友們下面隨著小編來一起學(xué)習(xí)學(xué)習(xí)吧
    2020-04-04
  • 詳解Python中xlwt庫的基本操作

    詳解Python中xlwt庫的基本操作

    xlwt 是一個(gè)用于在Python中操作Excel文件的庫,它允許用戶創(chuàng)建、修改和寫入Excel文件,本文主要為大家介紹了xlwt庫的一些基本操作,需要的可以參考一下
    2023-11-11
  • Python項(xiàng)目跨域問題解決方案

    Python項(xiàng)目跨域問題解決方案

    這篇文章主要介紹了Python項(xiàng)目跨域問題解決方案,文中通過示例代碼介紹的非常詳細(xì),對大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友可以參考下
    2020-06-06
  • Pandas實(shí)現(xiàn)Excel文件讀取,增刪,打開,保存操作

    Pandas實(shí)現(xiàn)Excel文件讀取,增刪,打開,保存操作

    Pandas?是一種基于?NumPy?的開源數(shù)據(jù)分析工具,用于處理和分析大量數(shù)據(jù)。本文將通過Pandas實(shí)現(xiàn)對Excel文件進(jìn)行讀取、增刪、打開、保存等操作,需要的可以參考一下
    2023-04-04
  • python神經(jīng)網(wǎng)絡(luò)Densenet模型復(fù)現(xiàn)詳解

    python神經(jīng)網(wǎng)絡(luò)Densenet模型復(fù)現(xiàn)詳解

    這篇文章主要為大家介紹了python神經(jīng)網(wǎng)絡(luò)Densenet模型復(fù)現(xiàn)詳解,有需要的朋友可以借鑒參考下,希望能夠有所幫助,祝大家多多進(jìn)步,早日升職加薪
    2022-05-05
  • 5行Python代碼實(shí)現(xiàn)一鍵批量扣圖

    5行Python代碼實(shí)現(xiàn)一鍵批量扣圖

    在日常生活或者工作中,經(jīng)常會遇到想將某張照片中的人物摳出來,本文就介紹了Python代碼實(shí)現(xiàn)一鍵批量扣圖,感興趣的可以了解一下
    2021-06-06
  • 使用pytorch進(jìn)行圖像的順序讀取方法

    使用pytorch進(jìn)行圖像的順序讀取方法

    今天小編就為大家分享一篇使用pytorch進(jìn)行圖像的順序讀取方法,具有很好的參考價(jià)值,希望對大家有所幫助。一起跟隨小編過來看看吧
    2018-07-07
  • python實(shí)現(xiàn)QQ定時(shí)發(fā)送新年祝福信息

    python實(shí)現(xiàn)QQ定時(shí)發(fā)送新年祝福信息

    大家好,本篇文章主要講的是python實(shí)現(xiàn)QQ定時(shí)發(fā)送新年祝福信息,感興趣的同學(xué)感快來看一看吧,對你有幫助的話記得收藏一下
    2022-02-02
  • Python splitlines使用技巧

    Python splitlines使用技巧

    Python中的splitlines用來分割行。當(dāng)傳入的參數(shù)為True時(shí),表示保留換行符 \n。通過下面的例子就很明白了
    2008-09-09

最新評論