使用Python實(shí)現(xiàn)WebSocket服務(wù)器與客戶端通信功能
簡介
WebSocket 是一種基于 TCP 協(xié)議的通信協(xié)議,能夠在客戶端與服務(wù)器之間進(jìn)行全雙工(雙向)通信。相比傳統(tǒng)的 HTTP 協(xié)議,WebSocket 可以實(shí)現(xiàn)實(shí)時(shí)數(shù)據(jù)的傳輸,尤其適合需要實(shí)時(shí)交互的應(yīng)用場景,如在線游戲、實(shí)時(shí)聊天、金融交易等。
我通過 Python 實(shí)現(xiàn)一個(gè)簡單的 WebSocket 服務(wù)器,并使其與客戶端進(jìn)行通信。我們將創(chuàng)建兩個(gè) Python 文件:websocket.py
和 main.py
,websocket.py
負(fù)責(zé)實(shí)現(xiàn) WebSocket 服務(wù)器的功能,main.py
負(fù)責(zé)啟動(dòng)和管理服務(wù)器,以及定時(shí)向客戶端發(fā)送消息。
安裝 WebSocket 庫
在開始之前,需要安裝 websockets
庫,它是 Python 中非常流行的 WebSocket 實(shí)現(xiàn)。
pip install websockets
websocket.py — WebSocket 服務(wù)器實(shí)現(xiàn)
在 websocket.py
文件中,我定義了一個(gè) WebSocketServer
類,包含了 WebSocket 服務(wù)器的主要邏輯。
代碼解析
import asyncio import websockets class WebSocketServer: def __init__(self, host="localhost", port=8765): self.host = host self.port = port self.clients = set() # 存儲(chǔ)所有連接的客戶端
首先,我創(chuàng)建了一個(gè) WebSocketServer
類,它的構(gòu)造方法初始化了服務(wù)器的主機(jī)地址 host
和端口號(hào) port
,同時(shí)維護(hù)了一個(gè)客戶端集合 clients
來存儲(chǔ)當(dāng)前連接的 WebSocket 客戶端。
async def handle_client(self, websocket): # 新的客戶端連接 self.clients.add(websocket) try: async for message in websocket: print(f"收到消息: {message}") # 回顯消息給客戶端 await websocket.send(f"服務(wù)器已收到: {message}") except websockets.ConnectionClosed as e: print(f"客戶端斷開連接: {e}") finally: # 移除斷開的客戶端 self.clients.remove(websocket)
handle_client
是一個(gè)異步方法,用來處理與客戶端的連接。在客戶端發(fā)送消息時(shí),服務(wù)器會(huì)接收到該消息,并通過 websocket.send()
方法將回顯消息發(fā)送給客戶端。如果客戶端斷開連接,捕獲到 ConnectionClosed
異常,并從 clients
集合中移除該客戶端。
async def send(self, message): """向所有連接的客戶端發(fā)送消息""" if not self.clients: print("沒有客戶端連接,無法發(fā)送消息") return disconnected_clients = set() for client in self.clients: try: await client.send(message) except websockets.ConnectionClosed: disconnected_clients.add(client) # 清理已斷開的客戶端 self.clients -= disconnected_clients
send
方法允許服務(wù)器向所有連接的客戶端發(fā)送消息。如果有客戶端斷開連接,服務(wù)器會(huì)將其從 clients
集合中移除。
async def start(self): print(f"啟動(dòng) WebSocket 服務(wù)器: ws://{self.host}:{self.port}") async with websockets.serve(self.handle_client, self.host, self.port): await asyncio.Future() # 持續(xù)運(yùn)行直到手動(dòng)停止
start
方法啟動(dòng) WebSocket 服務(wù)器,通過 websockets.serve()
啟動(dòng)一個(gè) WebSocket 服務(wù),該服務(wù)會(huì)監(jiān)聽來自客戶端的連接請(qǐng)求,并調(diào)用 handle_client
方法處理這些請(qǐng)求。await asyncio.Future()
會(huì)讓服務(wù)器持續(xù)運(yùn)行,直到手動(dòng)停止。
main.py — 啟動(dòng) WebSocket 服務(wù)器并定時(shí)發(fā)送消息
在 main.py
文件中,創(chuàng)建了一個(gè) WebSocket 服務(wù)器的實(shí)例,并啟動(dòng)了一個(gè)定時(shí)任務(wù),定期向連接的客戶端發(fā)送消息。
代碼解析
import asyncio from websocket import WebSocketServer async def main(): # 創(chuàng)建 WebSocket 服務(wù)器實(shí)例 websocket_server = WebSocketServer() websocket_task = asyncio.create_task(websocket_server.start()) # 啟動(dòng) WebSocket 服務(wù)器
在 main()
函數(shù)中,首先創(chuàng)建了 WebSocketServer
的實(shí)例,并啟動(dòng)了 WebSocket 服務(wù)器。
# 定時(shí)向 WebSocket 客戶端發(fā)送數(shù)據(jù) async def send_data_periodically(): while True: await asyncio.sleep(5) # 每 5 秒發(fā)送一次數(shù)據(jù) await websocket_server.send("服務(wù)器定時(shí)發(fā)送消息:ping")
send_data_periodically
是一個(gè)定時(shí)任務(wù),每隔 5 秒就向所有連接的客戶端發(fā)送一次 "服務(wù)器定時(shí)發(fā)送消息:ping"
。
periodic_task = asyncio.create_task(send_data_periodically()) # 并發(fā)運(yùn)行所有任務(wù) await asyncio.gather(websocket_task, periodic_task)
通過 asyncio.create_task()
啟動(dòng)了定時(shí)任務(wù),并通過 asyncio.gather()
并發(fā)運(yùn)行 WebSocket 服務(wù)器和定時(shí)發(fā)送消息的任務(wù)。
if __name__ == "__main__": asyncio.run(main())
最后,我們 asyncio.run(main())
啟動(dòng)了整個(gè)程序。
總結(jié)
通過這兩個(gè)文件,實(shí)現(xiàn)了一個(gè)簡單的 WebSocket 服務(wù)器,該服務(wù)器能夠接收客戶端消息并進(jìn)行回顯。同時(shí),服務(wù)器也能夠定時(shí)向所有連接的客戶端發(fā)送消息。
完整代碼
websocket.py
import asyncio import websockets class WebSocketServer: def __init__(self, host="localhost", port=8765): self.host = host self.port = port self.clients = set() # 存儲(chǔ)所有連接的客戶端 async def handle_client(self, websocket): # 新的客戶端連接 self.clients.add(websocket) try: async for message in websocket: print(f"收到消息: {message}") # 回顯消息給客戶端 await websocket.send(f"服務(wù)器已收到: {message}") except websockets.ConnectionClosed as e: print(f"客戶端斷開連接: {e}") finally: # 移除斷開的客戶端 self.clients.remove(websocket) async def send(self, message): """向所有連接的客戶端發(fā)送消息""" if not self.clients: print("沒有客戶端連接,無法發(fā)送消息") return disconnected_clients = set() for client in self.clients: try: await client.send(message) except websockets.ConnectionClosed: disconnected_clients.add(client) # 清理已斷開的客戶端 self.clients -= disconnected_clients async def start(self): print(f"啟動(dòng) WebSocket 服務(wù)器: ws://{self.host}:{self.port}") async with websockets.serve(self.handle_client, self.host, self.port): await asyncio.Future() # 持續(xù)運(yùn)行直到手動(dòng)停止
main.py
import asyncio from websocket import WebSocketServer async def main(): # 創(chuàng)建 WebSocket 服務(wù)器實(shí)例 websocket_server = WebSocketServer() websocket_task = asyncio.create_task(websocket_server.start()) # 啟動(dòng) WebSocket 服務(wù)器 # 定時(shí)向 WebSocket 客戶端發(fā)送數(shù)據(jù) async def send_data_periodically(): while True: await asyncio.sleep(5) # 每 5 秒發(fā)送一次數(shù)據(jù) await websocket_server.send("服務(wù)器定時(shí)發(fā)送消息:ping") periodic_task = asyncio.create_task(send_data_periodically()) # 并發(fā)運(yùn)行所有任務(wù) await asyncio.gather(websocket_task, periodic_task) if __name__ == "__main__": asyncio.run(main())
以上就是使用Python實(shí)現(xiàn)WebSocket服務(wù)器與客戶端通信的詳細(xì)內(nèi)容,更多關(guān)于Python WebSocket通信的資料請(qǐng)關(guān)注腳本之家其它相關(guān)文章!
相關(guān)文章
Python實(shí)現(xiàn)自動(dòng)化處理PDF文件的方法詳解
這篇文章主要為大家詳細(xì)介紹了如何使用Python完成簡單的PDF文件處理操作,如PDF文件的批量合并、拆分、加密以及添加水印等,需要的可以參考一下2022-09-09pandas時(shí)間序列之pd.to_datetime()的實(shí)現(xiàn)
本文主要介紹了pandas時(shí)間序列之pd.to_datetime()的實(shí)現(xiàn),文中通過示例代碼介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友們下面隨著小編來一起學(xué)習(xí)學(xué)習(xí)吧<BR>2022-06-06Python使用urllib模塊的urlopen超時(shí)問題解決方法
這篇文章主要介紹了Python使用urllib模塊的urlopen超時(shí)問題解決方法,本文使用socket模塊中的setdefaulttimeout函數(shù)解決了超時(shí)問題,需要的朋友可以參考下2014-11-11詳解win10下pytorch-gpu安裝以及CUDA詳細(xì)安裝過程
這篇文章主要介紹了win10下pytorch-gpu安裝以及CUDA詳細(xì)安裝過程,文中通過示例代碼介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友們下面隨著小編來一起學(xué)習(xí)學(xué)習(xí)吧2021-01-01PyTorch中的squeeze()和unsqueeze()解析與應(yīng)用案例
這篇文章主要介紹了PyTorch中的squeeze()和unsqueeze()解析與應(yīng)用案例,文章內(nèi)容介紹詳細(xì),需要的小伙伴可以參考一下,希望對(duì)你有所幫助2022-03-03