Python實(shí)現(xiàn)并行抓取整站40萬(wàn)條房?jī)r(jià)數(shù)據(jù)(可更換抓取城市)
寫在前面
這次的爬蟲是關(guān)于房?jī)r(jià)信息的抓取,目的在于練習(xí)10萬(wàn)以上的數(shù)據(jù)處理及整站式抓取。
數(shù)據(jù)量的提升最直觀的感覺便是對(duì)函數(shù)邏輯要求的提高,針對(duì)Python的特性,謹(jǐn)慎的選擇數(shù)據(jù)結(jié)構(gòu)。以往小數(shù)據(jù)量的抓取,即使函數(shù)邏輯部分重復(fù),I/O請(qǐng)求頻率密集,循環(huán)套嵌過(guò)深,也不過(guò)是1~2s的差別,而隨著數(shù)據(jù)規(guī)模的提高,這1~2s的差別就有可能擴(kuò)展成為1~2h。
因此對(duì)于要抓取數(shù)據(jù)量較多的網(wǎng)站,可以從兩方面著手降低抓取信息的時(shí)間成本。
1)優(yōu)化函數(shù)邏輯,選擇適當(dāng)?shù)臄?shù)據(jù)結(jié)構(gòu),符合Pythonic的編程習(xí)慣。例如,字符串的合并,使用join()要比“+”節(jié)省內(nèi)存空間。
2)依據(jù)I/O密集與CPU密集,選擇多線程、多進(jìn)程并行的執(zhí)行方式,提高執(zhí)行效率。
一、獲取索引
包裝請(qǐng)求request,設(shè)置超時(shí)timeout
# 獲取列表頁(yè)面
def get_page(url):
headers = {
'User-Agent': r'Mozilla/5.0 (Windows NT 6.1; WOW64) AppleWebKit/537.36 (KHTML, like Gecko) '
r'Chrome/45.0.2454.85 Safari/537.36 115Browser/6.0.3',
'Referer': r'http://bj.fangjia.com/ershoufang/',
'Host': r'bj.fangjia.com',
'Connection': 'keep-alive'
}
timeout = 60
socket.setdefaulttimeout(timeout) # 設(shè)置超時(shí)
req = request.Request(url, headers=headers)
response = request.urlopen(req).read()
page = response.decode('utf-8')
return page
一級(jí)位置:區(qū)域信息

二級(jí)位置:板塊信息(根據(jù)區(qū)域位置得到板塊信息,以key_value對(duì)的形式存儲(chǔ)在dict中)

以dict方式存儲(chǔ),可以快速的查詢到所要查找的目標(biāo)。-> {'朝陽(yáng)':{'工體','安貞','健翔橋'......}}
三級(jí)位置:地鐵信息(搜索地鐵周邊房源信息)

將所屬位置地鐵信息,添加至dict中。 -> {'朝陽(yáng)':{'工體':{'5號(hào)線','10號(hào)線' , '13號(hào)線'},'安貞','健翔橋'......}}
對(duì)應(yīng)的url:http://bj.fangjia.com/ershoufang/--r-%E6%9C%9D%E9%98%B3%7Cw-5%E5%8F%B7%E7%BA%BF%7Cb-%E6%83%A0%E6%96%B0%E8%A5%BF%E8%A1%97
解碼后的url:http://bj.fangjia.com/ershoufang/--r-朝陽(yáng)|w-5號(hào)線|b-惠新西街
根據(jù)url的參數(shù)模式,可以有兩種方式獲取目的url:
1)根據(jù)索引路徑獲得目的url

# 獲取房源信息列表(嵌套字典遍歷) def get_info_list(search_dict, layer, tmp_list, search_list): layer += 1 # 設(shè)置字典層級(jí) for i in range(len(search_dict)): tmp_key = list(search_dict.keys())[i] # 提取當(dāng)前字典層級(jí)key tmp_list.append(tmp_key) # 將當(dāng)前key值作為索引添加至tmp_list tmp_value = search_dict[tmp_key] if isinstance(tmp_value, str): # 當(dāng)鍵值為url時(shí) tmp_list.append(tmp_value) # 將url添加至tmp_list search_list.append(copy.deepcopy(tmp_list)) # 將tmp_list索引url添加至search_list tmp_list = tmp_list[:layer] # 根據(jù)層級(jí)保留索引 elif tmp_value == '': # 鍵值為空時(shí)跳過(guò) layer -= 2 # 跳出鍵值層級(jí) tmp_list = tmp_list[:layer] # 根據(jù)層級(jí)保留索引 else: get_info_list(tmp_value, layer, tmp_list, search_list) # 當(dāng)鍵值為列表時(shí),迭代遍歷 tmp_list = tmp_list[:layer] return search_list
2)根據(jù)dict信息包裝url
{'朝陽(yáng)':{'工體':{'5號(hào)線'}}}
參數(shù):
—— r-朝陽(yáng)
—— b-工體
—— w-5號(hào)線
組裝參數(shù):http://bj.fangjia.com/ershoufang/--r-朝陽(yáng)|w-5號(hào)線|b-工體
1 # 根據(jù)參數(shù)創(chuàng)建組合url 2 def get_compose_url(compose_tmp_url, tag_args, key_args): 3 compose_tmp_url_list = [compose_tmp_url, '|' if tag_args != 'r-' else '', tag_args, parse.quote(key_args), ] 4 compose_url = ''.join(compose_tmp_url_list) 5 return compose_url
二、獲取索引頁(yè)最大頁(yè)數(shù)
# 獲取當(dāng)前索引頁(yè)面頁(yè)數(shù)的url列表
def get_info_pn_list(search_list):
fin_search_list = []
for i in range(len(search_list)):
print('>>>正在抓取%s' % search_list[i][:3])
search_url = search_list[i][3]
try:
page = get_page(search_url)
except:
print('獲取頁(yè)面超時(shí)')
continue
soup = BS(page, 'lxml')
# 獲取最大頁(yè)數(shù)
pn_num = soup.select('span[class="mr5"]')[0].get_text()
rule = re.compile(r'\d+')
max_pn = int(rule.findall(pn_num)[1])
# 組裝url
for pn in range(1, max_pn+1):
print('************************正在抓取%s頁(yè)************************' % pn)
pn_rule = re.compile('[|]')
fin_url = pn_rule.sub(r'|e-%s|' % pn, search_url, 1)
tmp_url_list = copy.deepcopy(search_list[i][:3])
tmp_url_list.append(fin_url)
fin_search_list.append(tmp_url_list)
return fin_search_list
三、抓取房源信息Tag
這是我們要抓取的Tag:
['區(qū)域', '板塊', '地鐵', '標(biāo)題', '位置', '平米', '戶型', '樓層', '總價(jià)', '單位平米價(jià)格']

# 獲取tag信息
def get_info(fin_search_list, process_i):
print('進(jìn)程%s開始' % process_i)
fin_info_list = []
for i in range(len(fin_search_list)):
url = fin_search_list[i][3]
try:
page = get_page(url)
except:
print('獲取tag超時(shí)')
continue
soup = BS(page, 'lxml')
title_list = soup.select('a[class="h_name"]')
address_list = soup.select('span[class="address]')
attr_list = soup.select('span[class="attribute"]')
price_list = soup.find_all(attrs={"class": "xq_aprice xq_esf_width"}) # select對(duì)于某些屬性值(屬性值中間包含空格)無(wú)法識(shí)別,可以用find_all(attrs={})代替
for num in range(20):
tag_tmp_list = []
try:
title = title_list[num].attrs["title"]
print(r'************************正在獲取%s************************' % title)
address = re.sub('\n', '', address_list[num].get_text())
area = re.search('\d+[\u4E00-\u9FA5]{2}', attr_list[num].get_text()).group(0)
layout = re.search('\d[^0-9]\d.', attr_list[num].get_text()).group(0)
floor = re.search('\d/\d', attr_list[num].get_text()).group(0)
price = re.search('\d+[\u4E00-\u9FA5]', price_list[num].get_text()).group(0)
unit_price = re.search('\d+[\u4E00-\u9FA5]/.', price_list[num].get_text()).group(0)
tag_tmp_list = copy.deepcopy(fin_search_list[i][:3])
for tag in [title, address, area, layout, floor, price, unit_price]:
tag_tmp_list.append(tag)
fin_info_list.append(tag_tmp_list)
except:
print('【抓取失敗】')
continue
print('進(jìn)程%s結(jié)束' % process_i)
return fin_info_list
四、分配任務(wù),并行抓取
對(duì)任務(wù)列表進(jìn)行分片,設(shè)置進(jìn)程池,并行抓取。
# 分配任務(wù) def assignment_search_list(fin_search_list, project_num): # project_num每個(gè)進(jìn)程包含的任務(wù)數(shù),數(shù)值越小,進(jìn)程數(shù)越多 assignment_list = [] fin_search_list_len = len(fin_search_list) for i in range(0, fin_search_list_len, project_num): start = i end = i+project_num assignment_list.append(fin_search_list[start: end]) # 獲取列表碎片 return assignment_list
p = Pool(4) # 設(shè)置進(jìn)程池 assignment_list = assignment_search_list(fin_info_pn_list, 3) # 分配任務(wù),用于多進(jìn)程 result = [] # 多進(jìn)程結(jié)果列表 for i in range(len(assignment_list)): result.append(p.apply_async(get_info, args=(assignment_list[i], i))) p.close() p.join() for result_i in range(len(result)): fin_info_result_list = result[result_i].get() fin_save_list.extend(fin_info_result_list) # 將各個(gè)進(jìn)程獲得的列表合并
通過(guò)設(shè)置進(jìn)程池并行抓取,時(shí)間縮短為單進(jìn)程抓取時(shí)間的3/1,總計(jì)時(shí)間3h。
電腦為4核,經(jīng)過(guò)測(cè)試,任務(wù)數(shù)為3時(shí),在當(dāng)前電腦運(yùn)行效率最高。
五、將抓取結(jié)果存儲(chǔ)到excel中,等待可視化數(shù)據(jù)化處理
# 存儲(chǔ)抓取結(jié)果 def save_excel(fin_info_list, file_name): tag_name = ['區(qū)域', '板塊', '地鐵', '標(biāo)題', '位置', '平米', '戶型', '樓層', '總價(jià)', '單位平米價(jià)格'] book = xlsxwriter.Workbook(r'C:\Users\Administrator\Desktop\%s.xls' % file_name) # 默認(rèn)存儲(chǔ)在桌面上 tmp = book.add_worksheet() row_num = len(fin_info_list) for i in range(1, row_num): if i == 1: tag_pos = 'A%s' % i tmp.write_row(tag_pos, tag_name) else: con_pos = 'A%s' % i content = fin_info_list[i-1] # -1是因?yàn)楸槐砀竦谋眍^所占 tmp.write_row(con_pos, content) book.close()

附上源碼
#! -*-coding:utf-8-*-
# Function: 房?jī)r(jià)調(diào)查
# Author:蘭茲
from urllib import parse, request
from bs4 import BeautifulSoup as BS
from multiprocessing import Pool
import re
import lxml
import datetime
import cProfile
import socket
import copy
import xlsxwriter
starttime = datetime.datetime.now()
base_url = r'http://bj.fangjia.com/ershoufang/'
test_search_dict = {'昌平': {'霍營(yíng)': {'13號(hào)線': 'http://bj.fangjia.com/ershoufang/--r-%E6%98%8C%E5%B9%B3|w-13%E5%8F%B7%E7%BA%BF|b-%E9%9C%8D%E8%90%A5'}}}
search_list = [] # 房源信息url列表
tmp_list = [] # 房源信息url緩存列表
layer = -1
# 獲取列表頁(yè)面
def get_page(url):
headers = {
'User-Agent': r'Mozilla/5.0 (Windows NT 6.1; WOW64) AppleWebKit/537.36 (KHTML, like Gecko) '
r'Chrome/45.0.2454.85 Safari/537.36 115Browser/6.0.3',
'Referer': r'http://bj.fangjia.com/ershoufang/',
'Host': r'bj.fangjia.com',
'Connection': 'keep-alive'
}
timeout = 60
socket.setdefaulttimeout(timeout) # 設(shè)置超時(shí)
req = request.Request(url, headers=headers)
response = request.urlopen(req).read()
page = response.decode('utf-8')
return page
# 獲取查詢關(guān)鍵詞dict
def get_search(page, key):
soup = BS(page, 'lxml')
search_list = soup.find_all(href=re.compile(key), target='')
search_dict = {}
for i in range(len(search_list)):
soup = BS(str(search_list[i]), 'lxml')
key = soup.select('a')[0].get_text()
value = soup.a.attrs['href']
search_dict[key] = value
return search_dict
# 獲取房源信息列表(嵌套字典遍歷)
def get_info_list(search_dict, layer, tmp_list, search_list):
layer += 1 # 設(shè)置字典層級(jí)
for i in range(len(search_dict)):
tmp_key = list(search_dict.keys())[i] # 提取當(dāng)前字典層級(jí)key
tmp_list.append(tmp_key) # 將當(dāng)前key值作為索引添加至tmp_list
tmp_value = search_dict[tmp_key]
if isinstance(tmp_value, str): # 當(dāng)鍵值為url時(shí)
tmp_list.append(tmp_value) # 將url添加至tmp_list
search_list.append(copy.deepcopy(tmp_list)) # 將tmp_list索引url添加至search_list
tmp_list = tmp_list[:layer] # 根據(jù)層級(jí)保留索引
elif tmp_value == '': # 鍵值為空時(shí)跳過(guò)
layer -= 2 # 跳出鍵值層級(jí)
tmp_list = tmp_list[:layer] # 根據(jù)層級(jí)保留索引
else:
get_info_list(tmp_value, layer, tmp_list, search_list) # 當(dāng)鍵值為列表時(shí),迭代遍歷
tmp_list = tmp_list[:layer]
return search_list
# 獲取房源信息詳情
def get_info_pn_list(search_list):
fin_search_list = []
for i in range(len(search_list)):
print('>>>正在抓取%s' % search_list[i][:3])
search_url = search_list[i][3]
try:
page = get_page(search_url)
except:
print('獲取頁(yè)面超時(shí)')
continue
soup = BS(page, 'lxml')
# 獲取最大頁(yè)數(shù)
pn_num = soup.select('span[class="mr5"]')[0].get_text()
rule = re.compile(r'\d+')
max_pn = int(rule.findall(pn_num)[1])
# 組裝url
for pn in range(1, max_pn+1):
print('************************正在抓取%s頁(yè)************************' % pn)
pn_rule = re.compile('[|]')
fin_url = pn_rule.sub(r'|e-%s|' % pn, search_url, 1)
tmp_url_list = copy.deepcopy(search_list[i][:3])
tmp_url_list.append(fin_url)
fin_search_list.append(tmp_url_list)
return fin_search_list
# 獲取tag信息
def get_info(fin_search_list, process_i):
print('進(jìn)程%s開始' % process_i)
fin_info_list = []
for i in range(len(fin_search_list)):
url = fin_search_list[i][3]
try:
page = get_page(url)
except:
print('獲取tag超時(shí)')
continue
soup = BS(page, 'lxml')
title_list = soup.select('a[class="h_name"]')
address_list = soup.select('span[class="address]')
attr_list = soup.select('span[class="attribute"]')
price_list = soup.find_all(attrs={"class": "xq_aprice xq_esf_width"}) # select對(duì)于某些屬性值(屬性值中間包含空格)無(wú)法識(shí)別,可以用find_all(attrs={})代替
for num in range(20):
tag_tmp_list = []
try:
title = title_list[num].attrs["title"]
print(r'************************正在獲取%s************************' % title)
address = re.sub('\n', '', address_list[num].get_text())
area = re.search('\d+[\u4E00-\u9FA5]{2}', attr_list[num].get_text()).group(0)
layout = re.search('\d[^0-9]\d.', attr_list[num].get_text()).group(0)
floor = re.search('\d/\d', attr_list[num].get_text()).group(0)
price = re.search('\d+[\u4E00-\u9FA5]', price_list[num].get_text()).group(0)
unit_price = re.search('\d+[\u4E00-\u9FA5]/.', price_list[num].get_text()).group(0)
tag_tmp_list = copy.deepcopy(fin_search_list[i][:3])
for tag in [title, address, area, layout, floor, price, unit_price]:
tag_tmp_list.append(tag)
fin_info_list.append(tag_tmp_list)
except:
print('【抓取失敗】')
continue
print('進(jìn)程%s結(jié)束' % process_i)
return fin_info_list
# 分配任務(wù)
def assignment_search_list(fin_search_list, project_num): # project_num每個(gè)進(jìn)程包含的任務(wù)數(shù),數(shù)值越小,進(jìn)程數(shù)越多
assignment_list = []
fin_search_list_len = len(fin_search_list)
for i in range(0, fin_search_list_len, project_num):
start = i
end = i+project_num
assignment_list.append(fin_search_list[start: end]) # 獲取列表碎片
return assignment_list
# 存儲(chǔ)抓取結(jié)果
def save_excel(fin_info_list, file_name):
tag_name = ['區(qū)域', '板塊', '地鐵', '標(biāo)題', '位置', '平米', '戶型', '樓層', '總價(jià)', '單位平米價(jià)格']
book = xlsxwriter.Workbook(r'C:\Users\Administrator\Desktop\%s.xls' % file_name) # 默認(rèn)存儲(chǔ)在桌面上
tmp = book.add_worksheet()
row_num = len(fin_info_list)
for i in range(1, row_num):
if i == 1:
tag_pos = 'A%s' % i
tmp.write_row(tag_pos, tag_name)
else:
con_pos = 'A%s' % i
content = fin_info_list[i-1] # -1是因?yàn)楸槐砀竦谋眍^所占
tmp.write_row(con_pos, content)
book.close()
if __name__ == '__main__':
file_name = input(r'抓取完成,輸入文件名保存:')
fin_save_list = [] # 抓取信息存儲(chǔ)列表
# 一級(jí)篩選
page = get_page(base_url)
search_dict = get_search(page, 'r-')
# 二級(jí)篩選
for k in search_dict:
print(r'************************一級(jí)抓?。赫谧ト ?s】************************' % k)
url = search_dict[k]
second_page = get_page(url)
second_search_dict = get_search(second_page, 'b-')
search_dict[k] = second_search_dict
# 三級(jí)篩選
for k in search_dict:
second_dict = search_dict[k]
for s_k in second_dict:
print(r'************************二級(jí)抓取:正在抓取【%s】************************' % s_k)
url = second_dict[s_k]
third_page = get_page(url)
third_search_dict = get_search(third_page, 'w-')
print('%s>%s' % (k, s_k))
second_dict[s_k] = third_search_dict
fin_info_list = get_info_list(search_dict, layer, tmp_list, search_list)
fin_info_pn_list = get_info_pn_list(fin_info_list)
p = Pool(4) # 設(shè)置進(jìn)程池
assignment_list = assignment_search_list(fin_info_pn_list, 2) # 分配任務(wù),用于多進(jìn)程
result = [] # 多進(jìn)程結(jié)果列表
for i in range(len(assignment_list)):
result.append(p.apply_async(get_info, args=(assignment_list[i], i)))
p.close()
p.join()
for result_i in range(len(result)):
fin_info_result_list = result[result_i].get()
fin_save_list.extend(fin_info_result_list) # 將各個(gè)進(jìn)程獲得的列表合并
save_excel(fin_save_list, file_name)
endtime = datetime.datetime.now()
time = (endtime - starttime).seconds
print('總共用時(shí):%s s' % time)
總結(jié):
當(dāng)抓取數(shù)據(jù)規(guī)模越大,對(duì)程序邏輯要求就愈嚴(yán)謹(jǐn),對(duì)python語(yǔ)法要求就越熟練。如何寫出更加pythonic的語(yǔ)法,也需要不斷學(xué)習(xí)掌握的。
以上就是本文的全部?jī)?nèi)容,希望本文的內(nèi)容對(duì)大家的學(xué)習(xí)或者工作能帶來(lái)一定的幫助,同時(shí)也希望多多支持腳本之家!
相關(guān)文章
Python-VTK隱式函數(shù)屬性選擇和剪切數(shù)據(jù)
這篇文章主要介紹了Python-VTK隱式函數(shù)屬性選擇和剪切數(shù)據(jù),VTK,是一個(gè)開放資源的免費(fèi)軟件系統(tǒng),主要用于三維計(jì)算機(jī)圖形學(xué)、圖像處理和可視化,下面文章主題相關(guān)詳細(xì)內(nèi)容需要的小伙伴可以參考一下2022-04-04
利用python數(shù)據(jù)分析處理進(jìn)行炒股實(shí)戰(zhàn)行情
這篇文章主要介紹了利用python數(shù)據(jù)分析進(jìn)行炒股實(shí)戰(zhàn)行情,本文主要介紹三部分:數(shù)據(jù)采集,數(shù)據(jù)預(yù)處理,利用SVM算法進(jìn)行建模,本文僅供參考借鑒2021-08-08

