詳解Python開啟線程和線程池的方法
Python開啟線程和線程池的方法
一.最佳線程數(shù)的獲取:
1、通過用戶慢慢遞增來進行性能壓測,觀察QPS(即每秒的響應請求數(shù),也即是最大吞吐能力。),響應時間
2、根據(jù)公式計算:服務器端最佳線程數(shù)量=((線程等待時間+線程cpu時間)/線程cpu時間) * cpu數(shù)量
3、單用戶壓測,查看CPU的消耗,然后直接乘以百分比,再進行壓測,一般這個值的附近應該就是最佳線程數(shù)量。
二、為什么要使用線程池?
1.多線程中,線程的數(shù)量并非越多越好
2.節(jié)省每次開啟線程的開銷
三、如何實現(xiàn)線程池?
threadpool模塊
concurrent.futures
重寫threadpool或者future的函數(shù)
vthread 模塊
1、過去:
使用threadpool模塊,這是個python的第三方模塊,支持python2和python3,具體使用方式如下:
#! /usr/bin/env python
# -*- coding: utf-8 -*-
import threadpool
import time
def sayhello (a):
print("hello: "+a)
time.sleep(2)
def main():
global result
seed=["a","b","c"]
start=time.time()
task_pool=threadpool.ThreadPool(5)
requests=threadpool.makeRequests(sayhello,seed)
for req in requests:
task_pool.putRequest(req)
task_pool.wait()
end=time.time()
time_m = end-start
print("time: "+str(time_m))
start1=time.time()
for each in seed:
sayhello(each)
end1=time.time()
print("time1: "+str(end1-start1))
if __name__ == '__main__':
main()運行結果如下:

threadpool是一個比較老的模塊了,現(xiàn)在雖然還有一些人在用,但已經(jīng)不再是主流了,關于python多線程,現(xiàn)在已經(jīng)開始步入未來(future模塊)了
2、未來:
使用concurrent.futures模塊,這個模塊是python3中自帶的模塊,但是,python2.7以上版本也可以安裝使用,具體使用方式如下:
#! /usr/bin/env python
# -*- coding: utf-8 -*-
from concurrent.futures import ThreadPoolExecutor
import time
def sayhello(a):
print("hello: "+a)
time.sleep(2)
def main():
seed=["a","b","c"]
start1=time.time()
for each in seed:
sayhello(each)
end1=time.time()
print("time1: "+str(end1-start1))
start2=time.time()
with ThreadPoolExecutor(3) as executor:
for each in seed:
executor.submit(sayhello,each)
end2=time.time()
print("time2: "+str(end2-start2))
start3=time.time()
with ThreadPoolExecutor(3) as executor1:
executor1.map(sayhello,seed)
end3=time.time()
print("time3: "+str(end3-start3))
if __name__ == '__main__':
main()運行結果如下:

注意到一點:
concurrent.futures.ThreadPoolExecutor,在提交任務的時候,有兩種方式,一種是submit()函數(shù),另一種是map()函數(shù),兩者的主要區(qū)別在于:
2.1、map可以保證輸出的順序, submit輸出的順序是亂的
2.2、如果你要提交的任務的函數(shù)是一樣的,就可以簡化成map。但是假如提交的任務函數(shù)是不一樣的,或者執(zhí)行的過程之可能出現(xiàn)異常(使用map執(zhí)行過程中發(fā)現(xiàn)問題會直接拋出錯誤)就要用到submit()
2.3、submit和map的參數(shù)是不同的,submit每次都需要提交一個目標函數(shù)和對應的參數(shù),map只需要提交一次目標函數(shù),目標函數(shù)的參數(shù)放在一個迭代器(列表,字典)里就可以。
3.現(xiàn)在?
這里要考慮一個問題,以上兩種線程池的實現(xiàn)都是封裝好的,任務只能在線程池初始化的時候添加一次,那么,假設我現(xiàn)在有這樣一個需求,需要在線程池運行時,再往里面添加新的任務(注意,是新任務,不是新線程),那么要怎么辦?
其實有兩種方式:
3.1、重寫threadpool或者future的函數(shù):
這個方法需要閱讀源模塊的源碼,必須搞清楚源模塊線程池的實現(xiàn)機制才能正確的根據(jù)自己的需要重寫其中的方法。
3.2、自己構建一個線程池:
這個方法就需要對線程池的有一個清晰的了解了,附上我自己構建的一個線程池:
#! /usr/bin/env python
# -*- coding: utf-8 -*-
#學習中遇到問題沒人解答?小編創(chuàng)建了一個Python學習交流群:711312441
import threading
import Queue
import hashlib
import logging
from utils.progress import PrintProgress
from utils.save import SaveToSqlite
class ThreadPool(object):
def __init__(self, thread_num, args):
self.args = args
self.work_queue = Queue.Queue()
self.save_queue = Queue.Queue()
self.threads = []
self.running = 0
self.failure = 0
self.success = 0
self.tasks = {}
self.thread_name = threading.current_thread().getName()
self.__init_thread_pool(thread_num)
# 線程池初始化
def __init_thread_pool(self, thread_num):
# 下載線程
for i in range(thread_num):
self.threads.append(WorkThread(self))
# 打印進度信息線程
self.threads.append(PrintProgress(self))
# 保存線程
self.threads.append(SaveToSqlite(self, self.args.dbfile))
# 添加下載任務
def add_task(self, func, url, deep):
# 記錄任務,判斷是否已經(jīng)下載過
url_hash = hashlib.new('md5', url.encode("utf8")).hexdigest()
if not url_hash in self.tasks:
self.tasks[url_hash] = url
self.work_queue.put((func, url, deep))
logging.info("{0} add task {1}".format(self.thread_name, url.encode("utf8")))
# 獲取下載任務
def get_task(self):
# 從隊列里取元素,如果block=True,則一直阻塞到有可用元素為止。
task = self.work_queue.get(block=False)
return task
def task_done(self):
# 表示隊列中的某個元素已經(jīng)執(zhí)行完畢。
self.work_queue.task_done()
# 開始任務
def start_task(self):
for item in self.threads:
item.start()
logging.debug("Work start")
def increase_success(self):
self.success += 1
def increase_failure(self):
self.failure += 1
def increase_running(self):
self.running += 1
def decrease_running(self):
self.running -= 1
def get_running(self):
return self.running
# 打印執(zhí)行信息
def get_progress_info(self):
progress_info = {}
progress_info['work_queue_number'] = self.work_queue.qsize()
progress_info['tasks_number'] = len(self.tasks)
progress_info['save_queue_number'] = self.save_queue.qsize()
progress_info['success'] = self.success
progress_info['failure'] = self.failure
return progress_info
def add_save_task(self, url, html):
self.save_queue.put((url, html))
def get_save_task(self):
save_task = self.save_queue.get(block=False)
return save_task
def wait_all_complete(self):
for item in self.threads:
if item.isAlive():
# join函數(shù)的意義,只有當前執(zhí)行join函數(shù)的線程結束,程序才能接著執(zhí)行下去
item.join()
# WorkThread 繼承自threading.Thread
class WorkThread(threading.Thread):
# 這里的thread_pool就是上面的ThreadPool類
def __init__(self, thread_pool):
threading.Thread.__init__(self)
self.thread_pool = thread_pool
#定義線程功能方法,即,當thread_1,...,thread_n,調用start()之后,執(zhí)行的操作。
def run(self):
print (threading.current_thread().getName())
while True:
try:
# get_task()獲取從工作隊列里獲取當前正在下載的線程,格式為func,url,deep
do, url, deep = self.thread_pool.get_task()
self.thread_pool.increase_running()
# 判斷deep,是否獲取新的鏈接
flag_get_new_link = True
if deep >= self.thread_pool.args.deep:
flag_get_new_link = False
#學習中遇到問題沒人解答?小編創(chuàng)建了一個Python學習交流群:711312441
# 此處do為工作隊列傳過來的func,返回值為一個頁面內容和這個頁面上所有的新鏈接
html, new_link = do(url, self.thread_pool.args, flag_get_new_link)
if html == '':
self.thread_pool.increase_failure()
else:
self.thread_pool.increase_success()
# html添加到待保存隊列
self.thread_pool.add_save_task(url, html)
# 添加新任務,即,將新頁面上的不重復的鏈接加入工作隊列。
if new_link:
for url in new_link:
self.thread_pool.add_task(do, url, deep + 1)
self.thread_pool.decrease_running()
# self.thread_pool.task_done()
except Queue.Empty:
if self.thread_pool.get_running() <= 0:
break
except Exception, e:
self.thread_pool.decrease_running()
# print str(e)
break安裝vthread函數(shù)庫系統(tǒng)命令行下執(zhí)行:
pip install vthread
一句話實現(xiàn)簡單多線程
import vthread,requests
@vthread.thread(5) #開5個線程執(zhí)行同一個函數(shù)
def compete(url):
r = requests.get(url)
if r.status_code == 200 :
print("[*]Success")
else:
print("[*]Fail. Retrying...")
compete("http://www.baidu.com/")相同效果:
import vthread,requests
@vthread.thread
def compete(url):
r = requests.get(url)
if r.status_code == 200 :
print("[*]Success")
else:
print("[*]Fail. Retrying...")
for i in range(5): #線程數(shù)
compete("http://www.baidu.com/")線程池包裝
import vthread,requests
@vthread.pool(10) #包裝10條線程池
def compete(url):
r = requests.get(url)
if r.status_code == 200 :
print("[*]Success")
else:
print("[*]Fail. Retrying...")
for i in range(20): #20線程
compete("http://www.baidu.com/")到此這篇關于Python開啟線程和線程池的方法的文章就介紹到這了,更多相關Python開啟線程和線程池內容請搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關文章希望大家以后多多支持腳本之家!
相關文章
python 采用paramiko 遠程執(zhí)行命令及報錯解決
這篇文章主要介紹了python 采用paramiko 遠程執(zhí)行命令及報錯解決,文中通過示例代碼介紹的非常詳細,對大家的學習或者工作具有一定的參考學習價值,需要的朋友可以參考下2019-10-10
python GUI庫圖形界面開發(fā)之PyQt5控件數(shù)據(jù)拖曳Drag與Drop詳細使用方法與實例
這篇文章主要介紹了python GUI庫圖形界面開發(fā)之PyQt5控件數(shù)據(jù)拖曳Drag與Drop詳細使用方法與實例,需要的朋友可以參考下2020-02-02
Django多數(shù)據(jù)庫配置及逆向生成model教程
這篇文章主要介紹了Django多數(shù)據(jù)庫配置及逆向生成model教程,具有很好的參考價值,希望對大家有所幫助。一起跟隨小編過來看看吧2020-03-03
Python利用wxPython模塊打造ChatGPT式打字效果程序
這篇文章主要為大家介紹了如何利用Python和wxPython模塊打造一個ChatGPT式打字效果程序,從而增強用戶體驗或提高應用程序的可讀性,感興趣的可以了解一下2023-05-05
OpenCV-Python使用分水嶺算法實現(xiàn)圖像的分割與提取
在圖像的處理過程中,經(jīng)常需要從圖像中將前景對象作為目標圖像分割或者提取出來。本文就介紹了使用分水嶺算法實現(xiàn)圖像的分割與提取,感興趣的可以了解一下2021-06-06

