基于asyncio 異步協(xié)程框架實(shí)現(xiàn)收集B站直播彈幕
前言
雖然標(biāo)題是全站,但目前只做了等級 top 100 直播間的全天彈幕收集。
彈幕收集系統(tǒng)基于之前的B 站直播彈幕姬 Python 版修改而來。具體協(xié)議分析可以看上一篇文章。
直播彈幕協(xié)議是直接基于 TCP 協(xié)議,所以如果 B 站對類似我這種行為做反制措施,比較困難。應(yīng)該有我不知道的技術(shù)手段來檢測類似我這種惡意行為。
我試過同時(shí)連接 100 個(gè)房間,和連接單個(gè)房間 100 次的實(shí)驗(yàn),都沒有問題。>150 會被關(guān)閉鏈接。
直播間的選取
現(xiàn)在彈幕收集系統(tǒng)在選取直播間上比較簡單,直接選取了等級 top100。
以后會修改這部分,改成定時(shí)去 http://live.bilibili.com/all 查看新開播的直播間,并動(dòng)態(tài)添加任務(wù)。
異步任務(wù)和彈幕存儲
收集系統(tǒng)仍舊使用了 asyncio 異步協(xié)程框架,對于每一個(gè)直播間都使用如下方法來加進(jìn) loop 中。
danmuji = bilibiliClient(url, self.lock, self.commentq, self.numq) task1 = asyncio.ensure_future(danmuji.connectServer()) task2 = asyncio.ensure_future(danmuji.HeartbeatLoop())
其實(shí)若將心跳任務(wù) HeartbeatLoop 放入 connectorServer 中去啟動(dòng),代碼看起來更優(yōu)雅一些。但這么做是因?yàn)槲倚枰S護(hù)一個(gè)任務(wù)列表,后面會有描述。
在彈幕存儲上我花了些時(shí)間選擇。
數(shù)據(jù)庫存儲是一個(gè)同步 IO 的過程,Insert 的時(shí)候會阻塞彈幕收集的任務(wù)。雖然有 aiomysql 這種異步接口,但配置數(shù)據(jù)庫太麻煩,我的設(shè)想是這個(gè)小系統(tǒng)能夠方便地部署。
最終我選擇使用自帶的 sqlite3。但 sqlite3 無法做并行操作,故開了一個(gè)線程單獨(dú)進(jìn)行數(shù)據(jù)庫存儲。在另一個(gè)線程中,100 * 2 個(gè)任務(wù)搜集所有的彈幕、人數(shù)信息,并塞進(jìn)隊(duì)列 commentq, numq 中。存儲線程每隔 10s 喚醒一次,將隊(duì)列中的數(shù)據(jù)寫進(jìn) sqlite3 中,并清空隊(duì)列。
在多線程和異步的配合下,網(wǎng)絡(luò)流量沒有被阻塞。
可能的連接失敗場景處理
彈幕協(xié)議是直接基于 TCP,位與位直接關(guān)聯(lián)性較強(qiáng),一旦解析錯(cuò)誤,很容易就拋 Exception(個(gè)人感覺,雖然 TCP 是可靠傳輸,但B站服務(wù)器自身發(fā)生錯(cuò)誤也是有可能的)。所以有必要設(shè)計(jì)一個(gè)自動(dòng)重連機(jī)制。
在 asyncio 文檔中提到,
Done means either that a result / exception are available, or that the future was cancelled.
函數(shù)正常返回、拋出異常或者是被 cancel,都會退出當(dāng)前任務(wù)??梢允褂?done() 來判斷。
每一個(gè)直播間對應(yīng)兩個(gè)任務(wù),解析任務(wù)是最容易掛的,但并不會影響心跳任務(wù),所以必須找出并將對應(yīng)心跳任務(wù)結(jié)束。
在創(chuàng)建任務(wù)的時(shí)候使用字典記錄每個(gè)房間的兩個(gè)任務(wù),
self.tasks[url] = [task1, task2]
在運(yùn)行過程中,每隔 10s 做一次檢查,
for url in self.tasks: item = self.tasks[url] task1 = item[0] task2 = item[1] if task1.done() == True or task2.done() == True: if task1.done() == False: task1.cancel() if task2.done() == False: task2.cancel() danmuji = bilibiliClient(url, self.lock, self.commentq, self.numq) task11 = asyncio.ensure_future(danmuji.connectServer()) task22 = asyncio.ensure_future(danmuji.HeartbeatLoop()) self.tasks[url] = [task11, task22]
實(shí)際我只見過一次任務(wù)失敗的場景,是因?yàn)橹鞑シ块g被封了,導(dǎo)致無法進(jìn)入直播間。
結(jié)論
- B站人數(shù)是按照連接彈幕服務(wù)器的鏈接數(shù)量統(tǒng)計(jì)的。通過操縱鏈接量,可以瞬間增加任意人數(shù)觀看,有商機(jī)?
- 運(yùn)行的這幾天中,發(fā)現(xiàn)即使大部分房間不在直播,也能有 >5 的人數(shù),包括凌晨。我只能猜測也有和我一樣的人在 24h 收集彈幕。
- top100 平均一天 40M 彈幕數(shù)據(jù)。
- 收集的彈幕能做什么?還沒想好,可能可以拿來做用戶行為分析 -_^
最后附上本源碼的GITHUB地址 https://github.com/lyyyuna/bilibili_danmu_colloector
相關(guān)文章
python 實(shí)現(xiàn)學(xué)生信息管理系統(tǒng)的示例
本篇文章主要分享python學(xué)生管理系統(tǒng)的使用,文章非常詳細(xì)地介紹了通過示例代碼實(shí)現(xiàn)的學(xué)生管理系統(tǒng),該系統(tǒng)對每個(gè)人的研究或工作都有一定的參考學(xué)習(xí)價(jià)值,希望你能在其中有所收獲。2020-11-11django restframework序列化字段校驗(yàn)規(guī)則
本文主要介紹了django restframework序列化字段校驗(yàn)規(guī)則,文中通過示例代碼介紹的非常詳細(xì),對大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友們下面隨著小編來一起學(xué)習(xí)學(xué)習(xí)吧2022-05-05python消費(fèi)kafka數(shù)據(jù)批量插入到es的方法
今天小編就為大家分享一篇python消費(fèi)kafka數(shù)據(jù)批量插入到es的方法,具有很好的參考價(jià)值,希望對大家有所幫助。一起跟隨小編過來看看吧2018-12-12用Python實(shí)現(xiàn)換行符轉(zhuǎn)換的腳本的教程
這篇文章主要介紹了用Python實(shí)現(xiàn)換行符轉(zhuǎn)換的腳本的教程,代碼非常簡單,包括一個(gè)對操作說明的功能的實(shí)現(xiàn),需要的朋友可以參考下2015-04-04Python爬蟲爬取微博熱搜保存為 Markdown 文件的源碼
這篇文章主要介紹了Python爬蟲爬取微博熱搜保存為 Markdown 文件,本文給大家介紹的非常詳細(xì),對大家的學(xué)習(xí)或工作具有一定的參考借鑒價(jià)值,需要的朋友可以參考下2021-02-02Python如何運(yùn)用pyaudio庫去做一個(gè)固定采樣率音頻錄制器
這篇文章主要介紹了Python如何運(yùn)用pyaudio庫去做一個(gè)固定采樣率音頻錄制器問題,具有很好的參考價(jià)值,希望對大家有所幫助。如有錯(cuò)誤或未考慮完全的地方,望不吝賜教2023-05-05