springboot整合mqtt實現(xiàn)消息訂閱和推送功能
前言
mica-mqtt-client-spring-boot-starter是一個基于Spring Boot的MQTT客戶端啟動器,它集成了mica-mqtt客戶端,提供了在Spring Boot應(yīng)用程序中使用MQTT協(xié)議進(jìn)行消息通信的能力。以下是關(guān)于mica-mqtt-client-spring-boot-starter的簡介:
特點:
- 簡單易用:通過Spring Boot的自動配置,可以輕松地集成到Spring應(yīng)用程序中,并使用Spring的注解或Java配置進(jìn)行MQTT客戶端的配置。
- 低延遲:支持MQTT協(xié)議,能夠?qū)崿F(xiàn)實時消息通信,具有較低的延遲。
- 高性能:基于mica-mqtt客戶端,具有高效的消息處理和網(wǎng)絡(luò)通信能力,能夠處理大量的并發(fā)連接和消息。
- 集群支持:支持基于Redis的發(fā)布/訂閱模式的集群,可以實現(xiàn)多個節(jié)點之間的消息同步和負(fù)載均衡。
- 使用場景:適用于需要使用MQTT協(xié)議進(jìn)行消息通信的物聯(lián)網(wǎng)、實時應(yīng)用、移動應(yīng)用等領(lǐng)域??梢栽谠贫嘶蜻吘壎耸褂茫瑢崿F(xiàn)設(shè)備與設(shè)備之間、設(shè)備與服務(wù)器之間的消息通信。
- 集成方式:通過在Spring Boot項目中添加相關(guān)依賴,并配置MQTT客戶端的相關(guān)參數(shù),即可快速集成mica-mqtt-client-spring-boot-starter。具體的使用方法可以參考官方文檔和示例代碼。
- 注意事項:在使用過程中需要注意確保網(wǎng)絡(luò)連接的穩(wěn)定性和安全性,并根據(jù)實際需求進(jìn)行適當(dāng)?shù)呐渲煤蛢?yōu)化。同時,也需要關(guān)注數(shù)據(jù)安全和隱私保護(hù)等方面的問題。
總之,mica-mqtt-client-spring-boot-starter是一個方便、高效、可靠的MQTT客戶端啟動器,適用于需要使用MQTT協(xié)議進(jìn)行消息通信的Spring Boot應(yīng)用程序。
功能
- 支持 MQTT v3.1、v3.1.1 以及 v5.0 協(xié)議。
- 支持 websocket mqtt 子協(xié)議(支持 mqtt.js)。
- 支持 http rest api,http api 文檔詳見[1]。
- 支持 MQTT client 客戶端。
- 支持 MQTT server 服務(wù)端。
- 支持 MQTT client、server 共享訂閱支持(捐助VIP版采用 topic 樹存儲,跟 topic 數(shù)無關(guān),百萬 topic 性能依舊)。
- 支持 MQTT 遺囑消息。
- 支持 MQTT 保留消息。
- 支持自定義消息(mq)處理轉(zhuǎn)發(fā)實現(xiàn)集群。
- MQTT 客戶端 阿里云 mqtt 連接 demo。
- 支持 GraalVM 編譯成本機(jī)可執(zhí)行程序。
- 支持 Spring boot 項目快速接入(mica-mqtt-spring-boot-starter)。
- mica-mqtt-spring-boot-starter 支持對接 Prometheus + Grafana。
- 基于 redis pub/sub 實現(xiàn)集群,詳見 mica-mqtt-broker 模塊[2]
教程
添加依賴
在springboot項目中添加maven依賴:
<!-- https://mvnrepository.com/artifact/net.dreamlu/mica-mqtt-client-spring-boot-starter --> <dependency> <groupId>net.dreamlu</groupId> <artifactId>mica-mqtt-client-spring-boot-starter</artifactId> <version>2.2.8</version> </dependency>
配置參數(shù)
在spring配置文件中配置mqtt相關(guān)參數(shù),配置如下:
mqtt: server: enabled: false # 是否開啟服務(wù)端,默認(rèn):false client: enabled: true # 是否開啟客戶端,默認(rèn):false ip: 172.16.10.203 # 連接的服務(wù)端 ip ,默認(rèn):127.0.0.1 port: 1883 # 端口:默認(rèn):1883 name: Mica2-Mqtt2-Client # 名稱,默認(rèn):Mica-Mqtt-Client clientId: coalface_safety_3d # 客戶端Id(非常重要,一般為設(shè)備 sn,不可重復(fù)) user-name: admin # 認(rèn)證的用戶名 你的用戶名 password: 3@!cHy@j # 認(rèn)證的密碼 timeout: 5 # 連接超時時間,單位:秒,默認(rèn):5秒 reconnect: true # 是否重連,默認(rèn):true re-interval: 5000 # 重連時間,默認(rèn) 5000 毫秒 version: MQTT_3_1 # mqtt 協(xié)議版本,默認(rèn):3.1.1 read-buffer-size: 8092 # 接收數(shù)據(jù)的 buffer size,默認(rèn):8092 max-bytes-in-message: 8092 # 消息解析最大 bytes 長度,默認(rèn):8092 buffer-allocator: heap # 堆內(nèi)存和堆外內(nèi)存,默認(rèn):堆內(nèi)存 keep-alive-secs: 60 # keep-alive 心跳維持時間,單位:秒 clean-session: false # mqtt clean session,默認(rèn):true will-message: # 消息遺囑 qos: at_least_once ssl: enabled: false # 是否開啟 ssl 認(rèn)證,2.1.0 開始支持雙向認(rèn)證 keystore-path: # 可選參數(shù):ssl 雙向認(rèn)證 keystore 目錄,支持 classpath:/ 路徑。 keystore-pass: # 可選參數(shù):ssl 雙向認(rèn)證 keystore 密碼 truststore-path: # 可選參數(shù):ssl 雙向認(rèn)證 truststore 目錄,支持 classpath:/ 路徑。 truststore-pass: # 可選參數(shù):ssl 雙向認(rèn)證 truststore 密碼
注意:ssl 存在三種情況
服務(wù)端開啟ssl | 客戶端 |
---|---|
ClientAuth 為 NONE(不需要客戶端驗證) | 僅僅需要開啟 ssl 即可不用配置證書 |
ClientAuth 為 OPTIONAL(與客戶端協(xié)商) | 需開啟 ssl 并且配置 truststore 證書 |
ClientAuth 為 REQUIRE (必須的客戶端驗證) | 需開啟 ssl 并且配置 truststore、 keystore證書 |
創(chuàng)建訂閱
創(chuàng)建一個mqtt訂閱消息監(jiān)聽類,例如SimulationSubscriber,代碼如下:
import com.alibaba.fastjson.JSONObject; import net.dreamlu.iot.mqtt.spring.client.MqttClientSubscribe; import org.springframework.messaging.handler.annotation.Header; import org.springframework.messaging.handler.annotation.Payload; import org.springframework.stereotype.Component; import org.tio.utils.buffer.ByteBufferUtil; /** * @author tarzan */ @Component @Slf4j public class SimulationSubscriber { @MqttClientSubscribe("tuoyuan/publish/zj/#") public void zjOne(String topic, byte[] payload){ String[] strs=topic.split("/"); String ID=strs[strs.length-1]; log.info("topic:{} payload:{} ID:{}", topic, new String(payload, StandardCharsets.UTF_8),ID); } @MqttClientSubscribe("/sys/${deviceName}/thing/sub/register") public void thingSubRegister(String topic, byte[] payload) { // 1.3.8 開始支持,@MqttClientSubscribe 注解支持 ${} 變量替換,會默認(rèn)替換成 + // 注意:mica-mqtt 會先從 Spring boot 配置中替換參數(shù) ${},如果存在配置會優(yōu)先被替換。 logger.info("topic:{} payload:{}", topic, new String(payload, StandardCharsets.UTF_8)); } @MqttClientSubscribe("/tianma/publish/cmj") public void cmj(@Header("topic") String topic,@Payload byte[] payload) { System.out.println("*****************gc**************************************"+topic); JSONObject jsonObject = JSONObject.parseObject(ByteBufferUtil.toString(payload)); //業(yè)務(wù)的處理 System.out.println("*****************test**************************************"+jsonObject); } @MqttClientSubscribe("/tianma/publish/zj") public void zj(@Header("topic") String topic,@Payload byte[] payload) { System.out.println("*****************gc**************************************"+topic); JSONObject jsonObject = JSONObject.parseObject(ByteBufferUtil.toString(payload)); //業(yè)務(wù)的處理 System.out.println("*****************test**************************************"+jsonObject); } @MqttClientSubscribe("/tianma/publish/gbj") public void gbj(@Header("topic") String topic,@Payload byte[] payload) { System.out.println("*****************gc**************************************"+topic); JSONObject jsonObject = JSONObject.parseObject(ByteBufferUtil.toString(payload)); //業(yè)務(wù)的處理 System.out.println("*****************test**************************************"+jsonObject); } @MqttClientSubscribe("/tianma/publish/ltl") public void ltl(@Header("topic") String topic,@Payload byte[] payload) { System.out.println("*****************gc**************************************"+topic); JSONObject jsonObject = JSONObject.parseObject(ByteBufferUtil.toString(payload)); //業(yè)務(wù)的處理 System.out.println("*****************test**************************************"+jsonObject); } @MqttClientSubscribe("/tianma/publish/ntl") public void ntl(@Header("topic") String topic,@Payload byte[] payload) { System.out.println("*****************gc**************************************"+topic); JSONObject jsonObject = JSONObject.parseObject(ByteBufferUtil.toString(payload)); //業(yè)務(wù)的處理 System.out.println("*****************test**************************************"+jsonObject); } @MqttClientSubscribe("/tianma/publish/ccl") public void ccl(@Header("topic") String topic,@Payload byte[] payload) { System.out.println("*****************gc**************************************"+topic); JSONObject jsonObject = JSONObject.parseObject(ByteBufferUtil.toString(payload)); //業(yè)務(wù)的處理 System.out.println("*****************test**************************************"+jsonObject); } }
- @Header(“topic”) 和@Payload 注解可以省略
- tuoyuan/publish/zj/# 中的# 是通配符
- 在MQTT協(xié)議中,#是一個通配符,代表匹配該主題的所有子主題。例如,如果你訂閱了主題sports/baseball/#,那么你將接收到所有以sports/baseball/開頭的主題的消息。
- 請注意,通配符#只能用于多層的主題名稱中,并且只能用于最后一個級別。例如,sports/baseball/#是有效的,但#sports/baseball或sports/#/baseball都是無效的。
- 除了#之外,MQTT協(xié)議還支持一個單層通配符+,它代表只匹配該級別的主題。例如,如果你訂閱了主題sports/baseball/+,那么你將只接收到以sports/baseball/開頭,且后面跟著至少一個字符的主題的消息。
- 請注意,使用通配符時需要謹(jǐn)慎,因為它們可能會匹配到意外的主題。確保你的訂閱主題明確,并且只匹配你感興趣的主題。
- /sys/${deviceName}/thing/sub/register
- 1.3.8 開始支持,@MqttClientSubscribe 注解支持 ${} 變量替換,會默認(rèn)替換成 +
- 注意:mica-mqtt 會先從 Spring boot 配置中替換參數(shù) ${},如果存在配置會優(yōu)先被替換。
創(chuàng)建發(fā)布
創(chuàng)建一個mqtt消息發(fā)布接口類,例如 MqttTestController,代碼如下:
import io.swagger.annotations.Api; import io.swagger.annotations.ApiOperation; import lombok.AllArgsConstructor; import lombok.extern.slf4j.Slf4j; import net.dreamlu.iot.mqtt.spring.client.MqttClientTemplate; import org.springblade.core.secure.annotation.NoToken; import org.springblade.core.tool.api.R; import org.springframework.web.bind.annotation.PostMapping; import org.springframework.web.bind.annotation.RequestMapping; import org.springframework.web.bind.annotation.RestController; import java.nio.charset.StandardCharsets; /** * @author tarzan */ @RestController @Api(tags = "mqtt測試") @NoToken @RequestMapping("/mqtt") @AllArgsConstructor @Slf4j public class MqttTestController { private final MqttClientTemplate mqttClientTemplate; @ApiOperation(value = "消息發(fā)送") @PostMapping("/publish") private R<Boolean> publish(String topic, String msg) { return R.status(mqttClientTemplate.publish(topic, msg.getBytes(StandardCharsets.UTF_8))); } }
接口測試
接口調(diào)用
控制臺輸出
到此這篇關(guān)于springboot整合mqtt實現(xiàn)消息訂閱和推送的文章就介紹到這了,更多相關(guān)springboot整合mqtt內(nèi)容請搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持腳本之家!
相關(guān)文章
springboot lua檢查redis庫存的實現(xiàn)示例
本文主要介紹了springboot lua檢查redis庫存的實現(xiàn)示例,為了優(yōu)化性能,通過Lua腳本實現(xiàn)對多個馬戲場次下的座位等席的庫存余量檢查,感興趣的可以了解一下2024-09-09java.lang.IllegalArgumentException:Invalid character&nb
本文介紹了java.lang.IllegalArgumentException: Invalid character found異常的解決,方法包括檢查代碼中的方法名,使用合適的HTTP請求方法常量,使用第三方HTTP庫,檢查請求URL以及使用調(diào)試和日志工具,通過這些方法,我們可以解決異常并確保網(wǎng)絡(luò)應(yīng)用程序的正常運行2023-10-10Springboot通過配置WebMvcConfig處理Cors非同源訪問跨域問題
這篇文章主要介紹了Springboot通過配置WebMvcConfig處理Cors非同源訪問跨域問題,關(guān)于Cors跨域的問題,前端有代理和jsonp的常用方式解決這種非同源的訪問拒絕策略2023-04-04SpringBoot集成iText實現(xiàn)電子簽章功能
這篇文章主要為大家詳細(xì)介紹了SpringBoot如何集成iText實現(xiàn)電子簽章功能,文中的示例代碼講解詳細(xì),感興趣的小伙伴可以跟隨小編一起學(xué)習(xí)一下2024-10-10