基于Python的socket庫實現(xiàn)通信功能的示例代碼
1.前言
這是一篇介紹使用python的socket庫實現(xiàn)通信功能的文章.系統(tǒng)主要分為客戶端和服務(wù)端。這里簡單的給每個客戶端增加一個不重復(fù)的uid,客戶端之間可以根據(jù)這個uid選擇進行廣播通信,即與所有在線的客戶端通信。也可以選擇與單一客戶端通信。
2.技術(shù)介紹
(1) socket:
Socket(套接字)是在應(yīng)用層和傳輸層之間的一個抽象層,它提供了對TCP/IP協(xié)議族進行訪問的接口。socket可以看成是一種特殊的文件描述符,與文件描述符類似,開發(fā)者可以對它進行讀寫操作,從而實現(xiàn)與遠程主機的通信和數(shù)據(jù)傳輸。
在網(wǎng)絡(luò)編程中,Socket通常被用于實現(xiàn)客戶端和服務(wù)器之間的通信。當一個程序需要向另一個程序發(fā)送數(shù)據(jù)時,它會將數(shù)據(jù)包裝在一個Socket對象中,然后通過網(wǎng)絡(luò)傳輸給對方。接收端程序收到數(shù)據(jù)后,會再次將數(shù)據(jù)解包,存入自己的Socket緩存當中。
在TCP/IP協(xié)議中,Socket是通過IP地址、端口號和協(xié)議類型來標識的。其中,IP地址用來唯一標識一臺主機,端口號則表示該主機上具體的網(wǎng)絡(luò)應(yīng)用程序,協(xié)議類型指明該Socket使用的具體傳輸協(xié)議,如TCP或UDP等。
Socket編程比較靈活,可以根據(jù)不同的需求進行不同的設(shè)置和配置,例如設(shè)置Socket的超時時間、緩沖區(qū)大小、數(shù)據(jù)包大小等。同時,Socket也支持多種編程語言,如Python、Java和C/C++等。因此,在實現(xiàn)網(wǎng)絡(luò)應(yīng)用程序時,Socket是非常常用和重要的組件之一。
(2) Python的socket庫:
Python的socket庫是一個用于網(wǎng)絡(luò)編程的標準庫,它提供了一套高級的網(wǎng)絡(luò)接口,使得開發(fā)人員可以輕松地創(chuàng)建網(wǎng)絡(luò)應(yīng)用程序。
該庫允許Python程序通過創(chuàng)建Socket對象來完成網(wǎng)絡(luò)通信,每個Socket對象都與一個特定的協(xié)議族相關(guān)聯(lián),如AF_INET(使用IPv4協(xié)議)或AF_INET6(使用IPv6協(xié)議)。
socket庫提供了一系列函數(shù),可以用于創(chuàng)建、連接、監(jiān)聽和發(fā)送數(shù)據(jù)等各種網(wǎng)絡(luò)操作。其中,一些常用的函數(shù)包括:
- socket() : 創(chuàng)建一個Socket對象,指定協(xié)議族、sock類型、協(xié)議等參數(shù);
- bind() : 將Socket對象與本地地址進行綁定;
- listen() : 開始監(jiān)聽指定端口上的連接請求;
- accept() : 接受客戶端連接請求,并返回新的Socket對象和客戶端地址;
- connect() : 連接到遠端主機的指定端口;
- recv() 和 send() : 接收和發(fā)送數(shù)據(jù);
- close() : 關(guān)閉Socket連接。
使用Python的socket庫,可以方便地實現(xiàn)基于TCP或UDP協(xié)議的網(wǎng)絡(luò)編程。同時,socket庫也支持基于多線程和異步IO的網(wǎng)絡(luò)編程,可以用于處理高并發(fā)的網(wǎng)絡(luò)應(yīng)用程序。
3.系統(tǒng)實現(xiàn)
(1) 服務(wù)端(server.py)
import socket import threading import json HOST = '127.0.0.1' PORT = 8888 # 存儲客戶端的 uid 和 socket 對象的映射關(guān)系 clients = {} # 創(chuàng)建一個 TCP 協(xié)議的套接字對象 server_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM) # 綁定并監(jiān)聽一個指定的地址和端口號 server_socket.bind((HOST, PORT)) # 開始監(jiān)聽客戶端對指定 IP 地址和端口號的連接請求,當有客戶端發(fā)送連接請求時, # 服務(wù)端可以接受客戶端連接,并創(chuàng)建一個新的套接字對象以處理客戶端與服務(wù)端之間的通信。 server_socket.listen() def handle_client(client_socket, client_address): uid = client_socket.recv(1024).decode('utf-8') print('uid:{}已上線'.format(uid)) clients[uid] = client_socket while True: try: data = client_socket.recv(1024) if data: # 解析消息 msg_dict = json.loads(data.decode('utf-8')) sender_id = msg_dict['sender_id'] receiver_id = msg_dict['receiver_id'] message = msg_dict['message'] print('用戶 {} 發(fā)送給用戶 {} 的消息: {}'.format(sender_id, receiver_id, message)) if receiver_id == 'all': # 廣播消息給所有客戶端 for uid, client in clients.items(): if uid != sender_id: client.sendall(data) elif receiver_id in clients and receiver_id != sender_id: # 發(fā)送消息給指定客戶端 client_socket = clients[receiver_id] client_socket.sendall(data) else: # 接收到的消息有誤,回復(fù)錯誤信息 error = { '錯誤': f'對方ID無效: {receiver_id}' } client_socket.sendall(json.dumps(error).encode('utf-8')) else: # 客戶端關(guān)閉了連接 print('客戶端關(guān)閉了連接') break except: # 發(fā)送和接收消息時出現(xiàn)異常,可能是客戶端關(guān)閉了連接 print('發(fā)送和接收消息時出現(xiàn)異常,可能是客戶端關(guān)閉了連接') break # 移除客戶端連接 print('用戶:{}已離線'.format(uid)) del clients[uid] # 關(guān)閉客戶端連接 client_socket.close() if __name__ == '__main__': print(f'Server listening on {HOST}:{PORT}') while True: client_socket, client_address = server_socket.accept() print(f'{client_address[0]}:{client_address[1]} connected') # 生成客戶端 UID,并存儲客戶端的 socket 對象和 UID 的映射關(guān)系 thread = threading.Thread(target=handle_client, args=(client_socket, client_address)) thread.daemon = True thread.start()
(2) 客戶端1(client1.py)
import socket import threading import uuid import json HOST = '127.0.0.1' PORT = 8888 # 隨機生成一個唯一 ID UID = str(uuid.uuid4()) # 創(chuàng)建一個 TCP 協(xié)議的套接字對象 client_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM) # 連接指定主機和端口 client_socket.connect((HOST, PORT)) # 將UID發(fā)送給服務(wù)端 client_socket.sendall(UID.encode('utf-8')) def receive_message(): while True: data = client_socket.recv(1024) if data: # 解析消息 msg_dict = json.loads(data.decode('utf-8')) if 'sender_id' in msg_dict: sender_id = msg_dict['sender_id'] message = msg_dict['message'] print(f"{sender_id}: {message}") elif 'error' in msg_dict: error = msg_dict['error'] print(f"Error: {error}") else: # 服務(wù)端關(guān)閉了連接 print('服務(wù)器已下線...') break # 關(guān)閉客戶端連接 client_socket.close() def send_message(receiver_id): while True: message = input() # 構(gòu)造消息字典 msg_dict = { 'sender_id': UID, 'receiver_id': receiver_id, 'message': message } # 將消息字典序列化為JSON格式字符串 json_str = json.dumps(msg_dict) # 將消息發(fā)送給服務(wù)端 client_socket.sendall(json_str.encode('utf-8')) if __name__ == '__main__': print(f'你的ID: {UID}') receiver_id = input('輸入對方的ID接收對方消息或者輸入"all"接收廣播消息: ') # 啟動接收消息的線程 t1 = threading.Thread(target=receive_message) t1.daemon = True t1.start() # 啟動發(fā)送消息的線程 t2 = threading.Thread(target=send_message, args=(receiver_id,)) t2.daemon = True t2.start() # 等待發(fā)送線程結(jié)束,結(jié)束接收線程 t2.join() t1.join()
(3) 客戶端2(client2.py)
import socket import threading import uuid import json HOST = '127.0.0.1' PORT = 8888 # 隨機生成一個唯一 ID UID = str(uuid.uuid4()) # 創(chuàng)建一個 TCP 協(xié)議的套接字對象 client_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM) # 連接指定主機和端口 client_socket.connect((HOST, PORT)) # 將UID發(fā)送給服務(wù)端 client_socket.sendall(UID.encode('utf-8')) def receive_message(): while True: data = client_socket.recv(1024) if data: # 解析消息 msg_dict = json.loads(data.decode('utf-8')) if 'sender_id' in msg_dict: sender_id = msg_dict['sender_id'] message = msg_dict['message'] print(f"{sender_id}: {message}") elif 'error' in msg_dict: error = msg_dict['error'] print(f"Error: {error}") else: # 服務(wù)端關(guān)閉了連接 print('服務(wù)器已下線...') break # 關(guān)閉客戶端連接 client_socket.close() def send_message(receiver_id): while True: message = input() # 構(gòu)造消息字典 msg_dict = { 'sender_id': UID, 'receiver_id': receiver_id, 'message': message } # 將消息字典序列化為JSON格式字符串 json_str = json.dumps(msg_dict) # 將消息發(fā)送給服務(wù)端 client_socket.sendall(json_str.encode('utf-8')) if __name__ == '__main__': print(f'你的ID: {UID}') receiver_id = input('輸入對方的ID接收對方消息或者輸入"all"接收廣播消息: ') # 啟動接收消息的線程 t1 = threading.Thread(target=receive_message) t1.daemon = True t1.start() # 啟動發(fā)送消息的線程 t2 = threading.Thread(target=send_message, args=(receiver_id,)) t2.daemon = True t2.start() # 等待發(fā)送線程結(jié)束,結(jié)束接收線程 t2.join() t1.join()
4.系統(tǒng)功能演示
(1)啟動服務(wù)端和客戶端
注意事項:需要需要先啟動服務(wù)端
(2) 通信功能
client1輸入clent2的ID,在client2里輸入clent1的ID,然后就可以開始通信了!
5.總結(jié)
這里提供了一個簡單的實現(xiàn)使用python的socket庫實現(xiàn)網(wǎng)絡(luò)通信功能。
以上就是基于Python的socket庫實現(xiàn)通信功能的示例代碼的詳細內(nèi)容,更多關(guān)于Python socket庫通信功能的資料請關(guān)注腳本之家其它相關(guān)文章!
相關(guān)文章
python web框架Flask實現(xiàn)圖形驗證碼及驗證碼的動態(tài)刷新實例
在本篇文章里小編給大家整理的是關(guān)于python web框架Flask實現(xiàn)圖形驗證碼的相關(guān)知識點,有需要的朋友們參考下。2019-10-10Python如何聲明以管理員方式運行(附實戰(zhàn)案例)
由于Windows的安全機制,Python寫的腳本缺少了管理員權(quán)限,運行就會受到一些限制,這篇文章主要介紹了Python如何聲明以管理員方式運行的相關(guān)資料,文中通過代碼介紹的非常詳細,需要的朋友可以參考下2025-04-04一個基于flask的web應(yīng)用誕生 用戶注冊功能開發(fā)(5)
一個基于flask的web應(yīng)用誕生第五篇,這篇文章主要介紹了用戶注冊功能開發(fā),具有一定的參考價值,感興趣的小伙伴們可以參考一下2017-04-04python消費kafka數(shù)據(jù)批量插入到es的方法
今天小編就為大家分享一篇python消費kafka數(shù)據(jù)批量插入到es的方法,具有很好的參考價值,希望對大家有所幫助。一起跟隨小編過來看看吧2018-12-12