python 自動(dòng)識別并連接串口的實(shí)現(xiàn)
這個(gè)屬于我項(xiàng)目中一個(gè)函數(shù),跟大家分享一下我的思路及最終實(shí)現(xiàn)
在編寫串口通信工具中,需要實(shí)現(xiàn)一個(gè)函數(shù),自動(dòng)找到對應(yīng)com 口,并且連接該com口,保證后續(xù)通信正常
作為初始化過程的一部分。
思路
在win 設(shè)備管理器中,經(jīng)常會(huì)出現(xiàn)多個(gè)com 口,但并不是每個(gè)com 口都是目標(biāo)設(shè)備所鏈接的。
嘗試打開每個(gè)com 口,輸入enter 按鍵, 正確的com 口,會(huì)有ack log 返回,表明通信 正常
否則,沒有任何log 返回,則判斷為非目標(biāo)設(shè)備所連接的com 口。
實(shí)現(xiàn)

嘗試去打開所有com 口,然后發(fā)送enter, 如果在一段時(shí)間內(nèi)有返回值,檢查com 口收到的字節(jié)數(shù),如果非零,則表明找到了對應(yīng)的com 口。
完整測試代碼如下:
import serial
import serial.tools.list_ports
import threading
import binascii
import time
from datetime import datetime
# default value
baunRate = 115200
is_read = False
is_write = False
write_buff = []
sys_buff = []
mSerial = None
callback = None
is_opened = 0
is_registed = 0
class SerialPort:
def __init__(self,port,buand):
self.port = serial.Serial(port,buand)
self.port.close()
if not self.port.isOpen():
self.port.open()
#the index of data_bytes for read operation,私有屬性
#only used in read lines
self.__read_ptr = 0
self.__read_head = 0
#store all read bytes
# used in read date, read lines
self.__data_bytes = bytearray()
def port_open(self):
if not self.port.isOpen():
self.port.open()
def port_close(self):
self.port.close()
def send(self):
global is_write
global write_buff
while is_write:
if len(write_buff):
msg = write_buff.pop(0)
msg = msg+"\n"
cmd = msg.encode()
try:
self.port.write(cmd)
except:
write_buff.clear()
is_write = False
write_buff.clear()
def read_data(self):
global is_read
global is_opened
byte_cnt = 0
while is_read:
try:
count = self.port.inWaiting()
if count > 0:
rec_str = self.port.read(count)
self.__data_bytes = self.__data_bytes+rec_str
#print("receive:",rec_str.decode())
#print(rec_str)
byte_cnt += count
if not is_opened:
is_opened = 1
#print("累計(jì)收到:",byte_cnt)
#time.sleep(0.5)
self.read_lines()
except:
deinit()
#將當(dāng)前所有的數(shù)據(jù)都讀出,讀取位置不變,每次讀取指針依次移動(dòng),不漏數(shù)據(jù), 讀取行為一直在進(jìn)行
def read_lines(self):
#reset
line_cnt = 0
data_len = len(self.__data_bytes)
#print ("")
#print ("begin: prt=:", self.__read_ptr, " head =", self.__read_head,"current len =",data_len)
if self.__read_ptr >=data_len:
return
#get all lines in current data_bytes
while(self.__read_ptr < data_len-1):
if(self.__data_bytes[self.__read_ptr+1] == 0x0a and self.__data_bytes[self.__read_ptr] == 0x0d):
tmp = bytearray()
tmp = self.__data_bytes[self.__read_head:self.__read_ptr]
try:
line = tmp.decode()
except:
self.__read_head = self.__read_ptr + 2
self.__read_ptr = self.__read_head
continue
iprint(line)
line_cnt += 1
self.__read_head = self.__read_ptr + 2
self.__read_ptr = self.__read_head
else:
self.__read_ptr = self.__read_ptr + 1
def auto_open_serial():
global baunRate
global mSerial
global callback
global is_registed
global is_opened
#reset
deinit()
# 列出所有當(dāng)前的com口
port_list = list(serial.tools.list_ports.comports())
port_list_name = []
#get all com
if len(port_list) <= 0:
iprint("the serial port can't find!")
return False
else:
for itms in port_list:
port_list_name.append(itms.device)
#try open
#print(port_list_name)
for i in port_list_name:
try:
mSerial = SerialPort(i,baunRate)
iprint("try open %s"%i)
start_task()
send("")
#return True
time.sleep(1)
if is_opened:
iprint("connect %s successfully"%i)
return True
else:
deinit()
if i == port_list_name[len(port_list_name)-1]:
iprint("uart don't open")
break
continue
except:
iprint(" uart don't open")
deinit()
return False
def deinit():
global mSerial
global is_write
global is_read
global write_buff
global is_opened
if mSerial:
mSerial.port_close()
is_opened = 0
is_read = False
is_write = False
write_buff = []
mSerial = None
time.sleep(1)
def init():
global mSerial
global callback
global is_registed
global is_opened
global is_read
global is_write
#retry
retry_time = 0
while not auto_open_serial():
if not is_opened:
iprint("wait for uart connect, retry %s"%str(retry_time))
else:
return True
retry_time += 1
time.sleep(2)
if retry_time == 10:
iprint(" open uart fail")
return False
def send(msg):
global mSerial
global is_write
global write_buff
if is_write:
write_buff.append(msg)
def start_task():
global mSerial
global is_write
global is_read
if mSerial:
is_write = True
t1 = threading.Thread (target=mSerial.send)
t1.setDaemon (False)
t1.start ()
is_read = True
t2 = threading.Thread (target=mSerial.read_data)
t2.setDaemon (False)
t2.start ()
def iprint(msg):
global callback
global is_registed
msg = "[Uart] "+str(msg)
if is_registed:
callback.append(msg)
else:
print(msg)
def start_sys_cmd():
global is_registed
if is_registed:
t3 = threading.Thread (target=process_receive_sys_cmd)
t3.setDaemon (False)
t3.start()
def process_receive_sys_cmd():
global sys_buff
global is_registed
global callback
#print("process_receive_sys_cmd")
while is_registed:
#print ("wait,process_receive_sys_cmd")
if len(sys_buff):
#print ("receive,process_receive_sys_cmd")
line = sys_buff.pop(0)
if "init" in line:
if is_opened and is_read and is_write:
iprint("already open uart")
break
iprint("start init")
init()
if is_opened:
break
iprint("Eixt uart sys thread")
def register_cback(list):
global callback
global is_registed
callback = list
is_registed = 1
def unregister_cback():
global callback
callback.clear()
if __name__ == '__main__':
receive = []
register_cback(receive)
sys_buff.append("init")
start_sys_cmd()
def process_receive_msg():
global receive
while True:
#print("wait")
if len(receive):
#print("receive")
print(receive.pop(0))
t = threading.Thread(target=process_receive_msg)
t.setDaemon(False)
t.start()
到此這篇關(guān)于python 自動(dòng)識別并連接串口的實(shí)現(xiàn)的文章就介紹到這了,更多相關(guān)python 自動(dòng)識別并連接串口內(nèi)容請搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持腳本之家!
相關(guān)文章
Python基于pygame實(shí)現(xiàn)單機(jī)版五子棋對戰(zhàn)
這篇文章主要為大家詳細(xì)介紹了Python基于pygame實(shí)現(xiàn)單機(jī)版五子棋對戰(zhàn),文中示例代碼介紹的非常詳細(xì),具有一定的參考價(jià)值,感興趣的小伙伴們可以參考一下2019-12-12
Python視頻爬蟲實(shí)現(xiàn)下載頭條視頻功能示例
這篇文章主要介紹了Python視頻爬蟲實(shí)現(xiàn)下載頭條視頻功能,涉及Python正則匹配、網(wǎng)絡(luò)傳輸及文件讀寫等相關(guān)操作技巧,需要的朋友可以參考下2018-05-05
Python集成C#實(shí)現(xiàn)界面操作下載文件功能的全過程
使用腳本進(jìn)行下載的需求很常見,下面這篇文章主要給大家介紹了關(guān)于Python集成C#實(shí)現(xiàn)界面操作下載文件功能的相關(guān)資料,文中通過實(shí)例代碼介紹的非常詳細(xì),需要的朋友可以參考下2022-03-03
python 從csv讀數(shù)據(jù)到mysql的實(shí)例
今天小編就為大家分享一篇python 從csv讀數(shù)據(jù)到mysql的實(shí)例,具有很好的參考價(jià)值,希望對大家有所幫助。一起跟隨小編過來看看吧2018-06-06
在Python的Django框架中編寫錯(cuò)誤提示頁面
這篇文章主要介紹了在Python的Django框架中編寫錯(cuò)誤提示頁面,包括傳統(tǒng)的404頁面和設(shè)置連接中斷警告等,需要的朋友可以參考下2015-07-07
python構(gòu)建自定義回調(diào)函數(shù)詳解
在工作中,回調(diào)函數(shù)使用的場景是非常多的,下面我們就來通過例子程序來詳細(xì)了解利用了Python的屬性機(jī)制構(gòu)建了一個(gè)自定義回調(diào)函數(shù)的使用2017-06-06
Django多個(gè)app urls配置代碼實(shí)例
這篇文章主要介紹了Django多個(gè)app urls配置代碼實(shí)例,文中通過示例代碼介紹的非常詳細(xì),對大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友可以參考下2020-11-11

