亚洲乱码中文字幕综合,中国熟女仑乱hd,亚洲精品乱拍国产一区二区三区,一本大道卡一卡二卡三乱码全集资源,又粗又黄又硬又爽的免费视频

python并發(fā)和異步編程實例

 更新時間:2018年11月15日 08:38:29   作者:風(fēng)起云永  
這篇文章主要為大家詳細介紹了python并發(fā)和異步編程實例,具有一定的參考價值,感興趣的小伙伴們可以參考一下

關(guān)于并發(fā)、并行、同步阻塞、異步非阻塞、線程、進程、協(xié)程等這些概念,單純通過文字恐怕很難有比較深刻的理解,本文就通過代碼一步步實現(xiàn)這些并發(fā)和異步編程,并進行比較。解釋器方面本文選擇python3,畢竟python3才是python的未來,并且python3用原生的庫實現(xiàn)協(xié)程已經(jīng)非常方便了。

1、準備階段

下面為所有測試代碼所需要的包

#! python3
# coding:utf-8

import socket
from concurrent import futures
from selectors import DefaultSelector,EVENT_WRITE,EVENT_READ
import asyncio
import aiohttp
import time
from time import ctime

在進行不同實現(xiàn)方式的比較時,實現(xiàn)場景就是在進行爬蟲開發(fā)的時候通過向?qū)Ψ骄W(wǎng)站發(fā)起一系列的http請求訪問,統(tǒng)計耗時來判斷實現(xiàn)方式的優(yōu)劣,具體地,通過建立通信套接字,訪問新浪主頁,返回源碼,作為一次請求。先實現(xiàn)一個裝飾器用來統(tǒng)計函數(shù)的執(zhí)行時間:

def tsfunc(func):
  def wrappedFunc(*args,**kargs):
    start = time.clock()
    action = func(*args,**kargs)
    time_delta = time.clock() - start
    print ('[{0}] {1}() called, time delta: {2}'.format(ctime(),func.__name__,time_delta))
    return action
  return wrappedFunc

輸出的格式為:當(dāng)前時間,調(diào)用的函數(shù),函數(shù)的執(zhí)行時間。

2、阻塞/非阻塞和同步/異步

這兩對概念不是很好區(qū)分,從定義上理解:

阻塞:在進行socket通信過程中,一個線程發(fā)起請求,如果當(dāng)前請求沒有返回結(jié)果,則進入sleep狀態(tài),期間線程掛起不能做其他操作,直到有返回結(jié)果,或者超時(如果設(shè)置超時的話)。
非阻塞:與阻塞相似,只不過在等待請求結(jié)果時,線程并不掛起而是進行其他操作,即在不能立刻得到結(jié)果之前,該函數(shù)不會阻掛起當(dāng)前線程,而會立刻返回。
同步:同步和阻塞比較相似,但是二者并不是同一個概念,同步是指完成事件的邏輯,是指一件事完成之后,再完成第二件事,以此類推…
異步:異步和非阻塞比較類似,異步的概念和同步相對。當(dāng)一個異步過程調(diào)用發(fā)出后,調(diào)用者不能立刻得到結(jié)果。實際處理這個調(diào)用的部件在完成后,通過狀態(tài)、通知和回調(diào)來通知調(diào)用者,實現(xiàn)異步的方式通俗講就是“等會再告訴你”。

1)阻塞方式

回到代碼上,首先實現(xiàn)阻塞方式的請求函數(shù):

def blocking_way():
  sock = socket.socket()
  sock.connect(('www.sina.com',80))
  request = 'GET / HTTP/1.0\r\nHOST:www.sina.com\r\n\r\n'
  sock.send(request.encode('ascii'))
  response = b''
  chunk = sock.recv(4096)
  while chunk:
    response += chunk
    chunk = sock.recv(4096)
  return response

測試線程、多進程和多線程

# 阻塞無并發(fā)
@tsfunc
def sync_way():
  res = []
  for i in range(10):
    res.append(blocking_way())
  return len(res)
@tsfunc
# 阻塞、多進程
def process_way():
  worker = 10
  with futures.ProcessPoolExecutor(worker) as executor:
    futs = {executor.submit(blocking_way) for i in range(10)}
  return len([fut.result() for fut in futs])
# 阻塞、多線程
@tsfunc
def thread_way():
  worker = 10
  with futures.ThreadPoolExecutor(worker) as executor:
    futs = {executor.submit(blocking_way) for i in range(10)}
  return len([fut.result() for fut in futs])

運行結(jié)果:

[Wed Dec 13 16:52:25 2017] sync_way() called, time delta: 0.06371647809425328
[Wed Dec 13 16:52:28 2017] process_way() called, time delta: 2.31437644946734
[Wed Dec 13 16:52:28 2017] thread_way() called, time delta: 0.010172946070299727

可見與非并發(fā)的方式相比,啟動10個進程完成10次請求訪問耗費的時間最長,進程確實需要很大的系統(tǒng)開銷,相比多線程則效果好得多,啟動10個線程并發(fā)請求,比順序請求速度快了6倍左右。

2)非阻塞方式

實現(xiàn)非阻塞的請求代碼,與阻塞方式的區(qū)別在于等待請求時并不掛起而是直接返回,為了確保能正確讀取消息,最原始的方式就是循環(huán)讀取,知道讀取完成為跳出循環(huán),代碼如下:

def nonblocking_way():
  sock = socket.socket(socket.AF_INET,socket.SOCK_STREAM)
  sock.setblocking(False)
  try:
    sock.connect(('www.sina.com', 80))
  except BlockingIOError:
    pass
  request = 'GET / HTTP/1.0\r\nHost: www.sina.com\r\n\r\n'
  data = request.encode('ascii')
  while True:
    try:
      sock.send(data)
      break
    except OSError:
      pass

  response = b''
  while True:
    try:
      chunk = sock.recv(4096)
      while chunk:
        response += chunk
        chunk = sock.recv(4096)
      break
    except OSError:
      pass

  return response

測試單線程異步非阻塞方式:

@tsfunc
def async_way():
  res = []
  for i in range(10):
    res.append(nonblocking_way())
  return len(res)

測試結(jié)果與單線程同步阻塞方式相比:

[Wed Dec 13 17:18:30 2017] sync_way() called, time delta: 0.07342884475822574
[Wed Dec 13 17:18:30 2017] async_way() called, time delta: 0.06509009095694886

非阻塞方式起到了一定的效果,但是并不明顯,原因肯定是讀取消息的時候雖然不是在線程掛起的時候而是在循環(huán)讀取消息的時候浪費了時間,如果大部分時間讀浪費了并沒有發(fā)揮異步編程的威力,解決的辦法就是后面要說的【事件驅(qū)動】

3、回調(diào)、生成器和協(xié)程

a、回調(diào)

class Crawler():
  def __init__(self,url):
    self.url = url
    self.sock = None
    self.response = b''

  def fetch(self):
    self.sock = socket.socket()
    self.sock.setblocking(False)
    try:
      self.sock.connect(('www.sina.com',80))
    except BlockingIOError:
      pass
    selector.register(self.sock.fileno(),EVENT_WRITE,self.connected)

  def connected(self,key,mask):
    selector.unregister(key.fd)
    get = 'GET {0} HTTP/1.0\r\nHost:www.sina.com\r\n\r\n'.format(self.url)
    self.sock.send(get.encode('ascii'))
    selector.register(key.fd,EVENT_READ,self.read_response)

  def read_response(self,key,mask):
    global stopped
    while True:
      try:
        chunk = self.sock.recv(4096)
        if chunk:
          self.response += chunk
          chunk = self.sock.recv(4096)
        else:
          selector.unregister(key.fd)
          urls_todo.remove(self.url)
          if not urls_todo:
            stopped = True
        break
      except:
        pass

def loop():
  while not stopped:
    events = selector.select()
    for event_key,event_mask in events:
      callback = event_key.data
      callback(event_key,event_mask)
 @tsfunc
def callback_way():
  for url in urls_todo:
    crawler = Crawler(url)
    crawler.fetch()
  loop1()

這是通過傳統(tǒng)回調(diào)方式實現(xiàn)的異步編程,結(jié)果如下:

[Tue Mar 27 17:52:49 2018] callback_way() called, time delta: 0.054735804048789374

b、生成器

class Crawler2:
  def __init__(self, url):
    self.url = url
    self.response = b''

  def fetch(self):
    global stopped
    sock = socket.socket()
    yield from connect(sock, ('www.sina.com', 80))
    get = 'GET {0} HTTP/1.0\r\nHost: www.sina.com\r\n\r\n'.format(self.url)
    sock.send(get.encode('ascii'))
    self.response = yield from read_all(sock)
    urls_todo.remove(self.url)
    if not urls_todo:
      stopped = True

class Task:
  def __init__(self, coro):
    self.coro = coro
    f = Future1()
    f.set_result(None)
    self.step(f)

  def step(self, future):
    try:
      # send會進入到coro執(zhí)行, 即fetch, 直到下次yield
      # next_future 為yield返回的對象
      next_future = self.coro.send(future.result)
    except StopIteration:
      return
    next_future.add_done_callback(self.step)

def loop1():
  while not stopped:
    events = selector.select()
    for event_key,event_mask in events:
      callback = event_key.data
      callback()

運行結(jié)果如下:

[Tue Mar 27 17:54:27 2018] generate_way() called, time delta: 0.2914336347673473

c、協(xié)程

def nonblocking_way():
  sock = socket.socket(socket.AF_INET,socket.SOCK_STREAM)
  sock.setblocking(False)
  try:
    sock.connect(('www.sina.com', 80))
  except BlockingIOError:
    pass
  request = 'GET / HTTP/1.0\r\nHost: www.sina.com\r\n\r\n'
  data = request.encode('ascii')
  while True:
    try:
      sock.send(data)
      break
    except OSError:
      pass

  response = b''
  while True:
    try:
      chunk = sock.recv(4096)
      while chunk:
        response += chunk
        chunk = sock.recv(4096)
      break
    except OSError:
      pass

  return response
@tsfunc
def asyncio_way():
    tasks = [fetch(host+url) for url in urls_todo]
    loop.run_until_complete(asyncio.gather(*tasks))
    return (len(tasks))

運行結(jié)果:

[Tue Mar 27 17:56:17 2018] asyncio_way() called, time delta: 0.43688060698484166

到此終于把并發(fā)和異步編程實例代碼測試完,下邊貼出全部代碼,共讀者自行測試,在任務(wù)量加大時,相信結(jié)果會大不一樣。

#! python3
# coding:utf-8

import socket
from concurrent import futures
from selectors import DefaultSelector,EVENT_WRITE,EVENT_READ
import asyncio
import aiohttp
import time
from time import ctime

def tsfunc(func):
  def wrappedFunc(*args,**kargs):
    start = time.clock()
    action = func(*args,**kargs)
    time_delta = time.clock() - start
    print ('[{0}] {1}() called, time delta: {2}'.format(ctime(),func.__name__,time_delta))
    return action
  return wrappedFunc

def blocking_way():
  sock = socket.socket()
  sock.connect(('www.sina.com',80))
  request = 'GET / HTTP/1.0\r\nHOST:www.sina.com\r\n\r\n'
  sock.send(request.encode('ascii'))
  response = b''
  chunk = sock.recv(4096)
  while chunk:
    response += chunk
    chunk = sock.recv(4096)
  return response

def nonblocking_way():
  sock = socket.socket(socket.AF_INET,socket.SOCK_STREAM)
  sock.setblocking(False)
  try:
    sock.connect(('www.sina.com', 80))
  except BlockingIOError:
    pass
  request = 'GET / HTTP/1.0\r\nHost: www.sina.com\r\n\r\n'
  data = request.encode('ascii')
  while True:
    try:
      sock.send(data)
      break
    except OSError:
      pass

  response = b''
  while True:
    try:
      chunk = sock.recv(4096)
      while chunk:
        response += chunk
        chunk = sock.recv(4096)
      break
    except OSError:
      pass

  return response


selector = DefaultSelector()
stopped = False
urls_todo = ['/','/1','/2','/3','/4','/5','/6','/7','/8','/9']


class Crawler():
  def __init__(self,url):
    self.url = url
    self.sock = None
    self.response = b''

  def fetch(self):
    self.sock = socket.socket()
    self.sock.setblocking(False)
    try:
      self.sock.connect(('www.sina.com',80))
    except BlockingIOError:
      pass
    selector.register(self.sock.fileno(),EVENT_WRITE,self.connected)

  def connected(self,key,mask):
    selector.unregister(key.fd)
    get = 'GET {0} HTTP/1.0\r\nHost:www.sina.com\r\n\r\n'.format(self.url)
    self.sock.send(get.encode('ascii'))
    selector.register(key.fd,EVENT_READ,self.read_response)

  def read_response(self,key,mask):
    global stopped
    while True:
      try:
        chunk = self.sock.recv(4096)
        if chunk:
          self.response += chunk
          chunk = self.sock.recv(4096)
        else:
          selector.unregister(key.fd)
          urls_todo.remove(self.url)
          if not urls_todo:
            stopped = True
        break
      except:
        pass

def loop():
  while not stopped:
    events = selector.select()
    for event_key,event_mask in events:
      callback = event_key.data
      callback(event_key,event_mask)


# 基于生成器的協(xié)程
class Future:
  def __init__(self):
    self.result = None
    self._callbacks = []

  def add_done_callback(self,fn):
    self._callbacks.append(fn)

  def set_result(self,result):
    self.result = result
    for fn in self._callbacks:
      fn(self)

class Crawler1():
  def __init__(self,url):
    self.url = url
    self.response = b''

  def fetch(self):
    sock = socket.socket()
    sock.setblocking(False)
    try:
      sock.connect(('www.sina.com',80))
    except BlockingIOError:
      pass

    f = Future()
    def on_connected():
      f.set_result(None)

    selector.register(sock.fileno(),EVENT_WRITE,on_connected)
    yield f
    selector.unregister(sock.fileno())
    get = 'GET {0} HTTP/1.0\r\nHost: www.sina.com\r\n\r\n'.format(self.url)
    sock.send(get.encode('ascii'))

    global stopped
    while True:
      f = Future()
      def on_readable():
        f.set_result(sock.recv(4096))
      selector.register(sock.fileno(),EVENT_READ,on_readable)
      chunk = yield f
      selector.unregister(sock.fileno())
      if chunk:
        self.response += chunk
      else:
        urls_todo.remove(self.url)
        if not urls_todo:
          stopped = True
        break


# yield from 改進的生成器協(xié)程
class Future1:
  def __init__(self):
    self.result = None
    self._callbacks = []

  def add_done_callback(self,fn):
    self._callbacks.append(fn)

  def set_result(self,result):
    self.result = result
    for fn in self._callbacks:
      fn(self)

  def __iter__(self):
    yield self
    return self.result

def connect(sock, address):
  f = Future1()
  sock.setblocking(False)
  try:
    sock.connect(address)
  except BlockingIOError:
    pass

  def on_connected():
    f.set_result(None)

  selector.register(sock.fileno(), EVENT_WRITE, on_connected)
  yield from f
  selector.unregister(sock.fileno())

def read(sock):
  f = Future1()

  def on_readable():
    f.set_result(sock.recv(4096))

  selector.register(sock.fileno(), EVENT_READ, on_readable)
  chunk = yield from f
  selector.unregister(sock.fileno())
  return chunk

def read_all(sock):
  response = []
  chunk = yield from read(sock)
  while chunk:
    response.append(chunk)
    chunk = yield from read(sock)
  return b''.join(response)

class Crawler2:
  def __init__(self, url):
    self.url = url
    self.response = b''

  def fetch(self):
    global stopped
    sock = socket.socket()
    yield from connect(sock, ('www.sina.com', 80))
    get = 'GET {0} HTTP/1.0\r\nHost: www.sina.com\r\n\r\n'.format(self.url)
    sock.send(get.encode('ascii'))
    self.response = yield from read_all(sock)
    urls_todo.remove(self.url)
    if not urls_todo:
      stopped = True


class Task:
  def __init__(self, coro):
    self.coro = coro
    f = Future1()
    f.set_result(None)
    self.step(f)

  def step(self, future):
    try:
      # send會進入到coro執(zhí)行, 即fetch, 直到下次yield
      # next_future 為yield返回的對象
      next_future = self.coro.send(future.result)
    except StopIteration:
      return
    next_future.add_done_callback(self.step)

def loop1():
  while not stopped:
    events = selector.select()
    for event_key,event_mask in events:
      callback = event_key.data
      callback()


# asyncio 協(xié)程
host = 'http://www.sina.com'
loop = asyncio.get_event_loop()

async def fetch(url):
  async with aiohttp.ClientSession(loop=loop) as session:
    async with session.get(url) as response:
      response = await response.read()
      return response

@tsfunc
def asyncio_way():
    tasks = [fetch(host+url) for url in urls_todo]
    loop.run_until_complete(asyncio.gather(*tasks))
    return (len(tasks))

@tsfunc
def sync_way():
  res = []
  for i in range(10):
    res.append(blocking_way())
  return len(res)

@tsfunc
def process_way():
  worker = 10
  with futures.ProcessPoolExecutor(worker) as executor:
    futs = {executor.submit(blocking_way) for i in range(10)}
  return len([fut.result() for fut in futs])

@tsfunc
def thread_way():
  worker = 10
  with futures.ThreadPoolExecutor(worker) as executor:
    futs = {executor.submit(blocking_way) for i in range(10)}
  return len([fut.result() for fut in futs])

@tsfunc
def async_way():
  res = []
  for i in range(10):
    res.append(nonblocking_way())
  return len(res)

@tsfunc
def callback_way():
  for url in urls_todo:
    crawler = Crawler(url)
    crawler.fetch()
  loop1()

@tsfunc
def generate_way():
  for url in urls_todo:
    crawler = Crawler2(url)
    Task(crawler.fetch())
  loop1()

if __name__ == '__main__':

  #sync_way()
  #process_way()
  #thread_way()
  #async_way()
  #callback_way()
  #generate_way()
  asyncio_way()

以上就是本文的全部內(nèi)容,希望對大家的學(xué)習(xí)有所幫助,也希望大家多多支持腳本之家。

相關(guān)文章

  • python讀csv文件時指定行為表頭或無表頭的方法

    python讀csv文件時指定行為表頭或無表頭的方法

    這篇文章主要介紹了python讀csv文件時指定行為表頭或無表頭的方法,文中通過示例代碼介紹的非常詳細,對大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價值,需要的朋友們下面隨著小編來一起學(xué)習(xí)學(xué)習(xí)吧
    2019-06-06
  • Python爬蟲基礎(chǔ)之requestes模塊

    Python爬蟲基礎(chǔ)之requestes模塊

    這篇文章主要介紹了Python爬蟲基礎(chǔ)之requestes模塊,文中有非常詳細的代碼示例,對正在學(xué)習(xí)python爬蟲的小伙伴們有非常好的幫助,需要的朋友可以參考下
    2021-04-04
  • pytz格式化北京時間多出6分鐘問題的解決方法

    pytz格式化北京時間多出6分鐘問題的解決方法

    這篇文章主要給大家介紹了關(guān)于pytz格式化北京時間多出6分鐘問題的解決方法,文中通過示例代碼介紹的非常詳細,對大家學(xué)習(xí)或者使用Python具有一定的參考學(xué)習(xí)價值,需要的朋友們下面來一起學(xué)習(xí)學(xué)習(xí)吧
    2019-06-06
  • python 實現(xiàn)數(shù)組list 添加、修改、刪除的方法

    python 實現(xiàn)數(shù)組list 添加、修改、刪除的方法

    下面小編就為大家分享一篇python 實現(xiàn)數(shù)組list 添加、修改、刪除的方法,具有很好的參考價值,希望對大家有所幫助。一起跟隨小編過來看看吧
    2018-04-04
  • Python?NumPy隨機抽模塊介紹及方法

    Python?NumPy隨機抽模塊介紹及方法

    這篇文章主要介紹了Python?NumPy隨機抽模塊介紹及方法,文章圍繞主題展開詳細的內(nèi)容介紹,具有一定的參考價值,需要的朋友可以參考一下
    2022-09-09
  • python gdal安裝與簡單使用

    python gdal安裝與簡單使用

    這篇文章主要介紹了python gdal安裝與簡單使用,本文給大家介紹的非常詳細,具有一定的參考借鑒價值,需要的朋友可以參考下
    2019-08-08
  • Python分析最近大火的網(wǎng)劇《隱秘的角落》

    Python分析最近大火的網(wǎng)劇《隱秘的角落》

    這篇文章主要介紹了Python分析最近大火的網(wǎng)劇《隱秘的角落》,本文通過實例代碼圖文相結(jié)合給大家介紹的非常詳細,對大家的學(xué)習(xí)或工作具有一定的參考借鑒價值,需要的朋友可以參考下
    2020-07-07
  • Python基于Django實現(xiàn)驗證碼登錄功能

    Python基于Django實現(xiàn)驗證碼登錄功能

    驗證碼登錄是一種常見的身份驗證方式,它可以有效防止惡意攻擊和機器人登錄,本文將介紹如何基于Python?Django實現(xiàn)驗證碼登錄功能,需要的可以參考一下
    2023-05-05
  • Python面向?qū)ο缶幊讨惖倪\算

    Python面向?qū)ο缶幊讨惖倪\算

    這篇文章主要介紹了Python面向?qū)ο缶幊讨惖倪\算,運算即Operation是操作邏輯的抽象,運算體現(xiàn)一種操作邏輯,在廣義角度來說任何程序都是一種運算,接下來看看文章具體內(nèi)容及各種舉例說明,希望對你有所幫助
    2021-11-11
  • Django Admin中增加導(dǎo)出Excel功能過程解析

    Django Admin中增加導(dǎo)出Excel功能過程解析

    這篇文章主要介紹了Django Admin中增加導(dǎo)出Excel功能過程解析,文中通過示例代碼介紹的非常詳細,對大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價值,需要的朋友可以參考下
    2019-09-09

最新評論