Python多線程中線程數(shù)量如何控制
前言
前段時間學(xué)習(xí)了python的多線程爬蟲,當(dāng)時爬取一個圖片網(wǎng)站,開啟多線程后,并沒有限制線程的數(shù)量,也就是說,如果下載1000張圖片,會一次性開啟1000個子線程同時進(jìn)行下載
現(xiàn)在希望控制線程數(shù)量:例如每次只下載5張,當(dāng)下載完成后再下載另外5張,直至全部完成
查了一些資料,發(fā)現(xiàn)在python中,threading 模塊有提供 Semaphore類 和 BoundedSemaphore類來限制線程數(shù)
官網(wǎng)給出例子如下:
信號量通常用于保護(hù)容量有限的資源,例如數(shù)據(jù)庫服務(wù)器。在資源大小固定的任何情況下,都應(yīng)使用有界信號量。在產(chǎn)生任何工作線程之前,您的主線程將初始化信號量:
maxconnections = 5 # ... pool_sema = BoundedSemaphore(value=maxconnections)
產(chǎn)生后,工作線程在需要連接到服務(wù)器時會調(diào)用信號量的獲取和釋放方法:
改造之前的多線程爬蟲
首先貼出原來的代碼
# -*- coding:utf-8 -*- import requests from requests.exceptions import RequestException import os, time import re from lxml import etree import threading lock = threading.Lock() def get_html(url): ? ? """ ? ? 定義一個方法,用于獲取一個url頁面的響應(yīng)內(nèi)容 ? ? :param url: 要訪問的url ? ? :return: 響應(yīng)內(nèi)容 ? ? """ ? ? response = requests.get(url, timeout=10) ? ? # print(response.status_code) ? ? try: ? ? ? ? if response.status_code == 200: ? ? ? ? ? ? # print(response.text) ? ? ? ? ? ? return response.text ? ? ? ? else: ? ? ? ? ? ? ?return None ? ? except RequestException: ? ? ? ? print("請求失敗") ? ? ? ? # return None def parse_html(html_text): ? ? """ ? ? 定義一個方法,用于解析頁面內(nèi)容,提取圖片url ? ? :param html_text: ? ? :return:一個頁面的圖片url集合 ? ? """ ? ? html = etree.HTML(html_text) ? ? if len(html) > 0: ? ? ? ? img_src = html.xpath("http://img[@class='photothumb lazy']/@data-original") ?# 元素提取方法 ? ? ? ? # print(img_src) ? ? ? ? return img_src ? ? else: ? ? ? ? print("解析頁面元素失敗") def get_image_pages(url): ? ? """ ? ? 獲取所查詢圖片結(jié)果的所有頁碼 ? ? :param url: 查詢圖片url ? ? :return: 總頁碼數(shù) ? ? """ ? ? html_text = get_html(url) ?# 獲取搜索url響應(yīng)內(nèi)容 ? ? # print(html_text) ? ? if html_text is not None: ? ? ? ? html = etree.HTML(html_text) ?# 生成XPath解析對象 ? ? ? ? last_page = html.xpath("http://div[@class='pages']//a[last()]/@href") ?# 提取最后一頁所在href鏈接 ? ? ? ? print(last_page) ? ? ? ? if last_page: ? ? ? ? ? ? max_page = re.compile(r'(\d+)', re.S).search(last_page[0]).group() ?# 使用正則表達(dá)式提取鏈接中的頁碼數(shù)字 ? ? ? ? ? ? print(max_page) ? ? ? ? ? ? print(type(max_page)) ? ? ? ? ? ? return int(max_page) ?# 將字符串頁碼轉(zhuǎn)為整數(shù)并返回 ? ? ? ? else: ? ? ? ? ? ? print("暫無數(shù)據(jù)") ? ? ? ? ? ? return None ? ? else: ? ? ? ? print("查詢結(jié)果失敗") def get_all_image_url(page_number): ? ? """ ? ? 獲取所有圖片的下載url ? ? :param page_number: 爬取頁碼 ? ? :return: 所有圖片url的集合 ? ? """ ? ? base_url = 'https://imgbin.com/free-png/naruto/' ? ? image_urls = [] ? ? x = 1 ?# 定義一個標(biāo)識,用于給每個圖片url編號,從1遞增 ? ? for i in range(1, page_number): ? ? ? ? url = base_url + str(i) ?# 根據(jù)頁碼遍歷請求url ? ? ? ? try: ? ? ? ? ? ? html = get_html(url) ?# 解析每個頁面的內(nèi)容 ? ? ? ? ? ? if html: ? ? ? ? ? ? ? ? data = parse_html(html) ?# 提取頁面中的圖片url ? ? ? ? ? ? ? ? # print(data) ? ? ? ? ? ? ? ? # time.sleep(3) ? ? ? ? ? ? ? ? if data: ? ? ? ? ? ? ? ? ? ? for j in data: ? ? ? ? ? ? ? ? ? ? ? ? image_urls.append({ ? ? ? ? ? ? ? ? ? ? ? ? ? ? 'name': x, ? ? ? ? ? ? ? ? ? ? ? ? ? ? 'value': j ? ? ? ? ? ? ? ? ? ? ? ? }) ? ? ? ? ? ? ? ? ? ? ? ? x += 1 ?# 每提取一個圖片url,標(biāo)識x增加1 ? ? ? ? except RequestException as f: ? ? ? ? ? ? print("遇到錯誤:", f) ? ? ? ? ? ? continue ? ? # print(image_urls) ? ? return image_urls def get_image_content(url): ? ? """請求圖片url,返回二進(jìn)制內(nèi)容""" ? ? # print("正在下載", url) ? ? try: ? ? ? ? r = requests.get(url, timeout=15) ? ? ? ? if r.status_code == 200: ? ? ? ? ? ? return r.content ? ? ? ? return None ? ? except RequestException: ? ? ? ? return None def main(url, image_name): ? ? """ ? ? 主函數(shù):實(shí)現(xiàn)下載圖片功能 ? ? :param url: 圖片url ? ? :param image_name: 圖片名稱 ? ? :return: ? ? """ ? ? semaphore.acquire() ?# 加鎖,限制線程數(shù) ? ? print('當(dāng)前子線程: {}'.format(threading.current_thread().name)) ? ? save_path = os.path.dirname(os.path.abspath('.')) + '/pics/' ? ? try: ? ? ? ? file_path = '{0}/{1}.jpg'.format(save_path, image_name) ? ? ? ? if not os.path.exists(file_path): ?# 判斷是否存在文件,不存在則爬取 ? ? ? ? ? ? with open(file_path, 'wb') as f: ? ? ? ? ? ? ? ? f.write(get_image_content(url)) ? ? ? ? ? ? ? ? f.close() ? ? ? ? ? ? ? ? print('第{}個文件保存成功'.format(image_name)) ? ? ? ? else: ? ? ? ? ? ? print("第{}個文件已存在".format(image_name)) ? ? ? ? semaphore.release() ?# 解鎖imgbin-多線程-重寫run方法.py ? ? except FileNotFoundError as f: ? ? ? ? print("第{}個文件下載時遇到錯誤,url為:{}:".format(image_name, url)) ? ? ? ? print("報錯:", f) ? ? ? ? raise ? ? except TypeError as e: ? ? ? ? print("第{}個文件下載時遇到錯誤,url為:{}:".format(image_name, url)) ? ? ? ? print("報錯:", e) class MyThread(threading.Thread): ? ? """繼承Thread類重寫run方法創(chuàng)建新進(jìn)程""" ? ? def __init__(self, func, args): ? ? ? ? """ ? ? ? ? :param func: run方法中要調(diào)用的函數(shù)名 ? ? ? ? :param args: func函數(shù)所需的參數(shù) ? ? ? ? """ ? ? ? ? threading.Thread.__init__(self) ? ? ? ? self.func = func ? ? ? ? self.args = args ? ? def run(self): ? ? ? ? print('當(dāng)前子線程: {}'.format(threading.current_thread().name)) ? ? ? ? self.func(self.args[0], self.args[1]) ? ? ? ? # 調(diào)用func函數(shù) ? ? ? ? # 因?yàn)檫@里的func函數(shù)其實(shí)是上述的main()函數(shù),它需要2個參數(shù);args傳入的是個參數(shù)元組,拆解開來傳入 if __name__ == '__main__': ? ? start = time.time() ? ? print('這是主線程:{}'.format(threading.current_thread().name)) ? ? urls = get_all_image_url(5) ?# 獲取所有圖片url列表 ? ? thread_list = [] ?# 定義一個列表,向里面追加線程 ? ? semaphore = threading.BoundedSemaphore(5) # 或使用Semaphore方法 ? ? for t in urls: ? ? ? ? # print(i) ? ? ? ? m = MyThread(main, (t["value"], t["name"])) ?# 調(diào)用MyThread類,得到一個實(shí)例 ? ? ? ? thread_list.append(m) ? ? for m in thread_list: ? ? ? ? m.start() ?# 調(diào)用start()方法,開始執(zhí)行 ? ? for m in thread_list: ? ? ? ? m.join() ?# 子線程調(diào)用join()方法,使主線程等待子線程運(yùn)行完畢之后才退出 ? ? end = time.time() ? ? print(end-start) ? ? # get_image_pages("https://imgbin.com/free-png/Naruto")
將代碼進(jìn)行改造
1、下面的第8、9行表示調(diào)用 threading 的 BoundedSemaphore類,初始化信號量為5,把結(jié)果賦給變量 pool_sema
if __name__ == '__main__': ? ? start = time.time() ? ? print('這是主線程:{}'.format(threading.current_thread().name)) ? ? urls = get_all_image_url(5) ?# 獲取所有圖片url列表 ? ? thread_list = [] ?# 定義一個列表,向里面追加線程 ?? ?# 更多Python相關(guān)視頻、資料加群778463939免費(fèi)獲取 ? ? max_connections = 5 ?# 定義最大線程數(shù) ? ? pool_sema = threading.BoundedSemaphore(max_connections) # 或使用Semaphore方法 ? ? for t in urls: ? ? ? ? # print(i) ? ? ? ? m = MyThread(main, (t["value"], t["name"])) ?# 調(diào)用MyThread類,得到一個實(shí)例 ? ? ? ? thread_list.append(m) ? ? for m in thread_list: ? ? ? ? m.start() ?# 調(diào)用start()方法,開始執(zhí)行 ? ? for m in thread_list: ? ? ? ? m.join() ?# 子線程調(diào)用join()方法,使主線程等待子線程運(yùn)行完畢之后才退出 ? ? end = time.time() ? ? print(end-start)
2、修改main()函數(shù)
(1)方法一:通過with語句實(shí)現(xiàn),第9行添加 with pool_sema
使用 with 語句來獲得一個鎖、條件變量或信號量,相當(dāng)于調(diào)用 acquire();離開 with 塊后,會自動調(diào)用 release()
def main(url, image_name): ? ? """ ? ? 主函數(shù):實(shí)現(xiàn)下載圖片功能 ? ? :param url: 圖片url ? ? :param image_name: 圖片名稱 ? ? :return: ? ? """ ? ? with pool_sema: ? ? ? ? print('當(dāng)前子線程: {}'.format(threading.current_thread().name)) ? ? ? ? save_path = os.path.dirname(os.path.abspath('.')) + '/pics/' ? ? ? ? try: ? ? ? ? ? ? file_path = '{0}/{1}.jpg'.format(save_path, image_name) ? ? ? ? ? ? if not os.path.exists(file_path): ?# 判斷是否存在文件,不存在則爬取 ? ? ? ? ? ? ? ? with open(file_path, 'wb') as f: ? ? ? ? ? ? ? ? ? ? f.write(get_image_content(url)) ? ? ? ? ? ? ? ? ? ? f.close() ? ? ? ? ? ? ? ? ? ? print('第{}個文件保存成功'.format(image_name)) ? ? ? ? ? ? else: ? ? ? ? ? ? ? ? print("第{}個文件已存在".format(image_name)) ? ? ? ? except FileNotFoundError as f: ? ? ? ? ? ? print("第{}個文件下載時遇到錯誤,url為:{}:".format(image_name, url)) ? ? ? ? ? ? print("報錯:", f) ? ? ? ? ? ? raise ? ? ? ? except TypeError as e: ? ? ? ? ? ? print("第{}個文件下載時遇到錯誤,url為:{}:".format(image_name, url)) ? ? ? ? ? ? print("報錯:", e)
(2)方法二:直接使用 acquire()和 release()
下面的第8行調(diào)用 acquire(),第24行調(diào)用release()
def main(url, image_name): ? ? """ ? ? 主函數(shù):實(shí)現(xiàn)下載圖片功能 ? ? :param url: 圖片url ? ? :param image_name: 圖片名稱 ? ? :return: ? ? """ ? ? pool_sema.acquire() ?# 加鎖,限制線程數(shù) ? ? # with pool_sema: ? ? print('當(dāng)前子線程: {}'.format(threading.current_thread().name)) ? ? save_path = os.path.dirname(os.path.abspath('.')) + '/pics/' ? ? try: ? ? ? ? file_path = '{0}/{1}.jpg'.format(save_path, image_name) ? ? ? ? if not os.path.exists(file_path): ?# 判斷是否存在文件,不存在則爬取 ? ? ? ? ? ? with open(file_path, 'wb') as f: ? ? ? ? ? ? ? ? f.write(get_image_content(url)) ? ? ? ? ? ? ? ? f.close() ? ? ? ? ? ? ? ? print('第{}個文件保存成功'.format(image_name)) ? ? ? ? else: ? ? ? ? ? ? print("第{}個文件已存在".format(image_name)) ? ? ? ? pool_sema.release() ?# 解鎖imgbin-多線程-重寫run方法.py ? ? except FileNotFoundError as f: ? ? ? ? print("第{}個文件下載時遇到錯誤,url為:{}:".format(image_name, url)) ? ? ? ? print("報錯:", f) ? ? ? ? raise ? ? except TypeError as e: ? ? ? ? print("第{}個文件下載時遇到錯誤,url為:{}:".format(image_name, url)) ? ? ? ? print("報錯:", e)
最終效果是一樣的,每次啟用5個線程,完成后再啟動下一批
到此這篇關(guān)于Python多線程中線程數(shù)量如何控制的文章就介紹到這了,更多相關(guān)Python 線程數(shù)量控制內(nèi)容請搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持腳本之家!
相關(guān)文章
Python selenium模擬手動操作實(shí)現(xiàn)無人值守刷積分功能
這篇文章主要介紹了Python selenium模擬手動操作達(dá)到無人值守刷積分目的,本文通過實(shí)例代碼給大家介紹的非常詳細(xì),對大家的學(xué)習(xí)或工作具有一定的參考借鑒價值,需要的朋友可以參考下2020-05-05PyTorch模型轉(zhuǎn)TensorRT是怎么實(shí)現(xiàn)的?
今天給大家?guī)淼氖顷P(guān)于Python的相關(guān)知識,文章圍繞著PyTorch模型轉(zhuǎn)TensorRT是怎么實(shí)現(xiàn)的展開,文中有非常詳細(xì)的介紹及代碼示例,需要的朋友可以參考下2021-06-06Jupyter notebook 輸出部分顯示不全的解決方案
這篇文章主要介紹了Jupyter notebook 輸出部分顯示不全的解決方案,具有很好的參考價值,希望對大家有所幫助。一起跟隨小編過來看看吧2021-04-04Linux下Python安裝完成后使用pip命令的詳細(xì)教程
這篇文章主要介紹了Linux下Python安裝完成后使用pip命令的詳細(xì)教程,本文給大家介紹的非常詳細(xì),具有一定的參考借鑒價值,需要的朋友可以參考下2018-11-11Django中數(shù)據(jù)庫的數(shù)據(jù)關(guān)系:一對一,一對多,多對多
今天小編就為大家分享一篇關(guān)于Django中數(shù)據(jù)庫的數(shù)據(jù)關(guān)系:一對一,一對多,多對多,小編覺得內(nèi)容挺不錯的,現(xiàn)在分享給大家,具有很好的參考價值,需要的朋友一起跟隨小編來看看吧2018-10-10