亚洲乱码中文字幕综合,中国熟女仑乱hd,亚洲精品乱拍国产一区二区三区,一本大道卡一卡二卡三乱码全集资源,又粗又黄又硬又爽的免费视频

SpringBoot集成MQTT實(shí)現(xiàn)交互服務(wù)通信

 更新時(shí)間:2024年08月18日 16:11:59   作者:@步行者@  
MQTT非常適用于物聯(lián)網(wǎng)領(lǐng)域,本文主要介紹了SpringBoot集成MQTT實(shí)現(xiàn)交互服務(wù)通信,文中通過(guò)示例代碼介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友們下面隨著小編來(lái)一起學(xué)習(xí)學(xué)習(xí)吧

引言

本文是springboot集成mqtt的一個(gè)實(shí)戰(zhàn)案例。
gitee代碼庫(kù)地址:源碼地址

一、什么是MQTT

MQTT(Message Queuing Telemetry Transport,消息隊(duì)列遙測(cè)傳輸協(xié)議),是一種基于發(fā)布/訂閱(publish/subscribe)模式的“輕量級(jí)”通訊協(xié)議,該協(xié)議構(gòu)建于 TCP/IP 協(xié)議上,由 IBM 于 1999 年發(fā)明。MQTT 協(xié)議的主要特征是開(kāi)放、簡(jiǎn)單、輕量級(jí)和易于實(shí)現(xiàn),這些特征使得它適用于受約束的應(yīng)用環(huán)境,如:

網(wǎng)絡(luò)受限:網(wǎng)絡(luò)帶寬較低且傳輸不可靠
終端受限:協(xié)議運(yùn)行在嵌入式設(shè)備上,嵌入式終端的處理器、內(nèi)存等是受限的

MQTT 非常適用于物聯(lián)網(wǎng)領(lǐng)域,如傳感器與服務(wù)器的通信、傳感器信息采集等。

二、發(fā)布/訂閱模式

發(fā)布/訂閱模式(Publish/Subscribe Pattern,簡(jiǎn)稱Pub/Sub)是一種消息通信模式,在這種模式下,消息的發(fā)送者(發(fā)布者)不會(huì)將消息直接發(fā)送給特定的接收者(訂閱者)。而是將代表消息內(nèi)容的通知(事件)發(fā)布到一個(gè)特定的主題或頻道上,而訂閱了這個(gè)主題的接收者會(huì)收到所有在這個(gè)主題上發(fā)布的通知。這種模式解耦了消息的發(fā)送者和接收者,使得系統(tǒng)更加靈活和可擴(kuò)展。

主要組成部分

  • 發(fā)布者(Publisher):負(fù)責(zé)生成消息并將其發(fā)布到特定的主題或頻道。

  • 訂閱者(Subscriber):注冊(cè)對(duì)特定主題的興趣,并接收該主題上的所有消息。

  • 消息代理(Message Broker):作為中間件,它接收來(lái)自發(fā)布者的消息,并將這些消息傳遞給所有相關(guān)的訂閱者。

優(yōu)點(diǎn)

  • 解耦:發(fā)布者和訂閱者之間不需要直接交互,這降低了系統(tǒng)的耦合度。

  • 靈活性:可以動(dòng)態(tài)添加或刪除訂閱者,不影響其他組件。

  • 可擴(kuò)展性:系統(tǒng)容易擴(kuò)展,可以輕松增加新的發(fā)布者或訂閱者。

缺點(diǎn)

  • 復(fù)雜性:引入了額外的組件(如消息代理),增加了系統(tǒng)的復(fù)雜性和管理成本。

  • 性能開(kāi)銷:消息的傳遞需要通過(guò)中間件,可能會(huì)有延遲和性能損失。

應(yīng)用場(chǎng)景

  • 事件驅(qū)動(dòng)架構(gòu):在微服務(wù)架構(gòu)中,不同的服務(wù)通過(guò)發(fā)布/訂閱模式進(jìn)行異步通信。

  • 數(shù)據(jù)流處理:如實(shí)時(shí)數(shù)據(jù)分析,多個(gè)組件可以訂閱數(shù)據(jù)流并進(jìn)行處理。

  • 分布式系統(tǒng):用于跨系統(tǒng)或跨服務(wù)的消息傳遞。

發(fā)布/訂閱模式并不是 MQTT 協(xié)議特有的模式,很多消息中間件都有使用發(fā)布/訂閱模式,有同學(xué)可能認(rèn)為這就是觀察者模式,還真不是,這兩個(gè)模式很容易混淆。觀察者模式只有觀察者 + 被觀察者兩個(gè)角色,而發(fā)布/訂閱模式還有一個(gè)經(jīng)紀(jì)人 Broker;往更深層次的講觀察者和被觀察者,是松耦合的關(guān)系,而發(fā)布者和訂閱者,則完全不存在耦合。

三、Windows下安裝MQTT消息服務(wù)器

非常遺憾,EMQ X Broker 在 5.4.0 版本的發(fā)行版中已不支持 windows 版本的安裝包了,筆者從網(wǎng)上找了一個(gè)最后支持版本的壓縮包,已上傳資源。

  • 解壓后,在bin文件下,使用cmd執(zhí)行運(yùn)行命令 .\emqx console
  • 訪問(wèn)MQTT管理頁(yè)面 http://localhost:18083/#/ 用戶名密碼 admin/public

如果報(bào)錯(cuò)缺少Erlang環(huán)境,需要自行安裝下該環(huán)境

在這里插入圖片描述

在這里插入圖片描述

瀏覽器訪問(wèn):http://localhost:18083/#,輸入賬號(hào)密碼進(jìn)入,會(huì)要求你修改密碼,可以暫時(shí)跳過(guò)

在這里插入圖片描述

四、Windows安裝MQTT消息代理客戶端MQTTX

下載地址:MQTTX下載地址

點(diǎn)擊免費(fèi)下載

在這里插入圖片描述

選擇64位版本

在這里插入圖片描述

下好后點(diǎn)擊安裝,啟動(dòng)運(yùn)行界面如下:

在這里插入圖片描述

語(yǔ)言是英文,可以在設(shè)置按鈕里調(diào)成中文。這個(gè)客戶端代理主要是進(jìn)行消息發(fā)送的測(cè)試服務(wù)。

五、新建MQTT集成項(xiàng)目

隨便新建了一個(gè)springboot應(yīng)用,用的是JDK17,在pom文件中引入如下依賴:

        <!-- MQTT -->
        <dependency>
            <groupId>org.eclipse.paho</groupId>
            <artifactId>org.eclipse.paho.client.mqttv3</artifactId>
            <version>1.2.5</version>
        </dependency>

5.1 yml配置

server:
  port: 8081

#允許循環(huán)依賴
spring:
  main:
    allow-circular-references: true

customer:
  mqtt:
    broker: tcp://localhost:1883
    clientList:
      #發(fā)布客戶端ID
      - clientId: nays_service
        #監(jiān)聽(tīng)主題 同時(shí)訂閱多個(gè)主題 使用 - 分割開(kāi)
        subscribeTopic: mqtt/publish
        #用戶名
        userName: admin
        #密碼
        password: public
      #接收客戶端ID
      - clientId: receive_service
        #監(jiān)聽(tīng)主題 同時(shí)訂閱多個(gè)主題 使用 - 分割開(kāi)
        subscribeTopic: mqtt/receive
        #用戶名
        userName: admin
        #密碼
        password: public



5.2 Mqtt配置類

package com.hulei.mqttproject.config;

import lombok.Data;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.context.annotation.Configuration;

import java.util.List;

/**
 * Mqtt配置類
 */
@Data
@Configuration
@ConfigurationProperties(prefix = "customer.mqtt")
public class MqttConfig {
    /**
     * mqtt broker地址
     */
    String broker;
    /**
     * 需要?jiǎng)?chuàng)建的MQTT客戶端
     */
    List<MqttClient> clientList;
}

5.3 MQTT客戶端

package com.hulei.mqttproject.config;

import lombok.Data;


/**
 * MQTT客戶端
 */
@Data
public class MqttClient {
    /**
     * 客戶端ID
     */
    private String clientId;
    /**
     * 監(jiān)聽(tīng)主題
     */
    private String subscribeTopic;
    /**
     * 用戶名
     */
    private String userName;
    /**
     * 密碼
     */
    private String password;
}

5.4 MQTT客戶端管理類

package com.hulei.mqttproject.config;

import jakarta.annotation.Resource;
import lombok.extern.slf4j.Slf4j;
import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
import org.eclipse.paho.client.mqttv3.MqttException;
import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;
import org.eclipse.paho.client.mqttv3.MqttClient;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Component;

import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

/**
 * MQTT客戶端管理類,如果客戶端非常多后續(xù)可入redis緩存
 */
@Slf4j
@Component
public class MqttClientManager {

    @Value("${customer.mqtt.broker}")
    private String mqttBroker;

    @Resource
    private MqttCallBackContext mqttCallBackContext;
    /**
     * 存儲(chǔ)MQTT客戶端
     */
    public static Map<String, MqttClient> MQTT_CLIENT_MAP = new ConcurrentHashMap<>();

    public MqttClient getMqttClientById(String clientId) {
        return MQTT_CLIENT_MAP.get(clientId);
    }

    /**
     * 創(chuàng)建mqtt客戶端
     *
     * @param clientId       客戶端ID
     * @param subscribeTopic 訂閱主題,可為空
     * @param userName       用戶名,可為空
     * @param password       密碼,可為空
     */
    public void createMqttClient(String clientId, String subscribeTopic, String userName, String password) {
        MemoryPersistence persistence = new MemoryPersistence();
        try {
            MqttClient client = new MqttClient(mqttBroker, clientId, persistence);
            MqttConnectOptions connOpts = new MqttConnectOptions();
            if (null != userName && !userName.isEmpty()) {
                connOpts.setUserName(userName);
            }

            if (null != password && !password.isEmpty()) {
                connOpts.setPassword(password.toCharArray());
            }

            connOpts.setCleanSession(true);

            if (null != subscribeTopic && !subscribeTopic.isEmpty()) {
                AbsMqttCallBack callBack = mqttCallBackContext.getCallBack(clientId);

                if (null == callBack) {
                    callBack = mqttCallBackContext.getCallBack("default");
                }

                callBack.setClientId(clientId);
                callBack.setConnectOptions(connOpts);
                client.setCallback(callBack);
            }

            //連接mqtt服務(wù)端broker
            client.connect(connOpts);
            // 訂閱主題
            if (null != subscribeTopic && !subscribeTopic.isEmpty()) {
                if (subscribeTopic.contains("-"))
                    client.subscribe(subscribeTopic.split("-"));
                else {
                    client.subscribe(subscribeTopic);
                }
            }

            MQTT_CLIENT_MAP.putIfAbsent(clientId, client);
        } catch (MqttException e) {
            log.error("Create mqttClient failed!", e);
        }
    }
}

5.5 MQTT客戶端創(chuàng)建

package com.hulei.mqttproject.config;

import jakarta.annotation.PostConstruct;
import jakarta.annotation.Resource;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Component;

import java.util.List;

/**
 * MQTT客戶端創(chuàng)建
 */
@Component
@Slf4j
public class MqttClientCreate {
    @Resource
    private MqttClientManager mqttClientManager;
    @Resource
    private MqttConfig mqttConfig;

    /**
     * 創(chuàng)建MQTT客戶端
     */
    @PostConstruct
    public void createMqttClient() {
        List<MqttClient> mqttClientList = mqttConfig.getClientList();

        for (MqttClient mqttClient : mqttClientList) {
            log.info("{}", mqttClient);
            //創(chuàng)建客戶端,客戶端ID:demo,回調(diào)類跟客戶端ID一致
            mqttClientManager.createMqttClient(mqttClient.getClientId(), mqttClient.getSubscribeTopic(), mqttClient.getUserName(), mqttClient.getPassword());
        }
    }
}

5.6 MQTT回調(diào)抽象類

package com.hulei.mqttproject.config;

import jakarta.annotation.Resource;
import lombok.Getter;
import lombok.Setter;
import lombok.extern.slf4j.Slf4j;
import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken;
import org.eclipse.paho.client.mqttv3.MqttCallback;
import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
import org.eclipse.paho.client.mqttv3.MqttMessage;

/**
 * MQTT回調(diào)抽象類
 */
@Setter
@Getter
@Slf4j
public abstract class AbsMqttCallBack implements MqttCallback {

    private String clientId;

    private MqttConnectOptions connectOptions;

    @Resource
    MqttClientManager mqttClientManager;

    /**
     * 失去連接操作,進(jìn)行重連
     *
     * @param throwable 異常
     */
    @Override
    public void connectionLost(Throwable throwable) {
        try {
            if (null != clientId) {
                if (null != connectOptions) {
                    mqttClientManager.getMqttClientById(clientId).connect(connectOptions);
                } else {
                    mqttClientManager.getMqttClientById(clientId).connect();
                }
            }

        } catch (Exception e) {
            log.error("{} reconnect failed!", e.getMessage(), e);
        }
    }

    /**
     * 接收訂閱消息
     * @param topic    主題
     * @param mqttMessage 接收消息
     * @throws Exception 異常
     */
    @Override
    public void messageArrived(String topic, MqttMessage mqttMessage) throws Exception {
		String content = new String(mqttMessage.getPayload());
     	handleReceiveMessage(topic, content);
    }

    /**
     * 消息發(fā)送成功
     *
     * @param iMqttDeliveryToken toke
     */
    @Override
    public void deliveryComplete(IMqttDeliveryToken iMqttDeliveryToken) {
        log.info("消息發(fā)送成功");
    }


    /**
     * 處理接收的消息
     * @param topic   主題
     * @param message 消息內(nèi)容
     */
    protected abstract void handleReceiveMessage(String topic, String message);
}


5.7 MQTT訂閱回調(diào)環(huán)境類

package com.hulei.mqttproject.config;

import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Component;

import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

/**
 * MQTT訂閱回調(diào)環(huán)境類
 */
@Component
@Slf4j
public class MqttCallBackContext {

    private final Map<String, AbsMqttCallBack> callBackMap = new ConcurrentHashMap<>();

    /**
     * 默認(rèn)構(gòu)造函數(shù)
     *
     * @param callBackMap 回調(diào)集合
     */
    public MqttCallBackContext(Map<String, AbsMqttCallBack> callBackMap) {
        this.callBackMap.putAll(callBackMap);
    }

    /**
     * 獲取MQTT回調(diào)類
     *
     * @param clientId 客戶端ID
     * @return MQTT回調(diào)類
     */
    public AbsMqttCallBack getCallBack(String clientId) {
        return this.callBackMap.get(clientId);
    }
}

5.8 默認(rèn)回調(diào)類

package com.hulei.mqttproject.config;

import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Component;

/**
 * 默認(rèn)回調(diào)
 */
@Slf4j
@Component("default")
public class DefaultMqttCallBack extends AbsMqttCallBack {

    /**
     * @param topic   主題
     * @param message 消息內(nèi)容
     */
    @Override
    protected void handleReceiveMessage(String topic, String message) {
        log.info("接收到主題---{}", topic);
        log.info("接收到消息---{}", message);
        // 自定義消息處理業(yè)務(wù)

    }
}

六、測(cè)試服務(wù)類

package com.hulei.mqttproject.controller;

import com.hulei.mqttproject.config.MqttClientManager;
import jakarta.annotation.Resource;
import lombok.extern.slf4j.Slf4j;
import org.eclipse.paho.client.mqttv3.MqttMessage;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;

@RestController
@Slf4j
public class SendController {

    @Resource
    private MqttClientManager mqttClientManager;

    @RequestMapping("/sendMessage")
    public String sendMessage(String topic){
        try {
            MqttMessage mqttMessage = new MqttMessage("你好".getBytes());
            mqttClientManager.getMqttClientById("nays_service").publish(topic,mqttMessage);
            return "發(fā)送成功";
        } catch (Exception e) {
            log.error("發(fā)送失敗",e);
            return "發(fā)送失敗";
        }
    }
}

七、啟動(dòng)springboot

啟動(dòng)日志可以看到,mqtt消息服務(wù)器連接成功

在這里插入圖片描述


EMQX工具顯示發(fā)布客戶端和接收客戶端均已成功注冊(cè)

在這里插入圖片描述

使用Apifox測(cè)試下SendController中的接口,mqtt/receive是yaml中接收客戶端訂閱的主題,當(dāng)然也可以往mqtt/publish主題發(fā),mqtt中消息的發(fā)布者也可以訂閱主題,監(jiān)聽(tīng)某些消息。

在這里插入圖片描述

在這里插入圖片描述

到此這篇關(guān)于SpringBoot集成MQTT實(shí)現(xiàn)交互服務(wù)通信的文章就介紹到這了,更多相關(guān)SpringBoot MQTT交互服務(wù)通信內(nèi)容請(qǐng)搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持腳本之家! 

相關(guān)文章

  • 透過(guò)Spring源碼查看Bean的命名轉(zhuǎn)換規(guī)則圖文詳解

    透過(guò)Spring源碼查看Bean的命名轉(zhuǎn)換規(guī)則圖文詳解

    Java Bean是一種 Java 編程語(yǔ)言編寫的可重用軟件組件,包括符合一定規(guī)范的Java 類、屬性和方法,用于描述和處理應(yīng)用程序中的數(shù)據(jù)對(duì)象,下面這篇文章主要給大家介紹了關(guān)于透過(guò)Spring源碼查看Bean的命名轉(zhuǎn)換規(guī)則的相關(guān)資料,需要的朋友可以參考下
    2023-06-06
  • Java數(shù)據(jù)結(jié)構(gòu)二叉樹(shù)難點(diǎn)解析

    Java數(shù)據(jù)結(jié)構(gòu)二叉樹(shù)難點(diǎn)解析

    樹(shù)是一種重要的非線性數(shù)據(jù)結(jié)構(gòu),直觀地看,它是數(shù)據(jù)元素(在樹(shù)中稱為結(jié)點(diǎn))按分支關(guān)系組織起來(lái)的結(jié)構(gòu),很象自然界中的樹(shù)那樣。樹(shù)結(jié)構(gòu)在客觀世界中廣泛存在,如人類社會(huì)的族譜和各種社會(huì)組織機(jī)構(gòu)都可用樹(shù)形象表示
    2021-10-10
  • SpringBoot 單元測(cè)試JUnit的使用詳解

    SpringBoot 單元測(cè)試JUnit的使用詳解

    這篇文章主要介紹了SpringBoot 單元測(cè)試JUnit的使用詳解,文中通過(guò)示例代碼介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友們下面隨著小編來(lái)一起學(xué)習(xí)學(xué)習(xí)吧
    2019-11-11
  • OpenFeign設(shè)置header的三種方式總結(jié)

    OpenFeign設(shè)置header的三種方式總結(jié)

    在微服務(wù)間使用Feign進(jìn)行遠(yuǎn)程調(diào)用時(shí)需要在header中添加信息,下面這篇文章主要給大家介紹了關(guān)于OpenFeign設(shè)置header的三種方式,文中通過(guò)實(shí)例代碼介紹的非常詳細(xì),需要的朋友可以參考下
    2023-04-04
  • Netty中最簡(jiǎn)單的粘包解析方法分享

    Netty中最簡(jiǎn)單的粘包解析方法分享

    黏包 是指網(wǎng)絡(luò)上有多條數(shù)據(jù)發(fā)送給服務(wù)端, 但是由于某種原因這些數(shù)據(jù)在被接受的時(shí)候進(jìn)行了重新組合,本文分享了一種最簡(jiǎn)單的黏包解析方法, 非常適用于初初初級(jí)選手
    2023-05-05
  • Spring?Cloud?Eureka:?指定Zone方式

    Spring?Cloud?Eureka:?指定Zone方式

    這篇文章主要介紹了Spring?Cloud?Eureka:?指定Zone方式,具有很好的參考價(jià)值,希望對(duì)大家有所幫助。如有錯(cuò)誤或未考慮完全的地方,望不吝賜教
    2022-03-03
  • SpringBoot yml配置文件調(diào)用過(guò)程解析

    SpringBoot yml配置文件調(diào)用過(guò)程解析

    這篇文章主要介紹了SpringBoot yml配置文件調(diào)用過(guò)程解析,文中通過(guò)示例代碼介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友可以參考下
    2019-11-11
  • Java復(fù)合語(yǔ)句的使用方法詳解

    Java復(fù)合語(yǔ)句的使用方法詳解

    這篇文章主要介紹了Java編程中復(fù)合語(yǔ)句,結(jié)合相關(guān)的具體實(shí)例介紹了其用法,需要的朋友可以參考下
    2017-09-09
  • java僅用30行代碼就實(shí)現(xiàn)了視頻轉(zhuǎn)音頻的批量轉(zhuǎn)換

    java僅用30行代碼就實(shí)現(xiàn)了視頻轉(zhuǎn)音頻的批量轉(zhuǎn)換

    這篇文章主要介紹了java僅用30行代碼就實(shí)現(xiàn)了視頻轉(zhuǎn)音頻的批量轉(zhuǎn)換,文中通過(guò)示例代碼介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友們下面隨著小編來(lái)一起學(xué)習(xí)學(xué)習(xí)吧
    2021-04-04
  • Java實(shí)現(xiàn)跳躍表(skiplist)的簡(jiǎn)單實(shí)例

    Java實(shí)現(xiàn)跳躍表(skiplist)的簡(jiǎn)單實(shí)例

    這篇文章主要介紹了Java編程中跳躍表的概念和實(shí)現(xiàn)原理,并簡(jiǎn)要敘述了它的結(jié)構(gòu),具有一定參考價(jià)值,需要的朋友可以了解下。
    2017-09-09

最新評(píng)論