亚洲乱码中文字幕综合,中国熟女仑乱hd,亚洲精品乱拍国产一区二区三区,一本大道卡一卡二卡三乱码全集资源,又粗又黄又硬又爽的免费视频

python線程池threadpool實(shí)現(xiàn)篇

 更新時(shí)間:2018年04月27日 09:25:32   作者:菜鳥磊子  
這篇文章主要為大家詳細(xì)介紹了python線程池threadpool的實(shí)現(xiàn),具有一定的參考價(jià)值,感興趣的小伙伴們可以參考一下

本文為大家分享了threadpool線程池中所有的操作,供大家參考,具體內(nèi)容如下

首先介紹一下自己使用到的名詞:

工作線程(worker):創(chuàng)建線程池時(shí),按照指定的線程數(shù)量,創(chuàng)建工作線程,等待從任務(wù)隊(duì)列中g(shù)et任務(wù);

任務(wù)(requests):即工作線程處理的任務(wù),任務(wù)可能成千上萬個(gè),但是工作線程只有少數(shù)。任務(wù)通過          makeRequests來創(chuàng)建

任務(wù)隊(duì)列(request_queue):存放任務(wù)的隊(duì)列,使用了queue實(shí)現(xiàn)的。工作線程從任務(wù)隊(duì)列中g(shù)et任務(wù)進(jìn)行處理;

任務(wù)處理函數(shù)(callable):工作線程get到任務(wù)后,通過調(diào)用任務(wù)的任務(wù)處理函數(shù)即(request.callable_)具體     的     處理任務(wù),并返回處理結(jié)果;

任務(wù)結(jié)果隊(duì)列(result_queue):任務(wù)處理完成后,將返回的處理結(jié)果,放入到任務(wù)結(jié)果隊(duì)列中(包括異常);

任務(wù)異常處理函數(shù)或回調(diào)(exc_callback):從任務(wù)結(jié)果隊(duì)列中g(shù)et結(jié)果,如果設(shè)置了異常,則需要調(diào)用異?;卣{(diào)處理異常;

任務(wù)結(jié)果回調(diào)(callback):從任務(wù)結(jié)果隊(duì)列中g(shù)et結(jié)果,對(duì)result進(jìn)行進(jìn)一步處理;

上一節(jié)介紹了線程池threadpool的安裝和使用,本節(jié)將主要介紹線程池工作的主要流程:

(1)線程池的創(chuàng)建
(2)工作線程的啟動(dòng)
(3)任務(wù)的創(chuàng)建
(4)任務(wù)的推送到線程池
(5)線程處理任務(wù)
(6)任務(wù)結(jié)束處理
(7)工作線程的退出

下面是threadpool的定義:

class ThreadPool: 
 """A thread pool, distributing work requests and collecting results. 
 
 See the module docstring for more information. 
 
 """ 
 def __init__(self, num_workers, q_size=0, resq_size=0, poll_timeout=5): 
  pass 
 def createWorkers(self, num_workers, poll_timeout=5): 
  pass 
 def dismissWorkers(self, num_workers, do_join=False): 
  pass 
 def joinAllDismissedWorkers(self): 
  pass 
 def putRequest(self, request, block=True, timeout=None): 
  pass 
 def poll(self, block=False): 
  pass 
 def wait(self): 
  pass 

1、線程池的創(chuàng)建(ThreadPool(args))

task_pool=threadpool.ThreadPool(num_works)

task_pool=threadpool.ThreadPool(num_works) 
 def __init__(self, num_workers, q_size=0, resq_size=0, poll_timeout=5): 
  """Set up the thread pool and start num_workers worker threads. 
 
  ``num_workers`` is the number of worker threads to start initially. 
 
  If ``q_size > 0`` the size of the work *request queue* is limited and 
  the thread pool blocks when the queue is full and it tries to put 
  more work requests in it (see ``putRequest`` method), unless you also 
  use a positive ``timeout`` value for ``putRequest``. 
 
  If ``resq_size > 0`` the size of the *results queue* is limited and the 
  worker threads will block when the queue is full and they try to put 
  new results in it. 
 
  .. warning: 
   If you set both ``q_size`` and ``resq_size`` to ``!= 0`` there is 
   the possibilty of a deadlock, when the results queue is not pulled 
   regularly and too many jobs are put in the work requests queue. 
   To prevent this, always set ``timeout > 0`` when calling 
   ``ThreadPool.putRequest()`` and catch ``Queue.Full`` exceptions. 
 
  """ 
  self._requests_queue = Queue.Queue(q_size)#任務(wù)隊(duì)列,通過threadpool.makeReuests(args)創(chuàng)建的任務(wù)都會(huì)放到此隊(duì)列中 
  self._results_queue = Queue.Queue(resq_size)#字典,任務(wù)對(duì)應(yīng)的任務(wù)執(zhí)行結(jié)果</span> 
  self.workers = []#工作線程list,通過self.createWorkers()函數(shù)內(nèi)創(chuàng)建的工作線程會(huì)放到此工作線程list中 
  self.dismissedWorkers = []#被設(shè)置線程事件并且沒有被join的工作線程 
  self.workRequests = {}#字典,記錄任務(wù)被分配到哪個(gè)工作線程中</span> 
  self.createWorkers(num_workers, poll_timeout) 

其中,初始化參數(shù)為:

num_works:   線程池中線程個(gè)數(shù)
q_size :   任務(wù)隊(duì)列的長(zhǎng)度限制,如果限制了隊(duì)列的長(zhǎng)度,那么當(dāng)調(diào)用putRequest()添加任務(wù)時(shí),到達(dá)限制長(zhǎng)度后,那么putRequest將會(huì)不斷嘗試添加任務(wù),除非在putRequest()設(shè)置了超時(shí)或者阻塞; 
esq_size:  任務(wù)結(jié)果隊(duì)列的長(zhǎng)度;
pool_timeout:  工作線程如果從request隊(duì)列中,讀取不到request,則會(huì)阻塞pool_timeout,如果仍沒request則直接返回;

其中,成員變量:

self._requests_queue:  任務(wù)隊(duì)列,通過threadpool.makeReuests(args)創(chuàng)建的任務(wù)都會(huì)放到此隊(duì)列中;
self._results_queue:  字典,任務(wù)對(duì)應(yīng)的任務(wù)執(zhí)行 
self.workers:  工作線程list,通過self.createWorkers()函數(shù)內(nèi)創(chuàng)建的工作線程會(huì)放到此工作線程list中;
self.dismisssedWorkers:  被設(shè)置線程事件,并且沒有被join的工作線程
self.workRequests:  字典,記錄推送到線程池的任務(wù),結(jié)構(gòu)為requestID:request。其中requestID是任務(wù)的唯一標(biāo)識(shí),會(huì)在后面作介紹。

2、工作線程的啟動(dòng)(self.createWorks(args))

函數(shù)定義:

def createWorkers(self, num_workers, poll_timeout=5): 
  """Add num_workers worker threads to the pool. 
 
  ``poll_timout`` sets the interval in seconds (int or float) for how 
  ofte threads should check whether they are dismissed, while waiting for 
  requests. 
 
  """ 
  for i in range(num_workers): 
   self.workers.append(WorkerThread(self._requests_queue, 
    self._results_queue, poll_timeout=poll_timeout)) 

其中WorkerThread()繼承自thread,即python內(nèi)置的線程類,將創(chuàng)建的WorkerThread對(duì)象放入到self.workers隊(duì)列中。下面看一下WorkerThread類的定義:
從self.__init__(args)可看出:

class WorkerThread(threading.Thread): 
 """Background thread connected to the requests/results queues. 
 
 A worker thread sits in the background and picks up work requests from 
 one queue and puts the results in another until it is dismissed. 
 
 """ 
 
 def __init__(self, requests_queue, results_queue, poll_timeout=5, **kwds): 
  """Set up thread in daemonic mode and start it immediatedly. 
 
  ``requests_queue`` and ``results_queue`` are instances of 
  ``Queue.Queue`` passed by the ``ThreadPool`` class when it creates a 
  new worker thread. 
 
  """ 
  threading.Thread.__init__(self, **kwds) 
  self.setDaemon(1)# 
  self._requests_queue = requests_queue#任務(wù)隊(duì)列 
  self._results_queue = results_queue#任務(wù)結(jié)果隊(duì)列 
  self._poll_timeout = poll_timeout#run函數(shù)中從任務(wù)隊(duì)列中g(shù)et任務(wù)時(shí)的超時(shí)時(shí)間,如果超時(shí)則繼續(xù)while(true); 
  self._dismissed = threading.Event()#線程事件,如果set線程事件則run會(huì)執(zhí)行break,直接退出工作線程; 
  self.start() 
 
 def run(self): 
  """Repeatedly process the job queue until told to exit.""" 
  while True: 
   if self._dismissed.isSet():#如果設(shè)置了self._dismissed則退出工作線程 
 
    # we are dismissed, break out of loop 
    break 
   # get next work request. If we don't get a new request from the 
   # queue after self._poll_timout seconds, we jump to the start of 
   # the while loop again, to give the thread a chance to exit. 
   try: 
    request = self._requests_queue.get(True, self._poll_timeout) 
   except Queue.Empty:#嘗從任務(wù) 隊(duì)列self._requests_queue 中g(shù)et任務(wù),如果隊(duì)列為空,則continue 
    continue 
   else: 
    if self._dismissed.isSet():#檢測(cè)此工作線程事件是否被set,如果被設(shè)置,意味著要結(jié)束此工作線程,那么就需要將取到的任務(wù)返回到任務(wù)隊(duì)列中,并且退出線程 
     # we are dismissed, put back request in queue and exit loop 
     self._requests_queue.put(request) 
     break 
    try:<span style="color:#如果線程事件沒有被設(shè)置,那么執(zhí)行任務(wù)處理函數(shù)request.callable,并將返回的result,壓入到任務(wù)結(jié)果隊(duì)列中 
     result = request.callable(*request.args, **request.kwds) 
     self._results_queue.put((request, result)) 
    except: 
     request.exception = True 
     self._results_queue.put((request, sys.exc_info()))#如果任務(wù)處理函數(shù)出現(xiàn)異常,則將異常壓入到隊(duì)列中 
 
 def dismiss(self):</span> 
  """Sets a flag to tell the thread to exit when done with current job. 
  """ 
  self._dismissed.set() 

初始化中變量:

self._request_queue:任務(wù)隊(duì)列;
self._resutls_queuqe,:任務(wù)結(jié)果隊(duì)列 ;
self._pool_timeout:run函數(shù)中從任務(wù)隊(duì)列中g(shù)et任務(wù)時(shí)的超時(shí)時(shí)間,如果超時(shí)則繼續(xù)while(true);
self._dismissed:線程事件,如果set線程事件則run會(huì)執(zhí)行break,直接退出工作線程;

最后調(diào)用self.start()啟動(dòng)線程,run函數(shù)定義見上面:

從上面run函數(shù)while執(zhí)行步驟如下:
(1)如果設(shè)置了self._dismissed則退出工作線程,否則執(zhí)行第2步
(2)嘗從任務(wù) 隊(duì)列self._requests_queue 中g(shù)et任務(wù),如果隊(duì)列為空,則continue 執(zhí)行下一次while循環(huán),否則執(zhí)行第3步
(3)檢測(cè)此工作線程事件是否被set,如果被設(shè)置,意味著要結(jié)束此工作線程,那么就需要將取到的任務(wù)返回到任務(wù)隊(duì)列中,并且退出線程。如果線程事件沒有被設(shè)置,那么執(zhí)行任務(wù)處理函數(shù)request.callable,并將返回的result,壓入到任務(wù)結(jié)果隊(duì)列中,如果任務(wù)處理函數(shù)出現(xiàn)異常,則將異常壓入到隊(duì)列中。最后跳轉(zhuǎn)第4步
(4)繼續(xù)循環(huán),返回1

到此工作線程創(chuàng)建完畢,根據(jù)設(shè)置的線程池線程數(shù)量,創(chuàng)建工作線程,工作線程從任務(wù)隊(duì)列中g(shù)et任務(wù),進(jìn)行任務(wù)處理,并將任務(wù)處理結(jié)果壓入到任務(wù)結(jié)果隊(duì)列中。

3、任務(wù)的創(chuàng)建(makeRequests)

任務(wù)的創(chuàng)建函數(shù)為threadpool.makeRequests(callable_,args_list,callback=None):

# utility functions 
def makeRequests(callable_, args_list, callback=None, 
  exc_callback=_handle_thread_exception): 
 """Create several work requests for same callable with different arguments. 
 
 Convenience function for creating several work requests for the same 
 callable where each invocation of the callable receives different values 
 for its arguments. 
 
 ``args_list`` contains the parameters for each invocation of callable. 
 Each item in ``args_list`` should be either a 2-item tuple of the list of 
 positional arguments and a dictionary of keyword arguments or a single, 
 non-tuple argument. 
 
 See docstring for ``WorkRequest`` for info on ``callback`` and 
 ``exc_callback``. 
 
 """ 
 requests = [] 
 for item in args_list: 
  if isinstance(item, tuple): 
   requests.append( 
    WorkRequest(callable_, item[0], item[1], callback=callback, 
     exc_callback=exc_callback) 
   ) 
  else: 
   requests.append( 
    WorkRequest(callable_, [item], None, callback=callback, 
     exc_callback=exc_callback) 
   ) 
 return requests 

其中創(chuàng)建任務(wù)的函數(shù)參數(shù)具體意義為下:

callable_:注冊(cè)的任務(wù)處理函數(shù),當(dāng)任務(wù)被放到任務(wù)隊(duì)列后,工作線程中獲取到該任務(wù)的線程,會(huì)執(zhí)行此 callable_

args_list:首先args_list是列表,列表元素類型為元組,元組中有兩個(gè)元素item[0],item[1],item[0]為位置參 數(shù),item[1]為字典類型關(guān)鍵字參數(shù)。列表中元組的個(gè)數(shù),代表啟動(dòng)的任務(wù)個(gè)數(shù),在使用的時(shí)候一般都為單個(gè)元組,即一個(gè)makerequest()創(chuàng)建一個(gè)任務(wù)。

callback:回調(diào)函數(shù),在poll函數(shù)中調(diào)用(后面講解此函數(shù)),callable_調(diào)用結(jié)束后,會(huì)就任務(wù)結(jié)果放入到任務(wù)結(jié)果隊(duì)列中(self._resutls_queue),在poll函數(shù)中,當(dāng)從self._resutls_queue隊(duì)列中g(shù)et某個(gè)結(jié)果后,會(huì)執(zhí)行此callback(request,result),其中result是request任務(wù)返回的結(jié)果。

 exc_callback:異?;卣{(diào)函數(shù),在poll函數(shù)中,如果某個(gè)request對(duì)應(yīng)有執(zhí)行異常,那么會(huì)調(diào)用此異?;卣{(diào)。

創(chuàng)建完成任務(wù)后,返回創(chuàng)建的任務(wù)。

外層記錄此任務(wù),放入到任務(wù)列表中。

上面是創(chuàng)建任務(wù)的函數(shù),下面講解任務(wù)對(duì)象的結(jié)構(gòu):

class WorkRequest: 
 """A request to execute a callable for putting in the request queue later. 
 
 See the module function ``makeRequests`` for the common case 
 where you want to build several ``WorkRequest`` objects for the same 
 callable but with different arguments for each call. 
 
 """ 
 
 def __init__(self, callable_, args=None, kwds=None, requestID=None, 
   callback=None, exc_callback=_handle_thread_exception): 
  """Create a work request for a callable and attach callbacks. 
 
  A work request consists of the a callable to be executed by a 
  worker thread, a list of positional arguments, a dictionary 
  of keyword arguments. 
 
  A ``callback`` function can be specified, that is called when the 
  results of the request are picked up from the result queue. It must 
  accept two anonymous arguments, the ``WorkRequest`` object and the 
  results of the callable, in that order. If you want to pass additional 
  information to the callback, just stick it on the request object. 
 
  You can also give custom callback for when an exception occurs with 
  the ``exc_callback`` keyword parameter. It should also accept two 
  anonymous arguments, the ``WorkRequest`` and a tuple with the exception 
  details as returned by ``sys.exc_info()``. The default implementation 
  of this callback just prints the exception info via 
  ``traceback.print_exception``. If you want no exception handler 
  callback, just pass in ``None``. 
 
  ``requestID``, if given, must be hashable since it is used by 
  ``ThreadPool`` object to store the results of that work request in a 
  dictionary. It defaults to the return value of ``id(self)``. 
 
  """ 
  if requestID is None: 
   self.requestID = id(self) 
  else: 
   try: 
    self.requestID = hash(requestID) 
   except TypeError: 
    raise TypeError("requestID must be hashable.") 
  self.exception = False 
  self.callback = callback 
  self.exc_callback = exc_callback 
  self.callable = callable_ 
  self.args = args or [] 
  self.kwds = kwds or {} 
 
 def __str__(self): 
  return "<WorkRequest id=%s args=%r kwargs=%r exception=%s>" % \ 
   (self.requestID, self.args, self.kwds, self.exception) 

上面self.callback 以及self.exc_callback,和self.callable_ ,args,dwds都已經(jīng)講解,就不在啰嗦了。
其中有一個(gè)任務(wù)的全局唯一標(biāo)識(shí),即self.requestID,通過獲取自身內(nèi)存首地址作為自己的唯一標(biāo)識(shí)id(self)
self.exception 初始化為False,如果執(zhí)行self.callable()過程中出現(xiàn)異常,那么此變量會(huì)標(biāo)設(shè)置為True。

至此,任務(wù)創(chuàng)建完畢,調(diào)用makeRequests()的上層記錄有任務(wù)列表request_list.

4、任務(wù)的推送到線程池(putRequest)

上面小節(jié)中介紹了任務(wù)的創(chuàng)建,任務(wù)的個(gè)數(shù)可以成千上百,但是處理任務(wù)的線程數(shù)量只有我們?cè)趧?chuàng)建線程池的時(shí)候制定的線程數(shù)量來處理,指定的線程數(shù)量往往比任務(wù)的數(shù)量小得多,因此,每個(gè)線程必須處理多個(gè)任務(wù)。

本節(jié)介紹如何將創(chuàng)建的任務(wù)推送的線程池中,以讓線程池由阻塞狀態(tài),獲取任務(wù),然后去處理任務(wù)。
任務(wù)的推送使用ThreadPool線程池類中的putRequest(self,request,block,timeout)來創(chuàng)建:

def putRequest(self, request, block=True, timeout=None): 
 """Put work request into work queue and save its id for later.""" 
 assert isinstance(request, WorkRequest) 
 # don't reuse old work requests 
 assert not getattr(request, 'exception', None) 
 self._requests_queue.put(request, block, timeout) 
 self.workRequests[request.requestID] = request 

函數(shù)的主要作用就是將request任務(wù),也就是上一小節(jié)中創(chuàng)建的任務(wù),put到線程池的任務(wù)隊(duì)列中(self._request_queue)。然后記錄已經(jīng)推送到線程池的任務(wù),通過線程池的self.workReuests 字典來存儲(chǔ),結(jié)構(gòu)為request.requestID:request。

至此,任務(wù)創(chuàng)建完成,并且已經(jīng)將任務(wù)推送到線程池中。

5、線程處理任務(wù)

通過上一小節(jié),任務(wù)已經(jīng)推送到了線程中。在任務(wù)沒有被推送到線程池中時(shí),線程池中的線程都處在處在阻塞狀態(tài)中,即在線程的self.run()函數(shù)中,一直處于一下狀態(tài):

try: 
 request = self._requests_queue.get(True, self._poll_timeout) 
except Queue.Empty:#嘗從任務(wù) 隊(duì)列self._requests_queue 中g(shù)et任務(wù),如果隊(duì)列為空,則continue 
 continue 

 現(xiàn)在任務(wù)已經(jīng)推送到線程池中,那么get任務(wù)將會(huì)正常返回,會(huì)執(zhí)行下面的步驟:

def run(self): 
  """Repeatedly process the job queue until told to exit.""" 
  while True: 
   if self._dismissed.isSet():#如果設(shè)置了self._dismissed則退出工作線程 
 
    # we are dismissed, break out of loop 
    break 
   # get next work request. If we don't get a new request from the 
   # queue after self._poll_timout seconds, we jump to the start of 
   # the while loop again, to give the thread a chance to exit. 
   try: 
    request = self._requests_queue.get(True, self._poll_timeout) 
   except Queue.Empty:#嘗從任務(wù) 隊(duì)列self._requests_queue 中g(shù)et任務(wù),如果隊(duì)列為空,則continue 
    continue 
   else: 
    if self._dismissed.isSet():#檢測(cè)此工作線程事件是否被set,如果被設(shè)置,意味著要結(jié)束此工作線程,那么就需要將取到的任務(wù)返回到任務(wù)隊(duì)列中,并且退出線程 
     # we are dismissed, put back request in queue and exit loop 
     self._requests_queue.put(request) 
     break 
    try:#如果線程事件沒有被設(shè)置,那么執(zhí)行任務(wù)處理函數(shù)request.callable,并將返回的result,壓入到任務(wù)結(jié)果隊(duì)列中 
     result = request.callable(*request.args, **request.kwds) 
     self._results_queue.put((request, result)) 
    except: 
     request.exception = True 
     self._results_queue.put((request, sys.exc_info()))#如果任務(wù)處理函數(shù)出現(xiàn)異常,則將異常壓入到隊(duì)列中 

獲取任務(wù)--->調(diào)用任務(wù)的處理函數(shù)callable()處理任務(wù)--->將任務(wù)request以及任務(wù)返回的結(jié)果壓入到self.results_queue隊(duì)列中---->如果任務(wù)處理函數(shù)異常,那么將任務(wù)異常標(biāo)識(shí)設(shè)置為True,并將任務(wù)request以及任務(wù)異常壓入到self.results_queue隊(duì)列中---->再次返回獲取任務(wù)

如果,在while循環(huán)過程中,外部設(shè)置了線程事件,即self._dismissed.isSet為True,那么意味著此線程將會(huì)結(jié)束處理任務(wù),那么會(huì)將get到的任務(wù)返回的任務(wù)隊(duì)列中,并且退出線程。

6、任務(wù)結(jié)束處理

上面小節(jié)中,介紹了線程池不斷的get任務(wù),并且不斷的處理任務(wù)。那么每個(gè)任務(wù)結(jié)束之后我們?cè)撛趺刺幚砟兀€程池提供了wait()以及poll()函數(shù)。

當(dāng)我們把任務(wù)提交個(gè)線程池之后,我們會(huì)調(diào)用wait()來等待任務(wù)處理結(jié)束,結(jié)束后wait()將會(huì)返回,返回后我們可以進(jìn)行下一步操作,例如重新創(chuàng)建任務(wù),將任務(wù)繼續(xù)推送到線程池中,或者結(jié)束線程池。結(jié)束線程池會(huì)在下一小節(jié)介紹,這一小節(jié)主要介紹wait()和poll()操作。

先來看看wait()操作:

def wait(self): 
 """Wait for results, blocking until all have arrived.""" 
 while 1: 
  try: 
   self.poll(True) 
  except NoResultsPending: 
   break 

等待任務(wù)處理結(jié)束,在所有任務(wù)處理完成之前一直處于block階段,如果self.poll()返回異常NoResultsPending異常,然后wait返回,任務(wù)處理結(jié)束。
下面看看poll函數(shù):

def poll(self, block=False): 
 """Process any new results in the queue.""" 
 while True: 
  # still results pending? 
  if not self.workRequests: 
   raise NoResultsPending 
  # are there still workers to process remaining requests? 
  elif block and not self.workers: 
   raise NoWorkersAvailable 
  try: 
   # get back next results 
   request, result = self._results_queue.get(block=block) 
   # has an exception occured? 
   if request.exception and request.exc_callback: 
    request.exc_callback(request, result) 
   # hand results to callback, if any 
   if request.callback and not \ 
     (request.exception and request.exc_callback): 
    request.callback(request, result) 
   del self.workRequests[request.requestID] 
  except Queue.Empty: 
   break 

(1)首先,檢測(cè)任務(wù)字典({request.requestID:request})是否為空,如果為空則拋出異常NoResultPending結(jié)束,否則到第2步;

(2)檢測(cè)工作線程是否為空(如果某個(gè)線程的線程事件被設(shè)置,那么工作線程退出,并從self.workers中pop出),如果為空則拋出NoWorkerAvailable異常結(jié)束,否則進(jìn)入第3步;

(3)從任務(wù)結(jié)果隊(duì)列中g(shù)et任務(wù)結(jié)果,如果拋出隊(duì)列為空,那么break,返回,否則進(jìn)入第4步;

(4)如果任務(wù)處理過程中出現(xiàn)異常,即設(shè)置了request.exception,并且設(shè)置了異常處理回調(diào)即request.exc_callback則執(zhí)行異?;卣{(diào),再回調(diào)中處理異常,返回后將任務(wù)從任務(wù)列表self.workRequests中移除,繼續(xù)get任務(wù),返回第1步。否則進(jìn)入第5步;

(5)如果設(shè)置了任務(wù)結(jié)果回調(diào)即request.callback不為空,則執(zhí)行任務(wù)結(jié)果回調(diào)即request.callbacl(request,result),并
將任務(wù)從任務(wù)列表self.workRequests中移除,繼續(xù)get任務(wù),返回第1步。

(6)重復(fù)進(jìn)行上面的步驟直到拋出異常,或者任務(wù)隊(duì)列為空,則poll返會(huì);

至此拋出NoResultPending wait操作接受此異常后,至此wait()返回。

7、工作線程的退出

threadpool提供的工作線程退出的的操作有dismissWorkers()和joinAllDismissedWorker()操作:

def dismissWorkers(self, num_workers, do_join=False): 
 """Tell num_workers worker threads to quit after their current task.""" 
 dismiss_list = [] 
 for i in range(min(num_workers, len(self.workers))): 
  worker = self.workers.pop() 
  worker.dismiss() 
  dismiss_list.append(worker) 
 
 if do_join: 
  for worker in dismiss_list: 
   worker.join() 
 else: 
  self.dismissedWorkers.extend(dismiss_list) 
 
def joinAllDismissedWorkers(self): 
 """Perform Thread.join() on all worker threads that have been dismissed. 
 """ 
 for worker in self.dismissedWorkers: 
  worker.join() 
 self.dismissedWorkers = [] 

從dismissWorkers可看出,主要工作是從self.workers 工作線程中pop出指定的線程數(shù)量,并且設(shè)置此線程的線程事件,設(shè)置線程事件后,此線程self.run()函數(shù),則會(huì)檢測(cè)到此設(shè)置,并結(jié)束線程。

如果設(shè)置了在do_join,即設(shè)置了在此函數(shù)中join退出的線程,那么對(duì)退出的線程執(zhí)行join操作。否則將pop出的線程放入到self.dismissedWorkers中,以等待joinAllDismissedWorkers操作去處理join線程。

8、總結(jié)

到此為止,threadpool線程池中所有的操作介紹完畢,其實(shí)現(xiàn)也做了具體的介紹。從上面可看出,線程池并沒有那么復(fù)雜,只有幾個(gè)簡(jiǎn)單的操作,主要是了解整個(gè)處理流程即可。

希望大家多多提出建議和意見。

以上就是本文的全部?jī)?nèi)容,希望對(duì)大家的學(xué)習(xí)有所幫助,也希望大家多多支持腳本之家。

相關(guān)文章

  • 淺談pycharm下找不到sqlalchemy的問題

    淺談pycharm下找不到sqlalchemy的問題

    今天小編就為大家分享一篇淺談pycharm下找不到sqlalchemy的問題,具有很好的參考價(jià)值,希望對(duì)大家有所幫助。一起跟隨小編過來看看吧
    2018-12-12
  • python利用tkinter實(shí)現(xiàn)圖片格式轉(zhuǎn)換的示例

    python利用tkinter實(shí)現(xiàn)圖片格式轉(zhuǎn)換的示例

    這篇文章主要介紹了python利用tkinter實(shí)現(xiàn)圖片格式轉(zhuǎn)換,幫助大家更好的理解和使用python,感興趣的朋友可以了解下
    2020-09-09
  • Python使用BeautifulSoup進(jìn)行頁(yè)面解析

    Python使用BeautifulSoup進(jìn)行頁(yè)面解析

    在Python中,我們可以使用BeautifulSoup庫(kù)來解析網(wǎng)頁(yè),BeautifulSoup提供了簡(jiǎn)單而強(qiáng)大的API,使得解析網(wǎng)頁(yè)變得輕松而高效,下面小編就來為大家詳細(xì)講講BeautifulSoup解析網(wǎng)頁(yè)的具體操作吧
    2023-09-09
  • 使用numpy轉(zhuǎn)換成cupy利用GPU執(zhí)行錯(cuò)誤

    使用numpy轉(zhuǎn)換成cupy利用GPU執(zhí)行錯(cuò)誤

    在使用PyInstaller打包Python程序時(shí),可能會(huì)遇到缺少模塊的錯(cuò)誤,尤其是在將Numpy轉(zhuǎn)換為CuPy以利用GPU加速時(shí),如果遇到ModuleNotFoundError,表明PyInstaller沒有包含一些隱式導(dǎo)入的包,解決方法是手動(dòng)將缺失的包添加到打包目錄中
    2024-09-09
  • 在Mac下使用python實(shí)現(xiàn)簡(jiǎn)單的目錄樹展示方法

    在Mac下使用python實(shí)現(xiàn)簡(jiǎn)單的目錄樹展示方法

    今天小編就為大家分享一篇在Mac下使用python實(shí)現(xiàn)簡(jiǎn)單的目錄樹展示方法,具有很好的參考價(jià)值,希望對(duì)大家有所幫助。一起跟隨小編過來看看吧
    2018-11-11
  • python與mysql數(shù)據(jù)庫(kù)交互的實(shí)現(xiàn)

    python與mysql數(shù)據(jù)庫(kù)交互的實(shí)現(xiàn)

    這篇文章主要介紹了python與mysql數(shù)據(jù)庫(kù)交互的實(shí)現(xiàn),文中通過示例代碼介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友們下面隨著小編來一起學(xué)習(xí)學(xué)習(xí)吧
    2020-01-01
  • python爬蟲之爬取百度音樂的實(shí)現(xiàn)方法

    python爬蟲之爬取百度音樂的實(shí)現(xiàn)方法

    今天小編就為大家分享一篇python爬蟲之爬取百度音樂的實(shí)現(xiàn)方法,具有很好的參考價(jià)值,希望對(duì)大家有所幫助。一起跟隨小編過來看看吧
    2019-08-08
  • django框架之cookie/session的使用示例(小結(jié))

    django框架之cookie/session的使用示例(小結(jié))

    這篇文章主要介紹了django框架之cookie/session的使用示例(小結(jié)),詳細(xì)的介紹了cookie和session技術(shù)的接口獲取等問題,具有一定的參考價(jià)值,感興趣的小伙伴們可以參考一下
    2018-10-10
  • xshell會(huì)話批量遷移到mobaxterm的工具(python小工具)

    xshell會(huì)話批量遷移到mobaxterm的工具(python小工具)

    這篇文章主要介紹了xshell會(huì)話批量遷移到mobaxterm的工具,使用方法也超級(jí)簡(jiǎn)單,本文通過python代碼給大家介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或工作具有一定的參考借鑒價(jià)值,需要的朋友可以參考下
    2021-12-12
  • python垃圾回收機(jī)制(GC)原理解析

    python垃圾回收機(jī)制(GC)原理解析

    這篇文章主要介紹了python垃圾回收機(jī)制(GC)原理解析,文中通過示例代碼介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友可以參考下
    2019-12-12

最新評(píng)論