python使用MQTT給硬件傳輸圖片的實(shí)現(xiàn)方法
最近因需要用python寫一個(gè)微服務(wù)來用MQTT給硬件傳輸圖片,其中python用的是flask框架,大概流程如下:
協(xié)議為:
需要將圖片數(shù)據(jù)封裝成多個(gè)消息進(jìn)行傳輸,每個(gè)消息傳輸?shù)臄?shù)據(jù)字節(jié)數(shù)為1400Byte。
消息(MQTT Payload) 格式:Web服務(wù)器-------->BASE:
反饋:BASE---------> Web服務(wù)器:
如果Web服務(wù)器發(fā)送完一個(gè)“數(shù)據(jù)傳輸消息”后,5S內(nèi)沒有收到MQTT“反饋消息”或者收到的反饋中顯示“數(shù)據(jù)包不完整”,則重發(fā)該“數(shù)據(jù)傳輸消息”。
程序流程圖
根據(jù)上面的協(xié)議,可以得到如下的流程圖:
代碼如下:
# encoding:utf-8 from flask import Flask, jsonify from flask_restful import Api, Resource, reqparse from PIL import Image from io import BytesIO import requests import os, logging, time import paho.mqtt.client as mqtt import struct from flask_cors import * # 日志配置信息 logging.basicConfig( level=logging.INFO, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s (runing by %(funcName)s', ) class Mqtt(object): def __init__(self, img_data, size): self.MQTTHOST = '*******' self.MQTTPORT = "******" # 訂閱和發(fā)送的主題 self.topic_from_base = 'mqttTestSub' self.topic_to_base = 'mqttTestPub' self.client_id = time.strftime('%Y%m%d%H%M%S', time.localtime(time.time())) self.client = mqtt.Client(self.client_id) # 完成鏈接后的回掉函數(shù) self.client.on_connect = self.on_connect # 圖片大小 self.size = size # 用于跳出死循環(huán),結(jié)束任務(wù) self.finished = None # 包的編號(hào) self.index = 0 # 將收到的圖片數(shù)據(jù)按大小分成列表 self.image_data_list = [img_data[x:x + 1400] for x in range(0, self.size, 1400)] # 記錄發(fā)布后的數(shù)據(jù),用于監(jiān)控時(shí)延 self.pub_time = 0 self.header_to_base = 0xffffeeee self.header_from_base = 0xeeeeffff # 功能標(biāo)識(shí) self.function_begin = 0x01 self.function_doing = 0x02 self.function_finished = 0x03 # 包的完整和非完整狀態(tài) self.whole_package = 0x01 self.bad_package = 0x00 # 頭信息的格式,小端模式 self.format_to_base = "<Lbhh" self.format_from_base = "<Lbhb" # 如果重發(fā)包時(shí),用于檢查是否重發(fā)第一個(gè)包 self.first = True # 如果重發(fā)包時(shí),用于檢查是否重發(fā)最后一個(gè)包 self.last = False self.begin_data = 'image.jpg;' + str(self.size) # 鏈接mqtt服務(wù)器函數(shù) def on_mqtt_connect(self): self.client.connect(self.MQTTHOST, self.MQTTPORT, 60) self.client.loop_start() # 鏈接完成后的回調(diào)函數(shù) def on_connect(self, client, userdata, flags, rc): logging.info("+++ Connected with result code {} +++".format(str(rc))) self.client.subscribe(self.topic_from_base) # 訂閱函數(shù) def subscribe(self): self.client.subscribe(self.topic_from_base, 1) # 消息到來處理函數(shù) self.client.on_message = self.on_message # 接收到信息后的回調(diào)函數(shù) def on_message(self, client, userdata, msg): # 如果接受第一個(gè)包則不需要重發(fā)第一個(gè) self.first = False # 將接受到的包進(jìn)行解壓,得到一個(gè)元組 base_tuple = struct.unpack(self.format_from_base, msg.payload) logging.info("+++ imageData's letgth is {}, base_tupe is {} +++".format(self.size, base_tuple)) logging.info("+++ package_number is {}, package_status_from_base is {} +++" .format(base_tuple[2], base_tuple[3])) # 檢查接受到信息的頭部是否正確 if base_tuple[0] == self.header_from_base: logging.info("+++ function_from_base is {} +++".format(base_tuple[1])) # 是否完成傳輸,如果完成則退出 if base_tuple[1] == self.function_finished: logging.info("+++ finish work +++") self.finished = 1 self.client.disconnect() else: # 是否是最后一個(gè)包 if self.index == len(self.image_data_list) - 1: self.publish('finished', self.function_finished) self.last = True logging.info("+++ finished_data_to_base is finished+++") else: # 如果接收到的包不是 0x03則進(jìn)行傳送數(shù)據(jù) if base_tuple[1] == self.function_begin or base_tuple[1] == self.function_doing: logging.info("+++ package_number is {}, package_status_from_base is {} +++" .format(base_tuple[2],base_tuple[3])) # 如果數(shù)據(jù)的反饋中,包的狀態(tài)是1則繼續(xù)發(fā)下一個(gè)包 if base_tuple[3] == self.whole_package: self.publish(self.index, self.function_doing) logging.info("+++ data_to_base is finished+++") self.index += 1 # 如果數(shù)據(jù)的反饋中,包的狀態(tài)是0則重發(fā)數(shù)據(jù)包 elif base_tuple[3] == self.bad_package: re_package_number = base_tuple[2] self.publish(re_package_number-1, self.function_doing) logging.info("+++ re_data_to_base is finished+++") else: logging.info("+++ package_status_from_base is not 0 or 1 +++") self.client.disconnect() else: logging.info("+++ function_identifier is illegal +++") self.client.disconnect() else: logging.info("+++ header_from_base is illegal +++") self.client.disconnect() # 數(shù)據(jù)發(fā)送函數(shù) def publish(self, index, fuc): # 看是否是最后一個(gè)包 if index == 'finished': length = 0 package_number = 0 data = b'' else: length = len(self.image_data_list[index]) package_number = index data = self.image_data_list[index] # 打包數(shù)據(jù)頭信息 buffer = struct.pack( self.format_to_base, self.header_to_base, fuc, package_number, length ) to_base_data = buffer + data # mqtt發(fā)送 self.client.publish( self.topic_to_base, to_base_data ) self.pub_time = time.time() # 發(fā)送第一個(gè)包函數(shù) def publish_begin(self): buffer = struct.pack( self.format_to_base, self.header_to_base, self.function_begin, 0, len(self.begin_data.encode('utf-8')), ) begin_data = buffer + self.begin_data.encode('utf-8') self.client.publish(self.topic_to_base, begin_data) # 控制函數(shù) def control(self): self.on_mqtt_connect() self.publish_begin() begin_time = time.time() self.pub_time = time.time() self.subscribe() while True: time.sleep(1) # 超過5秒重傳 date = time.time() - self.pub_time if date > 5: # 是否重傳第一個(gè)包 if self.first == True: self.publish_begin() logging.info('+++ this is timeout first_data +++') # 是否重傳最后一個(gè)包 elif self.last == True: self.publish('finished', self.function_finished) logging.info('+++ this is timeout last_data +++') else: self.publish(self.index-1, self.function_doing) logging.info('+++ this is timeout middle_data +++') if self.finished == 1: logging.info('+++ all works is finished+++') break print(str(time.time()-begin_time) + 'begin_time - end_time') app = Flask(__name__) api = Api(app) CORS(app, supports_credentials=True) # 接受參數(shù) parser = reqparse.RequestParser() parser.add_argument('url', help='mqttImage url', location='args', type=str) class GetImage(Resource): # 得到參數(shù)并從圖床下載到本地 def get(self): args = parser.parse_args() url = args.get('url') response = requests.get(url) # 獲取圖片 image = Image.open(BytesIO(response.content)) # 存取圖片 add = os.path.join(os.path.abspath(''), 'image.jpg') image.save(add) # 得到圖片大小 size = os.path.getsize(add) f = open(add, 'rb') imageData = f.read() f.close() # 進(jìn)行mqtt傳輸 mqtt = Mqtt(imageData, size) mqtt.control() # 刪除文件 os.remove(add) logging.info('*** the result of control is {} ***'.format(1)) return jsonify({ "imageData": 1 }) api.add_resource(GetImage, '/image') if __name__ == '__main__': app.run(debug=True, host='0.0.0.0')
總結(jié)
以上所述是小編給大家介紹的python使用MQTT給硬件傳輸圖片的實(shí)現(xiàn)方法,希望對(duì)大家有所幫助,如果大家有任何疑問請(qǐng)給我留言,小編會(huì)及時(shí)回復(fù)大家的。在此也非常感謝大家對(duì)腳本之家網(wǎng)站的支持!
如果你覺得本文對(duì)你有幫助,歡迎轉(zhuǎn)載,煩請(qǐng)注明出處,謝謝!
相關(guān)文章
使用paramiko遠(yuǎn)程執(zhí)行命令、下發(fā)文件的實(shí)例
下面小編就為大家?guī)硪黄褂胮aramiko遠(yuǎn)程執(zhí)行命令、下發(fā)文件的實(shí)例。小編覺得挺不錯(cuò)的,現(xiàn)在就分享給大家,也給大家做個(gè)參考。一起跟隨小編過來看看吧2017-10-10python接口測試對(duì)修改密碼接口進(jìn)行壓測
這篇文章主要為大家介紹了python接口測試對(duì)修改密碼接口進(jìn)行壓測的腳本實(shí)現(xiàn),有需要的朋友可以借鑒參考下,希望能夠有所幫助,祝大家多多進(jìn)步,早日升職加薪2022-07-07python的getattr和getattribute攔截內(nèi)置操作實(shí)現(xiàn)
在Python中,getattr和getattribute是用于動(dòng)態(tài)屬性訪問和自定義屬性訪問行為的重要工具,本文主要介紹了python的getattr和getattribute攔截內(nèi)置操作實(shí)現(xiàn),具有一定的參考價(jià)值,感興趣的可以了解一下2024-01-01解決Pytorch在測試與訓(xùn)練過程中的驗(yàn)證結(jié)果不一致問題
這篇文章主要介紹了解決Pytorch在測試與訓(xùn)練過程中的驗(yàn)證結(jié)果不一致問題,具有很好的參考價(jià)值,希望對(duì)大家有所幫助。如有錯(cuò)誤或未考慮完全的地方,望不吝賜教2021-06-06Python遞歸實(shí)現(xiàn)猴子吃桃問題及解析
這篇文章主要介紹了Python遞歸實(shí)現(xiàn)猴子吃桃問題及解析,具有很好的參考價(jià)值,希望對(duì)大家有所幫助。如有錯(cuò)誤或未考慮完全的地方,望不吝賜教2022-07-07Pandas DataFrame如何按照一列數(shù)據(jù)的特定順序進(jìn)行排序
這篇文章主要介紹了Pandas DataFrame如何按照一列數(shù)據(jù)的特定順序進(jìn)行排序,具有很好的參考價(jià)值,希望對(duì)大家有所幫助,如有錯(cuò)誤或未考慮完全的地方,望不吝賜教2023-10-10python爬蟲反爬之圖片驗(yàn)證功能實(shí)現(xiàn)
這篇文章主要介紹了python爬蟲反爬之圖片驗(yàn)證功能實(shí)現(xiàn),本文給大家介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或工作具有一定的參考借鑒價(jià)值,需要的朋友參考下吧2024-03-03Pandas的AB BA類型數(shù)據(jù)框去重復(fù)
這篇文章主要為大家介紹了Pandas的AB BA類型數(shù)據(jù)框去重復(fù)實(shí)現(xiàn)示例詳解,有需要的朋友可以借鑒參考下,希望能夠有所幫助,祝大家多多進(jìn)步,早日升職加薪2023-05-05Python將json文件寫入ES數(shù)據(jù)庫的方法
這篇文章主要介紹了Python將json文件寫入ES數(shù)據(jù)庫的方法,本文給大家介紹的非常詳細(xì),具有一定的參考借鑒價(jià)值 ,需要的朋友可以參考下2019-04-04