SpringBoot整合MOTT動態(tài)讀取數(shù)據(jù)庫連接信息并連接MQTT服務(wù)端
MQTT介紹:
概述:
MQTT是一種輕量級的消息傳輸協(xié)議(Message Queuing Telemetry Transport),旨在實現(xiàn)設(shè)備之間的低帶寬和高延遲的通信。它是基于發(fā)布/訂閱模式(Publish/Subscribe)的消息協(xié)議,最初由IBM開發(fā),現(xiàn)在成為了一種開放的標(biāo)準(zhǔn),被廣泛應(yīng)用于物聯(lián)網(wǎng)(IoT)領(lǐng)域。
MQTT特點包括:
1、輕量級:MQTT協(xié)議設(shè)計簡單,消息頭部輕量,適用于受限環(huán)境的設(shè)備,如傳感器、嵌入式設(shè)備等。
2、簡單易用:MQTT采用發(fā)布/訂閱模式,消息的發(fā)送者(發(fā)布者)和接收者(訂閱者)之間解耦,通信過程簡單易理解。
3、低帶寬、高延遲:MQTT協(xié)議設(shè)計考慮了網(wǎng)絡(luò)帶寬受限和延遲較高的情況,能夠在不理想的網(wǎng)絡(luò)環(huán)境下保持穩(wěn)定的消息傳輸。
4、可靠性:MQTT支持消息的持久化和確認(rèn)機(jī)制,確保消息的可靠傳輸,同時提供了QoS(Quality of Service)等級,可以根據(jù)實際需求進(jìn)行靈活配置。
5、靈活性:MQTT支持多種消息格式和負(fù)載類型,可以傳輸文本、二進(jìn)制數(shù)據(jù)等多種類型的消息,同時支持SSL/TLS加密,保障通信安全。
6、適用于多種場景:由于其輕量級和靈活性,MQTT被廣泛應(yīng)用于物聯(lián)網(wǎng)、傳感器網(wǎng)絡(luò)、遠(yuǎn)程監(jiān)控、消息通知等場景,成為連接設(shè)備的重要通信協(xié)議之一。
話不多說,直接看代碼如何連接
因項目需求,本次做的是在項目啟動時,動態(tài)讀取數(shù)據(jù)庫中已經(jīng)配置好的mqtt連接信息,并且根據(jù)這些信息動態(tài)的循環(huán)連接服務(wù)端,在接收到消息后進(jìn)行持久化和相關(guān)邏輯處理。
一、首先加載依賴
<!-- mqtt --> <dependency> <groupId>org.springframework.integration</groupId> <artifactId>spring-integration-mqtt</artifactId> </dependency>
二、因為是要在項目啟動時候連接,但是又要等項目初始化后拿到要用的mapper,所以在這個類中需要實現(xiàn)ApplicationRunner接口,而沒有用其他的方法,有多種實現(xiàn)但是我用的這個
package com.ruoyi; import cn.hutool.core.collection.CollUtil; import cn.hutool.core.util.IdUtil; import cn.hutool.core.util.ObjUtil; import com.baomidou.mybatisplus.core.conditions.query.QueryWrapper; import com.ruoyi.system.domain.mqtt.MqttBean; import org.eclipse.paho.client.mqttv3.*; import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence; import org.quartz.impl.StdSchedulerFactory; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.boot.ApplicationArguments; import org.springframework.boot.ApplicationRunner; import org.springframework.context.annotation.Bean; import org.springframework.stereotype.Component; import org.springframework.util.ObjectUtils; import java.util.ArrayList; import java.util.List; import java.util.concurrent.ConcurrentHashMap; /** * @author 1097022316 * 啟動后建立MQTT連接 并對數(shù)據(jù)持久化和相關(guān)邏輯處理 */ @Component public class StartInit implements ApplicationRunner { @Override public void run(ApplicationArguments args) throws Exception { MemoryPersistence persistence = new MemoryPersistence(); //這里是從數(shù)據(jù)庫中查詢出所有的mqtt連接相關(guān)信息,如ip、topic等 List<DeviceCollector> collectorList = collectorService.list(new QueryWrapper<DeviceCollector>().isNotNull("collector_ip").ne("collector_ip", "").eq("is_del", "0").groupBy("collector_topic")); System.err.println("collectorList = " + collectorList); //如果沒有任何連接 直接結(jié)束 if (CollUtil.isEmpty(collectorList)) { return; } List<MqttBean> mqttBeans = new ArrayList<>(); //把從數(shù)據(jù)庫中查詢出來的信息組裝成mqttclient連接所需要的對象 //一般都是ip、port、username、password、topic、clientid 這里是簡單的用法,如有高級用法可自行摸索(順便在下面評論教一下) collectorList.forEach(mqtt -> { MqttBean bean = new MqttBean("tcp://" + mqtt.getCollectorIp() + ":" + mqtt.getCollectorPort(), mqtt.getCollectorUsername(), mqtt.getCollectorPassword(), mqtt.getCollectorTopic(), mqtt.getCollectorClientId()); mqttBeans.add(bean); }); //對我們組裝的mqtt連接對象信息進(jìn)行遍歷循環(huán)連接 mqttBeans.forEach(bean -> { try { MqttClient mqttClient = new MqttClient(bean.getUrl(), bean.getClientId(), persistence); MqttConnectOptions connOpts = new MqttConnectOptions(); //設(shè)置相關(guān)連接參數(shù),有些是必要有些是非必要 可自行點進(jìn)去查看源碼 connOpts.setAutomaticReconnect(false); connOpts.setCleanSession(true); connOpts.setUserName(bean.getUserName()); connOpts.setPassword(bean.getPassword().toCharArray()); mqttClient.connect(connOpts); //把連接對象加入到全局 mqttClients.put(bean.getTopics(), mqttClient); mqttClient.setCallback(new MqttCallbackExtended() { @Override public void connectComplete(boolean reconnect, String serverURI) { if (reconnect) { System.out.println("Reconnected successfully. url = " + serverURI); } else { System.out.println("Connected successfully for the first time."); } } /** * 設(shè)置重連機(jī)制 */ @Override public void connectionLost(Throwable cause) { System.err.println(bean.getTopics() + "連接丟失" + cause.getMessage()); if (!mqttClient.isConnected()) { try { Thread.sleep(1000 * 60 * 5); //嘗試連接 System.out.println("bean = " + bean); boolean flag = MyMqttUtils.connectMqtt(new MqttBean(bean.getUrl(), bean.getUserName(), bean.getPassword(), bean.getTopics(), bean.getClientId())); if (flag) { System.err.println(bean.getTopics() + "重新連接,重新訂閱!"); } } catch (InterruptedException e) { MyMqttUtils.saveToTxt(bean.getTopics() + "MQTT連接出異常了" + e.getMessage(), "CLCW"); throw new RuntimeException(e); } } } @Override public void messageArrived(String topic, MqttMessage message) { //這里是當(dāng)消息推送時我們做的事情 System.out.println("dosomethings..."); } @Override public void deliveryComplete(IMqttDeliveryToken iMqttDeliveryToken) { System.out.println("消息發(fā)送完整"); } }); mqttClient.subscribe(bean.getTopics(), 2); } catch (Exception e) { e.printStackTrace(); System.err.println(bean.getTopics() + "MQTT連接出異常了"); MyMqttUtils.saveToTxt(bean.getTopics() + "MQTT連接出異常了"+e.getMessage(),"LJCW"); try { Thread.sleep(1000 * 60 * 30); //嘗試連接 System.out.println("bean = " + bean); boolean flag = MyMqttUtils.connectMqtt(new MqttBean(bean.getUrl(), bean.getUserName(), bean.getPassword(), bean.getTopics(), bean.getClientId())); if (flag) { System.err.println(bean.getTopics() + "重新連接,重新訂閱!"); } } catch (InterruptedException ex) { //可以在這里把報錯信息存入本地看看 throw new RuntimeException(ex); } } }); } }
import lombok.AllArgsConstructor; import lombok.Data; import lombok.NoArgsConstructor; /** * @author 1097022316 */ @Data @AllArgsConstructor @NoArgsConstructor public class MqttBean { private String url; private String userName; private String password; private String topics; private String clientId; }
重點是MqttClient中幾個參數(shù)和配置參數(shù),以及那幾個重寫的方法,看下源碼就好了。這里用的比較粗糙,只是簡單的實現(xiàn)了連接和重連,一些復(fù)雜的如心跳或者遺囑啥的都沒用,要研究可自行查看
以上就是SpringBoot整合MOTT動態(tài)讀取數(shù)據(jù)庫連接信息并連接MQTT服務(wù)端的詳細(xì)內(nèi)容,更多關(guān)于SpringBoot MOTT讀取數(shù)據(jù)庫的資料請關(guān)注腳本之家其它相關(guān)文章!
相關(guān)文章
Springboot整合hibernate validator 全局異常處理步驟詳解
本文分步驟給大家介紹Springboot整合hibernate validator 全局異常處理,補(bǔ)呢文通過實例代碼給大家介紹的非常詳細(xì),對大家的學(xué)習(xí)或工作具有一定的參考借鑒價值,需要的朋友參考下吧2024-01-01Spring創(chuàng)建bean對象三種方式代碼實例
這篇文章主要介紹了Spring創(chuàng)建bean對象三種方式代碼實例,文中通過示例代碼介紹的非常詳細(xì),對大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價值,需要的朋友可以參考下2020-07-07Java遍歷字符串和統(tǒng)計字符個數(shù)的操作方法
這篇文章主要介紹了Java遍歷字符串和統(tǒng)計字符個數(shù)的操作方法,本文通過實例代碼給大家介紹的非常詳細(xì),感興趣的朋友跟隨小編一起看看吧2024-12-12使用Jackson來實現(xiàn)Java對象與JSON的相互轉(zhuǎn)換的教程
這篇文章主要介紹了使用Jackson來實現(xiàn)Java對象與JSON的互相轉(zhuǎn)換的教程,文中羅列了3中Jackson的使用方式,需要的朋友可以參考下2016-01-01Java中線程組ThreadGroup與線程池的區(qū)別及示例
這篇文章主要介紹了Java中線程組與線程池的區(qū)別及示例,ThreadGroup是用來管理一組線程的,可以控制線程的執(zhí)行,查看線程的執(zhí)行狀態(tài)等操作,方便對于一組線程的統(tǒng)一管理,需要的朋友可以參考下2023-05-05