Python使用DPKT實(shí)現(xiàn)分析數(shù)據(jù)包
dpkt項(xiàng)目是一個(gè)Python
模塊,主要用于對(duì)網(wǎng)絡(luò)數(shù)據(jù)包進(jìn)行解析和操作。它可以處理多種協(xié)議,例如TCP
、UDP
、IP
等,并提供了一些常用的網(wǎng)絡(luò)操作功能,例如計(jì)算校驗(yàn)和、解析DNS
數(shù)據(jù)包等。由于其簡(jiǎn)單易用的特性,dpkt
被廣泛應(yīng)用于網(wǎng)絡(luò)安全領(lǐng)域,例如流量分析、漏洞利用、入侵檢測(cè)等。使用該庫(kù)可以快速解析通過(guò)各類抓包工具抓到的數(shù)據(jù)包,從而提取分析包內(nèi)的參數(shù)。
安裝DPKT工具:pip install dpkt
在分析數(shù)據(jù)包之前我們需要抓取特定數(shù)據(jù)包并保存為*.pcap
格式,通常情況下這種數(shù)據(jù)包格式可通過(guò)WireShark
等工具抓取到,當(dāng)然也可以使用上一篇提到的Scapy
庫(kù)實(shí)現(xiàn),該庫(kù)中存在一個(gè)sniff
函數(shù),該函數(shù)可以實(shí)現(xiàn)網(wǎng)絡(luò)抓包功能,如下一個(gè)演示案例我們分別通過(guò)sniff(count=2)
函數(shù)抓取兩個(gè)數(shù)據(jù)包并使用wrpcap()
函數(shù)將其保存到文件內(nèi),當(dāng)需要分析時(shí)可通過(guò)調(diào)用rdpcap()
函數(shù)打開(kāi)數(shù)據(jù)包即可實(shí)現(xiàn)分析。
>>> from scapy.all import * >>> >>> packets = sniff(count=2) # 動(dòng)態(tài)抓取2個(gè)數(shù)據(jù)包 >>> >>> wrpcap("d://lyshark.pcap",packets) # 保存數(shù)據(jù)包 >>> pcap_packets = rdpcap("d://lyshark.pcap") # 讀取數(shù)據(jù)包 >>> >>> pcap_packets <lyshark.pcap: TCP:2 UDP:0 ICMP:0 Other:0> >>> >>> pcap_packets.show() 0000 Ether / IP / TCP 192.168.1.101:63995 > 172.217.24.10:https S 0001 Ether / IP / TCP 192.168.1.101:63907 > 103.235.46.191:https A / Raw >>> >>> pcap_packets.summary() Ether / IP / TCP 192.168.1.101:63995 > 172.217.24.10:https S Ether / IP / TCP 192.168.1.101:63907 > 103.235.46.191:https A / Raw >>> >>> pcap_packets[0].dst 'FF:2d:1e:0f:1e:a1' >>> >>> pcap_packets[0].src 'a4:7e:33:ee:cc:b3' >>> # 如下分別代表: 鏈路層 [Ethernet]、網(wǎng)絡(luò)層[IP]、傳輸層[TCP/UDP]、應(yīng)用層[RAW] >>> pcap_packets[0].show() >>> # 抓包后直接輸出 >>> sniff(prn=lambda x: x.show(), count=1)
通過(guò)上方的抓包流程讀者即可實(shí)現(xiàn)簡(jiǎn)單的抓包功能,當(dāng)然sniff
函數(shù)參數(shù)眾多我們完全可以在抓包時(shí)增加不同的抓包條件,同時(shí)該函數(shù)也支持回調(diào)函數(shù),當(dāng)由新的請(qǐng)求被觸發(fā)時(shí)則自動(dòng)執(zhí)行回調(diào)函數(shù),如下則是使用Scapy
抓包的完整案例,該案例展示了抓取60
秒數(shù)據(jù)包,并將其保存至d://lyshark.pcap
目錄。
from scapy.all import * import scapy.all as scapy # 數(shù)據(jù)包回調(diào)函數(shù) def packet_callback(packet): if packet[TCP].payload: m_packet = str(packet[TCP].payload) print("主機(jī)地址: {} ---> 數(shù)據(jù)包內(nèi)容: {}".format(packet[IP].dst,packet[TCP].payload)) if __name__ == "__main__": # 抓取80端口的數(shù)據(jù)包并輸出到屏幕 # sniff(filter="tcp port 80", prn=packet_callback, store=0) # 抓取 過(guò)濾出tcp協(xié)議 抓取1分鐘后保存到文件中 package=sniff(filter="tcp", timeout=60, prn=packet_callback, store=1) wrpcap("d://lyshark.pcap", package)
運(yùn)行上方抓包程序,讀者可看到如下圖所示的輸出結(jié)果,等待60
秒后即可看到d://lyshark.pcap
文件。
當(dāng)讀者抓取到這些數(shù)據(jù)包之后,下一步則是解析這些數(shù)據(jù)包,解析的方法有許多可以使用DPKT
解析,也可以使用scapy
自帶的工具解析,本章首先介紹如何使用Scapy
工具實(shí)現(xiàn)解析數(shù)據(jù)包內(nèi)的HTTP
請(qǐng)求,并輸出的功能,如下是完整的代碼實(shí)現(xiàn);
from scapy.all import * import scapy.all as scapy # 解析獲取到的數(shù)據(jù)包 def get_http_pcap(pcap_path): pcap_infos = list() packets = scapy.rdpcap(pcap_path) for p in packets: if p.haslayer("IP"): src_ip = p["IP"].src dst_ip = p["IP"].dst if p.haslayer("TCP"): raw_http = p["TCP"].payload.original sport = p["TCP"].sport dport = p["TCP"].dport if p.haslayer("HTTPRequest"): host = p["HTTPRequest"].Host uri = p["HTTPRequest"].Path http_fields = p["HTTPRequest"].fields # print("主機(jī)地址: {} --> URI: {}".format(host,uri)) print("原IP地址: {}:{} --> 目標(biāo)IP地址: {}:{}".format(src_ip,sport,dst_ip,dport)) if __name__ == "__main__": get_http_pcap("d://lyshark.pcap")
讀者可自行運(yùn)行上述代碼,并傳入剛才抓取到的lyshark.pcap
數(shù)據(jù)包,此時(shí)則可解析出當(dāng)前數(shù)據(jù)包中所有HTTP訪問(wèn)數(shù)據(jù),如下圖所示;
對(duì)于數(shù)據(jù)包的解包功能,Dpkt
工具包也可以很好的完成,對(duì)于使用Dpkt
解包而言,首先需要通過(guò)open()
打開(kāi)數(shù)據(jù)包,接著調(diào)用dpkt.pcap.Reader(fp)
將文件內(nèi)的字節(jié)轉(zhuǎn)化為PCAP格式,最后調(diào)用自定義函數(shù)GetDpkt
根據(jù)字段進(jìn)行解析即可。
import dpkt import socket def GetDpkt(pcap): for timestamp,packet in pcap: try: eth = dpkt.ethernet.Ethernet(packet) ip = eth.data tcp = ip.data src = socket.inet_ntoa(ip.src) dst = socket.inet_ntoa(ip.dst) sport = tcp.sport dport = tcp.dport print("[+] 源地址: {}:{} --> 目標(biāo)地址:{}:{}".format(src,sport,dst,dport)) except Exception: pass # 檢測(cè)主機(jī)是否被DDOS攻擊了 def FindDDosAttack(pcap): pktCount = {} for timestamp,packet in pcap: try: eth = dpkt.ethernet.Ethernet(packet) ip = eth.data tcp = ip.data src = socket.inet_ntoa(ip.src) dst = socket.inet_ntoa(ip.dst) sport = tcp.sport # 累計(jì)判斷各個(gè)src地址對(duì)目標(biāo)地址80端口訪問(wèn)次數(shù) if dport == 80: stream = src + ":" + dst if pktCount.has_key(stream): pktCount[stream] = pktCount[stream] + 1 else: pktCount[stream] = 1 except Exception: pass for stream in pktCount: pktSent = pktCount[stream] # 如果超過(guò)設(shè)置的檢測(cè)閾值500,則判斷為DDOS攻擊行為 if pktSent > 500: src = stream.split(":")[0] dst = stream.split(":")[1] print("[+] 源地址: {} 攻擊: {} 流量: {} pkts.".format(src,dst,str(pktSent))) # FindPcapURL 監(jiān)控提取數(shù)據(jù)包中的所有URL def FindPcapURL(pcap): Url = [] for timestamp,packet in pcap: try: eth = dpkt.ethernet.Ethernet(packet) ip = eth.data src = socket.inet_ntoa(ip.src) tcp = ip.data http = dpkt.http.Request(tcp.data) if(http.method == "GET"): UrlHead = http.headers for key,value in UrlHead.items(): url = re.findall('^https*://.*',str(value)) if url: print("[+] 源地址: %10s --> 訪問(wèn)URL: %-80s"%(src, url[0])) except Exception: pass return set(Url) # 動(dòng)態(tài)保存pcap文件(每1024字節(jié)保存一次pcap文件),并讀取出其中的網(wǎng)址解析出來(lái) def write_cap(pkt): global pkts global count pkts.append(pkt) count += 1 if count == 1024: wrpcap("data.pcap",pkts) fp = open("./data.pcap","rb") pcap = dpkt.pcap.Reader(fp) FindPcapURL(pcap) fp.close() pkts,count = [],0 if __name__ == "__main__": fp = open("d://lyshark.pcap","rb") pcap = dpkt.pcap.Reader(fp) GetDpkt(pcap)
運(yùn)行上述代碼,同樣可以輸出這些IP信息,如下圖所示;
到此這篇關(guān)于Python使用DPKT實(shí)現(xiàn)分析數(shù)據(jù)包 的文章就介紹到這了,更多相關(guān)Python DPKT內(nèi)容請(qǐng)搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持腳本之家!
相關(guān)文章
pymysql 插入數(shù)據(jù) 轉(zhuǎn)義處理方式
今天小編就為大家分享一篇pymysql 插入數(shù)據(jù) 轉(zhuǎn)義處理方式,具有很好的參考價(jià)值,希望對(duì)大家有所幫助。一起跟隨小編過(guò)來(lái)看看吧2020-03-03python使用wxpy輕松實(shí)現(xiàn)微信防撤回的方法
今天小編就為大家分享一篇python使用wxpy輕松實(shí)現(xiàn)微信防撤回的方法,具有很好的參考價(jià)值,希望對(duì)大家有所幫助。一起跟隨小編過(guò)來(lái)看看吧2019-02-02自然語(yǔ)言處理NLP TextRNN實(shí)現(xiàn)情感分類
這篇文章主要為大家介紹了自然語(yǔ)言處理NLP TextRNN實(shí)現(xiàn)情感分類示例解析,有需要的朋友可以借鑒參考下,希望能夠有所幫助,祝大家多多進(jìn)步,早日升職加薪2023-04-04Python中條件語(yǔ)句、循環(huán)語(yǔ)句和pass語(yǔ)句的使用示例
Python條件語(yǔ)句是通過(guò)一條或多條語(yǔ)句的執(zhí)行結(jié)果(True或者False)來(lái)決定執(zhí)行的代碼塊,下面這篇文章主要給大家介紹了關(guān)于Python中條件語(yǔ)句、循環(huán)語(yǔ)句和pass語(yǔ)句使用的相關(guān)資料,需要的朋友可以參考下2022-06-06對(duì)python中的float除法和整除法的實(shí)例詳解
今天小編就為大家分享一篇對(duì)python中的float除法和整除法的實(shí)例詳解,具有很好的參考價(jià)值,希望對(duì)大家有所幫助。一起跟隨小編過(guò)來(lái)看看吧2019-07-07Python使用sklearn實(shí)現(xiàn)的各種回歸算法示例
這篇文章主要介紹了Python使用sklearn實(shí)現(xiàn)的各種回歸算法,結(jié)合實(shí)例形式分析了Python使用sklearn庫(kù)實(shí)現(xiàn)的決策樹(shù)回歸、線性回歸、SVM回歸、KNN回歸、隨機(jī)森林回歸等各種回歸算法,需要的朋友可以參考下2019-07-07Python Shiny庫(kù)創(chuàng)建交互式Web應(yīng)用及高級(jí)功能案例
Shiny是一個(gè)基于Python的交互式Web應(yīng)用框架,專注于簡(jiǎn)化Web應(yīng)用的開(kāi)發(fā)流程,本文將深入探討Shiny庫(kù)的基本用法、高級(jí)功能以及實(shí)際應(yīng)用案例,以幫助開(kāi)發(fā)者充分發(fā)揮Shiny在Web應(yīng)用開(kāi)發(fā)中的優(yōu)勢(shì)2023-12-12django項(xiàng)目簡(jiǎn)單調(diào)取百度翻譯接口的方法
這篇文章主要介紹了django項(xiàng)目簡(jiǎn)單調(diào)取百度翻譯接口的方法,本文給大家介紹的非常詳細(xì),具有一定的參考借鑒價(jià)值 ,需要的朋友可以參考下2019-08-08python中pandas.DataFrame對(duì)行與列求和及添加新行與列示例
pandas是python環(huán)境下最有名的數(shù)據(jù)統(tǒng)計(jì)包,而DataFrame翻譯為數(shù)據(jù)框,是一種數(shù)據(jù)組織方式,這篇文章主要給大家介紹了python中pandas.DataFrame對(duì)行與列求和及添加新行與列的方法,文中給出了詳細(xì)的示例代碼,需要的朋友可以參考借鑒,下面來(lái)一起看看吧。2017-03-03