SpringBoot實(shí)現(xiàn)EMQ設(shè)備的上下線告警
前言
我遇到了一個(gè)難題,即在使用 EMQ X 4.4.10
的開(kāi)源版本時(shí),我需要實(shí)現(xiàn)設(shè)備的上下線狀態(tài)監(jiān)控,但該 4.4.10開(kāi)源版本
并未內(nèi)置設(shè)備上下線提醒模塊,只有企業(yè)版才內(nèi)置了該模塊。這為我?guī)?lái)了一些技術(shù)上的難題,迫使我必須另辟蹊徑,通過(guò)其他方法來(lái)監(jiān)聽(tīng)設(shè)備的上下線狀態(tài)。為了解決這個(gè)問(wèn)題,我采取了以下步驟:
首先,我修改了 EMQ X
的 acl.config
文件,添加了以下規(guī)則:
{allow, all, subscribe, ["$SYS/brokers/+/clients/#"]}.
圖示:
這個(gè)規(guī)則允許訂閱 $SYS/brokers/+/clients/#
主題的所有客戶端。
接下來(lái),我使用 Spring Boot
創(chuàng)建了一個(gè)應(yīng)用程序,其中我設(shè)置了與 EMQ X
代理的連接。在這個(gè)應(yīng)用程序中,我創(chuàng)建了一個(gè)監(jiān)聽(tīng)器,用于訂閱 $SYS/brokers/+/clients/#
主題,以感知設(shè)備的上下線信息。
通過(guò)這個(gè)解決方案,我能夠?qū)崟r(shí)監(jiān)控設(shè)備的連接和斷開(kāi)事件,而不受 EMQ X
開(kāi)源版本的功能限制。這使我能夠根據(jù)設(shè)備狀態(tài)采取適當(dāng)?shù)拇胧热绨l(fā)送通知或執(zhí)行其他自定義操作。這個(gè)方法允許我靈活地定制解決方案,以滿足我的特定需求,而無(wú)需依賴 EMQ X
的功能。
EMQ簡(jiǎn)介
EMQ
( Erlang MQTT Broker
)是一種基于 Erlang
編程語(yǔ)言開(kāi)發(fā)的開(kāi)源消息傳遞代理( MQTT broker
),專門(mén)用于支持 MQTT
( Message Queuing Telemetry Transport
)協(xié)議。MQTT是一種輕量級(jí)、高效的消息傳遞協(xié)議,最初設(shè)計(jì)用于連接受限的設(shè)備,如嵌入式系統(tǒng)和物聯(lián)網(wǎng)設(shè)備。 EMQ
作為 MQTT broker
,提供了可靠的消息傳遞機(jī)制,使設(shè)備能夠相互通信,同時(shí)也可與后端應(yīng)用程序集成,使其成為物聯(lián)網(wǎng)生態(tài)系統(tǒng)中的重要組成部分。
環(huán)境
EMQX
安裝方式:Docker
EMQX
版本:4.4.10開(kāi)源版本
- 操作系統(tǒng):
CentOS 7
下載
準(zhǔn)備工作
安裝EMQX
修改EMQX的ACL規(guī)則內(nèi)容
EMQX的Docker容器配置文件所在目錄
# 進(jìn)入EMQX容器 docker exec -it emqx /bin/sh # 進(jìn)入配置文件目錄 cd /opt/emqx/etc # 查看acl配置文件 cat acl.conf # 編輯acl配置文件 vi acl.conf
非Docker容器配置文件所在目錄
# 進(jìn)入配置文件目錄 cd /etc/emqx # 查看acl配置文件 cat acl.conf # 編輯acl配置文件 vi acl.conf
acl的默認(rèn)文件內(nèi)容
%%-------------------------------------------------------------------- %% [ACL](https://docs.emqx.io/broker/v3/en/config.html) %% %% -type(who() :: all | binary() | %% {ipaddr, esockd_access:cidr()} | %% {ipaddrs, [esockd_access:cidr()]} | %% {client, binary()} | %% {user, binary()}). %% %% -type(access() :: subscribe | publish | pubsub). %% %% -type(topic() :: binary()). %% %% -type(rule() :: {allow, all} | %% {allow, who(), access(), list(topic())} | %% {deny, all} | %% {deny, who(), access(), list(topic())}). %%-------------------------------------------------------------------- {allow, {user, "dashboard"}, subscribe, ["$SYS/#"]}. {allow, {ipaddr, "127.0.0.1"}, pubsub, ["$SYS/#", "#"]}. {deny, all, subscribe, ["$SYS/#", {eq, "#"}]}. {allow, all}.
新增一條ACL規(guī)則
allow, all, subscribe, ["$SYS/brokers/+/clients/#"]}.
allow
: 表示這是一個(gè)允許(allow
)訪問(wèn)的規(guī)則。這意味著與此規(guī)則匹配的操作將被允許。all
: 表示這個(gè)規(guī)則適用于所有的客戶端。subscribe
: 表示這個(gè)規(guī)則定義了對(duì)主題的訂閱權(quán)限。$SYS/brokers/+/clients/#
: 這是一個(gè)主題過(guò)濾器,它指定了匹配的主題模式。在這里,$SYS/brokers/+/clients/#
表示以$SYS/brokers/
開(kāi)頭,然后是一個(gè)通配符 +,它可以匹配單個(gè)層級(jí)的任何名稱,接著是clients/
,最后又有一個(gè) # 通配符,它可以匹配零個(gè)或多個(gè)層級(jí)的名稱。因此,這個(gè)主題過(guò)濾器匹配了所有以$SYS/brokers/
開(kāi)頭,然后緊跟著clients/
的主題。
綜合起來(lái),這個(gè)規(guī)則允許所有的客戶端訂閱以
$SYS/brokers/
開(kāi)頭,然后跟著clients/
的所有主題。通常,這種規(guī)則用于允許所有客戶端訂閱系統(tǒng)級(jí)別的信息或監(jiān)控?cái)?shù)據(jù),如經(jīng)紀(jì)人(Broker
)的客戶端連接狀態(tài)等。這可以用于監(jiān)視和診斷MQTT Broker
的性能和狀態(tài)。
注意:修改完畢之后使用 emqx_ctl reload_acl
重新加載acl規(guī)則或者直接重啟emqx服務(wù)
搭建一個(gè)能夠監(jiān)聽(tīng)EMQX主題的Spring Boot應(yīng)用程序
MQTT相關(guān)依賴
<!-- MQTT相關(guān)依賴 --> <dependency> <groupId>org.springframework.integration</groupId> <artifactId>spring-integration-stream</artifactId> </dependency> <dependency> <groupId>org.springframework.integration</groupId> <artifactId>spring-integration-mqtt</artifactId> </dependency>
MQTT接受訂閱的主題
$SYS/brokers/+/clients/#
處理設(shè)備上下線事件
獲取 EMQX
消息主題
// 從消息請(qǐng)求頭中獲取消息主題topic String topic = String.valueOf(message.getHeaders().get(MqttHeaders.RECEIVED_TOPIC));
獲取topic最后的節(jié)點(diǎn)字符串
以下方式通過(guò)主題 topic
來(lái)獲取 ClientId
// topic最后的節(jié)點(diǎn)字符串 String lastPart = extractLastPart(topic); // 獲取消息內(nèi)容 String payload = StrUtil.str((byte[]) message.getPayload(), "utf-8"); // 判斷設(shè)備是上線或下線消息 if ("connected".equals(lastPart)) { // 設(shè)備上線消息 clientId = extractClientIdFromTopic(topic); log.info("設(shè)備上線提醒 -> IMEI:{}", clientId); } else if ("disconnected".equals(lastPart)) { // 設(shè)備下線消息 clientId = extractClientIdFromTopic(topic); log.info("設(shè)備下線警告 -> IMEI:{}", clientId); } /** * 獲取最后一個(gè)節(jié)點(diǎn) * * @param topic 主題 * @return 節(jié)點(diǎn)內(nèi)容 */ public static String extractLastPart(String topic) { // 使用split方法將字符串根據(jù)'/'分割成數(shù)組 String[] parts = topic.split("/"); // 獲取最后一個(gè)元素 String lastPart = parts[parts.length - 1]; return lastPart; } /** * 從Topic中提取ClientId * * @param topic 主題 * @return ClientId */ public static String extractClientIdFromTopic(String topic) { // 使用正則表達(dá)式匹配主題中的ClientId String regex = "\\$SYS/brokers/[^/]+/clients/([^/]+)/(connected|disconnected)"; Pattern pattern = Pattern.compile(regex); Matcher matcher = pattern.matcher(topic); // matcher.groupCount() 是一個(gè)方法,用于獲取正則表達(dá)式中定義的組數(shù)。在正則表達(dá)式中,使用括號(hào) () 來(lái)定義捕獲組。在這個(gè)情況下,正則表達(dá)式 \\$SYS/brokers/[^/]+/clients/([^/]+)/(connected|disconnected) 中有兩組,分別是括號(hào)內(nèi)的 ([^/]+) 部分和 (connected|disconnected) 部分。matcher.groupCount() 返回的是正則表達(dá)式中捕獲組的數(shù)量 if (matcher.matches() && matcher.groupCount() == 2) { // 如果正則匹配成功,提取ClientId并返回 return matcher.group(1); } else { // 如果匹配失敗,返回null或者拋出異常,視情況而定 return null; }
當(dāng)然你也可以通過(guò)解析 payload
來(lái)獲取更多詳細(xì)信息,可參照官方文檔:客戶端上下線事件
主題 (Topic) | 說(shuō)明 |
---|---|
${clientid}/connected | 上線事件。當(dāng)任意客戶端上線時(shí),EMQX 就會(huì)發(fā)布該主題的消息 |
${clientid}/disconnected | 下線事件。當(dāng)任意客戶端下線時(shí),EMQX 就會(huì)發(fā)布該主題的消息 |
connected
事件消息的 Payload 解析成 JSON 格式如下:
{ "username": "foo", "ts": 1625572213873, "sockport": 1883, "proto_ver": 4, "proto_name": "MQTT", "keepalive": 60, "ipaddress": "127.0.0.1", "expiry_interval": 0, "connected_at": 1625572213873, "connack": 0, "clientid": "emqtt-8348fe27a87976ad4db3", "clean_start": true }
disconnected
事件消息的 Payload 解析成 JSON 格式如下:
{ "username": "foo", "ts": 1625572213873, "sockport": 1883, "reason": "tcp_closed", "proto_ver": 4, "proto_name": "MQTT", "ipaddress": "127.0.0.1", "disconnected_at": 1625572213873, "clientid": "emqtt-8348fe27a87976ad4db3" }
可以解析 JOSN
數(shù)據(jù)拿到 clientid
,注意此處使用的 JSON
解析工具是 Hutool
的。
// 獲取消息內(nèi)容 String payload = StrUtil.str((byte[]) message.getPayload(), "utf-8"); // 解析JSON字符串 JSONObject payloadJsonObject = JSONUtil.parseObj(payload); // 獲取ClientId String clientid = payloadJsonObject.getStr("clientid");
實(shí)現(xiàn)效果
總結(jié)
- 修改 EMQ X ACL 配置: 你在 EMQ X 中修改了 acl.config 文件,添加了相應(yīng)的 ACL 規(guī)則,允許訂閱 $SYS/brokers/+/clients/# 主題的所有客戶端。這個(gè)步驟允許你在開(kāi)源版本中訪問(wèn)關(guān)鍵的設(shè)備連接信息。
- 創(chuàng)建 Spring Boot 應(yīng)用程序: 通過(guò)創(chuàng)建一個(gè) Spring Boot 應(yīng)用程序,你建立了一個(gè)連接到 EMQ X 代理的橋梁。這個(gè)應(yīng)用程序充當(dāng)了監(jiān)聽(tīng)器,用于訂閱 $SYS/brokers/+/clients/# 主題,以實(shí)時(shí)感知設(shè)備的連接和斷開(kāi)事件。
- 實(shí)時(shí)設(shè)備監(jiān)控: 你的解決方案允許你實(shí)時(shí)監(jiān)控設(shè)備的連接狀態(tài),而無(wú)需依賴 EMQ X 企業(yè)版的內(nèi)置功能。這使你能夠快速響應(yīng)設(shè)備狀態(tài)的變化,并采取相應(yīng)的行動(dòng),如發(fā)送通知或執(zhí)行自定義操作。
- 定制性: 通過(guò)這個(gè)方法,你能夠靈活地定制解決方案,以滿足你的特定需求。你可以根據(jù)設(shè)備狀態(tài)采取不同的操作,因此具有更大的靈活性。
- 避免功能限制: 盡管 EMQ X 4.4.10 開(kāi)源版本沒(méi)有內(nèi)置設(shè)備上下線提醒模塊,但你成功地繞過(guò)了這個(gè)限制,通過(guò)自定義配置和應(yīng)用程序開(kāi)發(fā)來(lái)實(shí)現(xiàn)了所需的功能。
- 無(wú)需升級(jí)或許可證: 你的解決方案不需要升級(jí)到 EMQ X 企業(yè)版或購(gòu)買額外的許可證。這使得你可以在開(kāi)源版本的基礎(chǔ)上構(gòu)建所需的功能。
- 總而言之,你的解決方案是一種巧妙的方式,通過(guò)修改配置和創(chuàng)建一個(gè)自定義的 Spring Boot 應(yīng)用程序,實(shí)現(xiàn)了設(shè)備上下線狀態(tài)的監(jiān)控和管理。這個(gè)方法不僅解決了技術(shù)挑戰(zhàn),還提供了靈活性和定制性,以滿足你的特定需求。這是一個(gè)創(chuàng)造性的方法,適用于需要在 EMQ X 開(kāi)源版本中實(shí)現(xiàn)設(shè)備監(jiān)控的情況。
以上就是SpringBoot實(shí)現(xiàn)EMQ設(shè)備的上下線告警的詳細(xì)內(nèi)容,更多關(guān)于SpringBoot EMQ上下線告警的資料請(qǐng)關(guān)注腳本之家其它相關(guān)文章!
相關(guān)文章
Java 中組合模型之對(duì)象結(jié)構(gòu)模式的詳解
這篇文章主要介紹了Java 中組合模型之對(duì)象結(jié)構(gòu)模式的詳解的相關(guān)資料,希望通過(guò)本文能幫助到大家理解應(yīng)用對(duì)象結(jié)構(gòu)模型,需要的朋友可以參考下2017-09-09ByteArrayInputStream簡(jiǎn)介和使用_動(dòng)力節(jié)點(diǎn)Java學(xué)院整理
ByteArrayInputStream 是字節(jié)數(shù)組輸入流。它繼承于InputStream。這篇文章主要介紹了ByteArrayInputStream簡(jiǎn)介和使用_動(dòng)力節(jié)點(diǎn)Java學(xué)院整理,需要的朋友可以參考下2017-05-05解決Mybatis-Plus更新方法不更新NULL字段的問(wèn)題
這篇文章主要介紹了解決Mybatis-Plus更新方法不更新NULL字段的問(wèn)題,具有很好的參考價(jià)值,希望對(duì)大家有所幫助,如有錯(cuò)誤或未考慮完全的地方,望不吝賜教2023-12-12java后端請(qǐng)求兌現(xiàn)request的中文亂碼問(wèn)題解決
文章主要講述了在處理處理方案工作中遇到中文亂碼問(wèn)題的解決過(guò)程,通過(guò)復(fù)現(xiàn)和分析亂碼問(wèn)題,發(fā)現(xiàn)是由于解碼規(guī)則和后端服務(wù)編碼不一致導(dǎo)致的,最終通過(guò)修改過(guò)濾器中的編碼設(shè)置解決了問(wèn)題2025-02-02SpringBoot自動(dòng)配置Quartz的實(shí)現(xiàn)步驟
本文主要介紹了SpringBoot自動(dòng)配置Quartz的實(shí)現(xiàn)步驟,文中通過(guò)示例代碼介紹的非常詳細(xì),具有一定的參考價(jià)值,感興趣的小伙伴們可以參考一下2021-11-11SpringBoot實(shí)現(xiàn)讀取YML,yaml,properties文件
yml,yaml,properties三種文件都是用來(lái)存放配置的文件,一些靜態(tài)數(shù)據(jù),配置的數(shù)據(jù)都會(huì)存放到里邊。本文主要為大家整理了SpringBoot實(shí)現(xiàn)讀取YML,yaml,properties文件的方法,需要的可以參考一下2023-04-04springboot+gradle 構(gòu)建多模塊項(xiàng)目的步驟
這篇文章主要介紹了springboot+gradle 構(gòu)建多模塊項(xiàng)目的步驟,小編覺(jué)得挺不錯(cuò)的,現(xiàn)在分享給大家,也給大家做個(gè)參考。一起跟隨小編過(guò)來(lái)看看吧2018-05-05