基于Python實現(xiàn)的通用小規(guī)模搜索引擎
1.項目簡介
1.1背景
《信息內容安全》網絡信息內容獲取技術課程項目設計
- 一個至少能支持10個以上網站的爬蟲程序,且支持增量式數(shù)據(jù)采集;并至少采集10000個實際網頁;
- 針對采集回來的網頁內容, 能夠實現(xiàn)網頁文本的分類;
- 可進行重復或冗余網頁的去重過濾;
- 對經去冗以后的內容建立倒排索引;
- 采用PageRank算法實現(xiàn)搜索結果的排序;
- 支持自然語言的模糊檢索;
- 可實現(xiàn)搜索結果的可視化呈現(xiàn)。
- 可以在線記錄每次檢索的日志,井可對日志數(shù)據(jù)進統(tǒng)計分析和關聯(lián)挖掘。
1.2運行環(huán)境
- 平臺:全平臺
- jdk 1.8.0
- ElasticSearch 7.4.0
- Python 3.6 及以上
安裝依賴模塊
- PageRank算法、AI文本分類與上傳
> pip install paddlepaddle numpy elasticsearch
- 數(shù)據(jù)的爬取與預處理
> pip install requests bs4
1.3運行步驟
安裝配置ElasticSearch并啟動
- 下載 并解壓Elasticsearch,詳細步驟自行搜索
- 可以從 apt 和 yum 的軟件倉庫安裝,也可以使用 Windows MSI 安裝包安裝
- 安裝 IK 中文分詞器,詳細步驟自行搜索
- 創(chuàng)建索引
PUT http://127.0.0.1/page { "settings": { "number_of_shards": "5", "number_of_replicas": "0" }, "mappings": { "properties": { "title": { "type": "text", "analyzer": "ik_max_word" }, "weight": { "type": "double" }, "content" : { "type" : "text", "analyzer": "ik_max_word" }, "content_type": { "type": "text" }, "url": { "type": "text", "analyzer": "ik_max_word" }, "update_date": { "type": "date", "format": "yyyy-MM-dd HH:mm:ss||yyyy-MM-dd||epoch_millis" } } } }
- 啟動 ElasticSearch ,在 bash 中執(zhí)行
bin/elasticsearch
或者在 Windows 的 cmd、powershell 執(zhí)行bin\elasticsearch.bat
啟動Web服務
> cd WebApp > java -jar *.jar
數(shù)據(jù)的爬取與預處理
> cd DataCrawler > python crawler.py
計算PageRank值
> cd DataProcess > python PageRank.py
利用AI進行文本分類并上傳至ES
> cd DataProcess/Text_Classification > python Classify.py
2.需求分析
2.1數(shù)據(jù)描述
2.1.1 靜態(tài)數(shù)據(jù)
變量名 | 描述 |
| 線程個數(shù) |
| 種子頁面 |
2.1.2 動態(tài)數(shù)據(jù)
變量名 | 描述 | 類型 |
| 限定域名 | 列表 |
| 禁止域名 | 列表 |
| 線程個數(shù) | 整型 |
| 限定頁面?zhèn)€數(shù) | 整型 |
2.1.3索引數(shù)據(jù)字典
頁面(page)信息索引:
數(shù)據(jù)項名稱 | 含義 | 別名 | 類型 | 備注 |
| 網站標題 |
| 使用 | |
| PageRank值 | pr值,PR值 |
| |
| 網站中的內容 |
| 使用 分詞 | |
| 網站中的內容分類 |
| 文化, 娛樂, 體育, 財經, 房產, 汽車, 教育, 科技, 國際, 證券 | |
| 網站的鏈接 |
| 使用 分詞 | |
| 數(shù)據(jù)更新的時間 |
|
|| || |
2.2. 數(shù)據(jù)采集
種子 url 數(shù)據(jù)從 init_url 列表中選取,并按照順序,依次以各個 url 為起點進行遞歸的數(shù)據(jù)采集
爬取數(shù)據(jù)的url需要限制在 restricted_url 列表里面
2.3功能需求
2.3.1 數(shù)據(jù)爬取與預處理功能
利用Python爬蟲,執(zhí)行以下步驟:
- 開始
- 選取一個鏈接作為起點
- 如果爬取的網頁總數(shù)達到要求,則結束,否則執(zhí)行第 4 步
- 爬取指定鏈接的相關信息,并獲取當前網站中的所有鏈接
- 對 4 中獲取的網站中的所有鏈接中的每一條數(shù)據(jù),執(zhí)行過程3
爬取網站如下信息,
- title
- content
- content_type
- update_date
- url
- link(當前網站中包含的所有鏈接,用于計算pr值)
2.3.2. 計算 PageRank 功能
根據(jù)link
計算爬取下來的每個網站的PageRank值,迭代次數(shù)為50次。解決pr值呈周期性變化的問題。將pr值作為網站重要程度的指標,并補充到網站信息中
2.3.3. AI 文本分類并提交到 ES 功能
利用深度學習,分析每個頁面的content的類別。將類別補充到網站信息中,同時刪除網站信息中不再使用的link
項,形成最終數(shù)據(jù),并上傳至ES,供用戶交互功能調用。
2.3.4. 用戶交互功能
設計WebApp,用戶通過瀏覽器訪問頁面。用戶提交搜索信息后,判斷合法性,不合法則返回ERROR界面提示用戶。如果合法,則后端代碼從本地 ES 中查詢數(shù)據(jù),處理后將結果分條顯示到前端。同時通過限制單個ip每分鐘的訪問次數(shù)來簡單防御用戶惡意搜索。
2.4. 性能需求
2.4.1. 數(shù)據(jù)精確度
對數(shù)據(jù)精確度要求不高,主要數(shù)據(jù)為:
項目 | 限制 |
爬取的數(shù)據(jù)總量 | 每小時查詢一下數(shù)據(jù)總量 |
查詢結果數(shù)量 | 匹配的所有結果數(shù) |
數(shù)據(jù)更新日期 | 精確到分鐘即可 |
2.4.2. 時間特性
項目 | 限制 |
每爬取 1 萬個網頁耗時 | 30 分鐘以內 |
計算 1 萬個網頁的pr值耗時 | 10 分鐘以內 |
對 1 萬個網頁內容進行AI 進行文本分類并上傳至ES耗時 | 10 分鐘以內 |
Web 首頁打開耗時 | 5 秒以內 |
查詢結果頁面打開耗時 | 5 秒以內 |
2.5. 運行需求
2.5.1. 用戶界面
用戶通過瀏覽器訪問,有兩個頁面,一個是主頁,只有簡單的輸入框提供用戶搜索;另一個是一般界面,提供高級搜索功能,并顯示搜索結果。
2.5.2. 主頁
控件 | 作用 | 布局 |
圖標 | 顯示Logo | 居中 |
|輸入框|接收用戶輸入的關鍵字|Logo圖標下偏左
|按鈕|提交用戶輸入的關鍵字,并返回搜索結果|輸入框右|
2.5.3. 搜索結果界面
該界面分為三個部分,導航條、搜索結果、信息展示。這三個部分布局如下
部分 | 位置 | height | width |
導航條 | 頂部 | 50px | 100% |
搜索結果 | 導航條左下部 | auto | 70% |
信息展示 | 導航條右下部 | auto | 30% |
導航條部分
以下控件從左向右依次(順序可以任意)在導航條中排列
控件 | 作用 |
輸入框 | 接收用戶輸入的關鍵字 |
輸入框 | 可以輸入域名,將搜索結果限制在該域名內 |
數(shù)字輸入框 | 查詢結果分頁顯示,該框指示跳轉到指定的搜索結果頁 |
選擇框 | 允許用戶選擇匹配方式:標題和內容(默認)、僅標題、僅內容 |
選擇框 | 選擇搜索結果的排序方式:倒排索引(默認)、 PageRank 排序 |
按鈕 | 提交用戶輸入的所有數(shù)據(jù),并返回搜索結果 |
搜索結果部分
將搜索結果以list的形式展示出來,每個list item顯示匹配的網站的如下數(shù)據(jù)
- 標題
- 內容
- url
- 類別
- PageRank值
- 更新時間
在list結尾,顯示分頁組件,使用戶可以點擊跳轉,樣式如下:
< | 1 | 2 | 3 | 4 | 5 | 6 | > |
信息展示部分
展示一些必要信息,如:
- 本次查詢耗時
- 查詢結果數(shù)
- 數(shù)據(jù)庫中的數(shù)據(jù)總數(shù)
- 等等
2.5.4 軟件接口
接口名 | 描述 | 所在模塊 | 調用方式 |
| 初次啟動調用此接口 |
| 內部調用 |
| 得到目標 url 的頁面 |
| 內部調用 |
| 爬蟲線程 |
| 內部調用 |
| 主任務執(zhí)行線程 |
|
|
| 去掉所有未在 url 中出現(xiàn)的 link 及錯誤文件 |
| 內部調用 |
| 計算PageRank |
| 內部調用 |
| 程序運行方法 |
|
|
| 獲取已爬取數(shù)據(jù) |
| 內部調用 |
| 利用AI進行文本分類 |
|
|
2.5.5. 故障處理
各個功能模塊如果出問題,會出現(xiàn)以下情況:
模塊 | 出故障后 | 簡單排查 |
爬蟲 | 數(shù)據(jù)不再更新 | 檢查網絡,檢查內存資源是否不足 |
PageRank計算 | 數(shù)據(jù)不再更新 | 檢查內存資源和CPU資源是否不足 |
AI 文本分類 | 數(shù)據(jù)不再更新 | 檢查內存資源和CPU資源是否不足 |
ElasticSearch | 前端無法獲取查詢結果 | 問題比較復雜 |
WebApp | 無法訪問網站 | 問題比較復雜 |
其中,后兩個模塊出問題會造成嚴重問題,如果重啟不能解決問題的話,采用如下措施
模塊 | 故障排除 | 終極方法 |
ElasticSearch | ①java環(huán)境是否正確 | 在其他機器上部署,并修改WebApp使其到該機器上獲取服務 |
WebApp | ①端口是否被占用 | 在其他機器上部署,并修改域名解析,將域名解析到新機器上 |
2.6. 其他需求
2.6.1. 可維護性
- 網絡爬蟲設置了黑名單和白名單,可以限制爬取的范圍。
- 各個功能分離開,協(xié)同工作。同時,只要不修改數(shù)據(jù)格式,各個模塊的修改不會影響其他模塊
2.6.2. 可移植性
- WebApp 使用 Spring boot 框架開發(fā),打包后只有一個jar包,可以在任何有java環(huán)境的機器上部署
- 其他功能都用python實現(xiàn),可以部署在任何有python環(huán)境的機器上
- ElasticSearch 支持分布式部署,可以部署在任意平臺
2.6.3. 數(shù)據(jù)完整性
- ElasticSearch 支持分布式,會自動將數(shù)據(jù)備份在不同節(jié)點。如果某個節(jié)點出了故障,不會破壞數(shù)據(jù),也不會影響程序的查詢結果
3.代碼展示
import os import sys import json import numpy as np import time import codecs dir_path = os.path.split(os.path.realpath(sys.argv[0]))[0] + '/../RawData' print(dir_path) Vexname = list(os.listdir(dir_path)) Vexnum = len(Vexname) epoch = 50 # 初始化,去掉所有未在url中出現(xiàn)的link以及錯誤文件 def init(): global Vexnum falsefiles={} idx=0 start = time.perf_counter() for file in Vexname: if idx % 100 == 0: a = '=' * int(idx / Vexnum * 100) b = ' ' * (100 - int(idx / Vexnum * 100)) c = int(idx / Vexnum * 100) dur = time.perf_counter() - start sys.stdout.write("\r{:^3.0f}%[{}=>{}]{:.2f}s".format(c, a, b, dur)) sys.stdout.flush() with codecs.open(os.path.join(dir_path, file), 'r', encoding='utf-8') as load_f: try: text = json.load(load_f) except: falsefiles[file]=Vexname.index(file)-len(falsefiles) continue try: links = [] for link in text['link']: if link+'.json' in Vexname: links.append(link) text['link'] = links.copy() except: pass finally: if 'link' in text: text['link'].clear() else: text['link'] = [] with codecs.open(os.path.join(dir_path, file), 'w', encoding='utf-8') as dump_f: json.dump(text, dump_f, ensure_ascii=False,indent=4) idx += 1 print('正在刪除錯誤文件及鏈接...') Vexnum -= len(falsefiles) checknum=0 checkfalse=0 for file in list(falsefiles.keys()): os.remove(os.path.join(dir_path,file)) Vexname.remove(file) for i in range(checknum,falsefiles[file]): with codecs.open(os.path.join(dir_path, Vexname[i]), 'r', encoding='utf-8') as load_f: text = json.load(load_f) try: for falsefile in list(falsefiles.keys())[checkfalse:]: if falsefile in text['link']: text['link'].remove(falsefile) except: text['link'].clear() with codecs.open(os.path.join(dir_path, Vexname[i]), 'w', encoding='utf-8') as dump_f: json.dump(text, dump_f, ensure_ascii=False,indent=4) checknum += falsefiles[file] checkfalse += 1 # 計算PageRank def Rank(Value, start): NewValue=np.zeros(Vexnum,dtype=np.double) for iter in range(1,epoch): a = '=' * int(iter / epoch * 100) b = ' ' * (100 - int(iter / epoch * 100)) c = int(iter / epoch * 100) dur = time.perf_counter() - start sys.stdout.write("\r{:^3.0f}%[{}=>{}]{:.2f}s".format(c, a, b, dur)) sys.stdout.flush() for i in range(Vexnum): with open(os.path.join(dir_path, Vexname[i]), 'r', encoding='utf-8') as load_f: text = json.load(load_f) count = len(text['link']) if count == 0: NewValue[i] = Value[i] continue for link in text['link']: link += '.json' NewValue[Vexname.index(link)] += Value[i] / count for i in range(Vexnum): NewValue[i] = NewValue[i] / (iter + 1) + Value[i] * (iter / (iter + 1)) Value=NewValue.copy() return Value def run(): print('開始計算PageRank...') print('數(shù)據(jù)初始化...') init() Value = np.ones(len(Vexname),dtype=np.double)*(1000.0/Vexnum) print('錯誤文件刪除完畢!') print('正在計算PageRank(迭代次數(shù){})...'.format(epoch)) start = time.perf_counter() Value = Rank(Value, start) a = '=' * 100 b = ' ' * 0 c = 100 dur = time.perf_counter() - start sys.stdout.write("\r{:^3.0f}%[{}=>{}]{:.2f}s".format(c, a, b, dur)) sys.stdout.flush() print('\nPageRank計算完畢,正在往JSON中寫入數(shù)據(jù)...') max = {} for file in Vexname: # 將PageRank寫入JSON with open(os.path.join(dir_path, file), 'r', encoding='utf-8') as load_f: text = json.load(load_f) with open(os.path.join(dir_path, file), 'w', encoding='utf-8') as dump_f: text['weight'] = Value[Vexname.index(file)] max[file] = text['weight'] json.dump(text, dump_f, ensure_ascii=False,indent=4) print('數(shù)據(jù)寫入完畢...') if __name__ == '__main__': run()
# 導入必要的包 import json import os import sys import time import math import gc import elasticsearch import numpy as np import paddle.fluid as fluid dir_path = os.path.dirname(os.path.realpath(__file__)) # 用訓練好的模型進行預測并輸出預測結果 # 創(chuàng)建執(zhí)行器 place = fluid.CPUPlace() exe = fluid.Executor(place) exe.run(fluid.default_startup_program()) save_path = os.path.join(dir_path, 'infer_model/') # 從模型中獲取預測程序、輸入數(shù)據(jù)名稱列表、分類器 [infer_program, feeded_var_names, target_var] = fluid.io.load_inference_model(dirname=save_path, executor=exe) # 主機 host = "py7hon.com:9200" # 建立 elasticsearch 連接 try: es = elasticsearch.Elasticsearch(hosts=host) except Exception as e: print(e) exit() # 獲取數(shù)據(jù) def get_data(sentence): # 讀取數(shù)據(jù)字典 with open(os.path.join(dir_path, 'dict_txt.txt'), 'r', encoding='utf-8') as f_data: dict_txt = eval(f_data.readlines()[0]) dict_txt = dict(dict_txt) # 把字符串數(shù)據(jù)轉換成列表數(shù)據(jù) keys = dict_txt.keys() data = [] for s in sentence: # 判斷是否存在未知字符 if not s in keys: s = '<unk>' data.append((np.int64)(dict_txt[s])) return data def batch_reader(Json_list,json_path): datas = [] gc.collect() json_files = [] falsefiles = [] datas.clear() falsefiles.clear() json_files.clear() start = time.perf_counter() i=0 scale = 100 for file in Json_list: if i % 100 == 0: a = '=' * int(i / len(Json_list) * 100) b = ' ' * (scale - int(i / len(Json_list) * 100)) c = int(i / len(Json_list) * 100) dur = time.perf_counter() - start sys.stdout.write("\r{:^3.0f}%[{}=>{}]{:.2f}s".format(c, a, b, dur)) sys.stdout.flush() i+=1 with open(os.path.join(json_path, file), "r", encoding='utf-8') as f: try: text = json.load(f) except: falsefiles.append(file) continue json_files.append(os.path.join(json_path, file)) json_text = text['content'] data = get_data(json_text) datas.append(data) for file in falsefiles: os.remove(os.path.join(dir_path, file)) file_count = len(Json_list) - len(falsefiles) a = '=' * 100 b = ' ' * 0 c = 100 dur = time.perf_counter() - start sys.stdout.write("\r{:^3.0f}%[{}=>{}]{:.2f}s".format(c, a, b, dur)) sys.stdout.flush() print('\n文本數(shù)據(jù)獲取完畢,共計{0}條文本數(shù)據(jù),有效數(shù)據(jù){2}條,無效數(shù)據(jù){1}條(已刪除)!'.format(len(Json_list),len(falsefiles),file_count)) print('AI正在加載分類模型...') # 獲取每句話的單詞數(shù)量 base_shape = [[len(c) for c in datas]] # 生成預測數(shù)據(jù) tensor_words = fluid.create_lod_tensor(datas, base_shape, place) # 執(zhí)行預測 result = exe.run(program=infer_program, feed={feeded_var_names[0]: tensor_words}, fetch_list=target_var) print('模型加載完畢!') # 分類名稱 names = ['文化', '娛樂', '體育', '財經', '房產', '汽車', '教育', '科技', '國際', '證券'] count = np.zeros(10) print('AI正在對文本數(shù)據(jù)進行分類并上傳至ES:') # 獲取結果概率最大的label start = time.perf_counter() for i in range(file_count): if i % 100 == 0: a = '=' * int(i / file_count * 100) b = ' ' * (scale - int(i / file_count * 100)) c = int(i / file_count * 100) dur = time.perf_counter() - start sys.stdout.write("\r{:^3.0f}%[{}=>{}]{:.2f}s".format(c, a, b, dur)) sys.stdout.flush() lab = np.argsort(result)[0][i][-1] # print('預測結果標簽為:%d, 名稱為:%s, 概率為:%f' % (lab, names[lab], result[0][i][lab])) count[lab] += 1 with open(json_files[i], 'r', encoding='utf-8') as load_f: try: text = json.load(load_f) except: continue text['content_type'] = names[lab] id = json_files[i].split('\\')[-1].split('.')[0] #try: del text['link'] response = es.index(index='page', doc_type='_doc', id=id, body=text) #except Exception: # print("\n" + "數(shù)據(jù) " + id + " 插入失敗,錯誤信息:" + response) # with open(os.path.join(json_path,json_files[i].split('\\')[-1]),'w') as dump_f: # json.dump(text,dump_f) a = '=' * 100 b = ' ' * 0 c = 100 dur = time.perf_counter() - start sys.stdout.write("\r{:^3.0f}%[{}=>{}]{:.2f}s".format(c, a, b, dur)) sys.stdout.flush() print("\n" + "%d條文本數(shù)據(jù)分類結束!已全部上傳至ES" % (file_count)) def run(): # 獲取圖片數(shù)據(jù) print('AI正在獲取文本數(shù)據(jù)...') json_path = os.path.realpath(__file__) + '/../../../RawData' Json_list = os.listdir(json_path) batch_size=500 if len(Json_list)>batch_size: Json_batch=0 print('當前文本數(shù)量為{0}條,正在分批處理...'.format(len(Json_list))) for batch_id in range(math.ceil(len(Json_list)/batch_size)): a=(batch_size if batch_size<(len(Json_list)-Json_batch) else len(Json_list)-Json_batch) print('正在處理第{0}批,數(shù)量為{1}...'.format(batch_id+1,a)) batch_reader(Json_list[Json_batch:Json_batch+a],json_path) Json_batch += a else: batch_reader(Json_list,json_path) if __name__ == '__main__': run()
4.系統(tǒng)展示
以上就是基于Python實現(xiàn)的通用小規(guī)模搜索引擎的詳細內容,更多關于Python小規(guī)模搜索引擎的資料請關注腳本之家其它相關文章!
相關文章
Python自動檢測requests所獲得html文檔的編碼
這篇文章主要為大家詳細介紹了如何通過Python自動檢測requests實現(xiàn)獲得html文檔的編碼,文中的示例代碼講解詳細,感興趣的可以了解下2024-11-11python3如何使用saml2.0協(xié)議接入SSO
SAML是一種用于在不同系統(tǒng)之間傳輸安全聲明的XML框架,通過IDP和SP之間的重定向訪問,SP向IDP請求用戶身份認證,IDP驗證用戶身份后返回SAML應答,本文以python3和python3-saml庫為例,介紹了如何接入公司SSO系統(tǒng),包括配置和處理登錄和登出請求2024-11-11像線程一樣管理進程的Python multiprocessing庫
multiprocessing庫是基于threading API,它可以把工作劃分為多個進程.有些情況下,multiprocessing可以作為臨時替換取代threading來利用多個CPU內核,相應地避免Python全局解釋器鎖所帶來的計算瓶頸.本文詳細介紹了Python multiprocessing庫,需要的朋友可以參考下2021-05-05Pandas||過濾缺失數(shù)據(jù)||pd.dropna()函數(shù)的用法說明
這篇文章主要介紹了Pandas||過濾缺失數(shù)據(jù)||pd.dropna()函數(shù)的用法說明,具有很好的參考價值,希望對大家有所幫助。一起跟隨小編過來看看吧2021-05-05