SpringBoot整合MQTT并實現異步線程調用的問題
為什么選擇MQTT
MQTT的定義相信很多人都能講的頭頭是道,本文章也不討論什么高大上的東西,旨在用最簡單直觀的方式讓每一位剛接觸的同行們可以最快的應用起來
先從使用MQTT需要什么開始分析:
- 消息服務器
- 不同應用/設備之間的頻繁交互
- 可能涉及一對多的消息傳遞
基于SpringBoot通過注解實現對mqtt消息處理的異步調用
使用背景
生產環(huán)境下, 由于mqtt 生產者生產的消息逐漸增多, 可能會導致消息堆積. 因此需要消費者去快速的消費.
而其中的一個方案便是使用異步線程去加速消費消息. 下面介紹下思路
我們可以在原來的mqtt工具類上面進行改裝.
首先創(chuàng)建一個類MqttMessageListener并繼承IMqttMessageListener實現messageArrived, 用于處理這些消息(業(yè)務編寫)
然后改寫mqtt客戶端訂閱的方法, 注入MqttMessageListener, 并在訂閱方法中新增該參數
在然后在啟動類開啟異步線程, 編寫一個配置類配置線程池參數并且在messageArrived加上@Async開啟異步線程調用
代碼實現
基礎代碼
指沒有開啟線程池的代碼
MqttPushClient 主要定義了連接參數
import org.eclipse.paho.client.mqttv3.IMqttToken; import org.eclipse.paho.client.mqttv3.MqttClient; import org.eclipse.paho.client.mqttv3.MqttConnectOptions; import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Component; import java.util.List; /** * @Author * @Date * @Description 連接至EMQ X 服務器,獲取mqtt連接,發(fā)布消息 */ @Component public class MqttPushClient{ private static final Logger log = LoggerFactory.getLogger(MqttPushClient.class); @Autowired private PushCallback pushCallback; private static MqttClient client; public static void setClient(MqttClient client) { MqttPushClient.client = client; } public static MqttClient getClient() { return client; } public void connect(String host, String clientID, String username, String password, int timeout, int keepalive, List<String> topicList) { MqttClient client; try { client = new MqttClient(host, clientID, new MemoryPersistence()); MqttConnectOptions options = new MqttConnectOptions(); options.setCleanSession(true); if (username != null) { options.setUserName(username); } if (password != null) { options.setPassword(password.toCharArray()); } options.setConnectionTimeout(timeout); options.setKeepAliveInterval(keepalive); MqttPushClient.setClient(client); try { //設置回調類 client.setCallback(pushCallback); //client.connect(options); IMqttToken iMqttToken = client.connectWithResult(options); boolean complete = iMqttToken.isComplete(); log.info("MQTT連接"+(complete?"成功":"失敗")); /** 訂閱主題 **/ for (String topic : topicList) { log.info("連接訂閱主題:{}", topic); client.subscribe(topic, 0); } } catch (Exception e) { e.printStackTrace(); } } catch (Exception e) { e.printStackTrace(); } } }
PushCallback 回調類, 實現重連, 消息發(fā)送監(jiān)聽, 消息接收監(jiān)聽
import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken; import org.eclipse.paho.client.mqttv3.MqttCallback; import org.eclipse.paho.client.mqttv3.MqttMessage; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Component; /** * @Author * @Date * @Description 消息回調,處理接收的消息 */ @Component public class PushCallback implements MqttCallback { private static final Logger log = LoggerFactory.getLogger(PushCallback.class); @Autowired private MqttConfiguration mqttConfiguration; @Autowired private MqttTopic mqttTopic; @Override public void connectionLost(Throwable cause) { // 連接丟失后,一般在這里面進行重連 log.info("連接斷開,正在重連"); MqttPushClient mqttPushClient = mqttConfiguration.getMqttPushClient(); if (null != mqttPushClient) { mqttPushClient.connect(mqttConfiguration.getHost(), mqttConfiguration.getClientid(), mqttConfiguration.getUsername(), mqttConfiguration.getPassword(), mqttConfiguration.getTimeout(), mqttConfiguration.getKeepalive(), mqttConfiguration.getTopic()); log.info("已重連"); } } /** * 發(fā)送消息,消息到達后處理方法 * @param token */ @Override public void deliveryComplete(IMqttDeliveryToken token) { int messageId = token.getMessageId(); String[] topics = token.getTopics(); log.info("消息發(fā)送完成,messageId={},topics={}",messageId,topics.toString()); } /** * 訂閱主題接收到消息處理方法 * @param topic * @param message */ @Override public void messageArrived(String topic, MqttMessage message) { // subscribe后得到的消息會執(zhí)行到這里面,這里在控制臺有輸出 String messageStr = new String(message.getPayload()); // messageDistribute.distribute(topic, messageStr); log.info("接收的主題:" + topic + ";接收到的信息:" + messageStr); } }
MqttConfiguration 配置了mqtt相關參數, 并初始化連接(mqtt在這里啟動)
import lombok.extern.slf4j.Slf4j; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.boot.context.properties.ConfigurationProperties; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import org.springframework.stereotype.Component; import java.util.List; /** * @Author * @Date mqtt配置及連接 * @Description */ @Slf4j @Component @Configuration @ConfigurationProperties(MqttConfiguration.PREFIX) public class MqttConfiguration { @Autowired private MqttPushClient mqttPushClient; /** * 指定配置文件application-local.properties中的屬性名前綴 */ public static final String PREFIX = "std.mqtt"; private String host; private String clientId; private String userName; private String password; private int timeout; private int keepAlive; private List<String> topic; public String getClientid() { return clientId; } public void setClientid(String clientid) { this.clientId = clientid; } public String getUsername() { return userName; } public void setUsername(String username) { this.userName = username; } public String getPassword() { return password; } public void setPassword(String password) { this.password = password; } public int getTimeout() { return timeout; } public void setTimeout(int timeout) { this.timeout = timeout; } public int getKeepalive() { return keepAlive; } public void setKeepalive(int keepalive) { this.keepAlive = keepalive; } public String getHost() { return host; } public void setHost(String host) { this.host = host; } public List<String> getTopic() { return topic; } public void setTopic(List<String> topic) { this.topic = topic; } /** * 連接至mqtt服務器,獲取mqtt連接 * @return */ @Bean public MqttPushClient getMqttPushClient() { //連接至mqtt服務器,獲取mqtt連接 mqttPushClient.connect(host, clientId, userName, password, timeout, keepAlive, topic); return mqttPushClient; } } properties.yml 配置文件 std.mqtt: host: tcp://x.x.x.x:1883 username: your_username password: your_password #MQTT-連接服務器默認客戶端ID clientid: your_clientid #連接超時 timeout: 1000 # deviceId deviceId: your_deviceId # mqtt-topic topic[0]: your_tpoic
TopicOperation 定義了發(fā)布訂閱的方法
import org.eclipse.paho.client.mqttv3.MqttClient; import org.eclipse.paho.client.mqttv3.MqttException; import org.eclipse.paho.client.mqttv3.MqttMessage; import org.eclipse.paho.client.mqttv3.MqttTopic; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Autowired; /** * @Author chy */ public class TopicOperation { private static final Logger log = LoggerFactory.getLogger(TopicOperation.class); /** * 訂閱主題 * @param topic 主題名稱 */ public static void subscribe(String topic) { try { MqttClient client = MqttPushClient.getClient(); if (client == null) { return; }; client.subscribe(topic, 0); log.info("訂閱主題:{}",topic); } catch (MqttException e) { e.printStackTrace(); } } /** * 發(fā)布主題 * * @param topic * @param pushMessage */ public static void publish(String topic, String pushMessage) { log.info("SEND TO MQTT -- topic : {}, message : {}", topic, pushMessage); MqttMessage message = new MqttMessage(); message.setQos(0); // 非持久化 message.setRetained(false); message.setPayload(pushMessage.getBytes()); MqttClient client = MqttPushClient.getClient(); if (client == null) { return; }; MqttTopic mTopic = client.getTopic(topic); if (null == mTopic) { log.error("主題不存在:{}",mTopic); } try { mTopic.publish(message); } catch (Exception e) { log.error("mqtt發(fā)送消息異常:",e); } } }
定義了發(fā)布和訂閱的相關主題
import com.sxd.onlinereservation.exception.BusinessException; import org.springframework.beans.factory.annotation.Value; import org.springframework.stereotype.Component; /** * @Author * @Date topic名稱 * @Description */ @Component public class MqttTopic { @Value("${std.mqtt.deviceId}") private String[] deviceId; public String getSubscribeTopic(String type){ switch (type){ case "appointTopic": return String.format("/v1/%s/service/appointTopic", deviceId[0]); default: throw new BusinessException("mqtt 訂閱主題獲取錯誤"); } } public String getPublishTopic(String type) { switch (type){ //1.0接口立即取號發(fā)布主題 case "appointTopic": return String.format("/v1/%s/service/appointTopic", deviceId[1]); default: throw new BusinessException("mqtt 發(fā)布主題獲取錯誤"); } } }
ps: 如果想要使用該工具類進行消息發(fā)送和接收看下面demo
//消息發(fā)布操作 TopicOperation.publish(mqttTopic.getPublishTopic("appointTopic"), "消息體")); //消息訂閱操作 TopicOperation.subscribe(mqttTopic.getSubscribeTopic("appointTopic"), "消息體"));
異步線程處理實現
總結
- 創(chuàng)建消息監(jiān)聽類 , 用于監(jiān)聽消息并進行業(yè)務處理
- 在原來訂閱時, 注入并使用第一步創(chuàng)建的監(jiān)聽類
- 通過注解開啟異步線程并配置處理方式
創(chuàng)建消息監(jiān)聽類 , 用于監(jiān)聽消息并進行業(yè)務處理
@Slf4j @Component public class MqttMessageListener implements IMqttMessageListener { @Resource private BusinessService businessService; @Autowired private MqttTopic mqttTopic; @Autowired private ThreeCallmachineService threeCallmachineService; @Autowired private BusinessHallService businessHallService; @Autowired private BusinessMaterialService businessMaterialService; @Autowired private BusinessWaitService businessWaitService; @Autowired private AppointmentService appointmentService; @Override public void messageArrived(String topic, MqttMessage message) throws Exception { String messageStr = new String(message.getPayload()); log.info("接收的主題:" + topic + ";接收到的信息:" + messageStr); //進行 業(yè)務處理 } }
在原來訂閱時, 注入并使用第一步創(chuàng)建的監(jiān)聽類
注入了
MqttMessageListener
, 并且在訂閱時加入client.subscribe(topic, mqttMessageListener);
修改MqttPushClient (必須)
@Component public class MqttPushClient{ private static final Logger log = LoggerFactory.getLogger(MqttPushClient.class); @Autowired private PushCallback pushCallback; @Autowired //這里進行了注入操作 private MqttMessageListener mqttMessageListener; private static MqttClient client; public static void setClient(MqttClient client) { MqttPushClient.client = client; } public static MqttClient getClient() { return client; } public void connect(String host, String clientID, String username, String password, int timeout, int keepalive, List<String> topicList) { MqttClient client; try { client = new MqttClient(host, clientID, new MemoryPersistence()); MqttConnectOptions options = new MqttConnectOptions(); options.setCleanSession(true); if (username != null) { options.setUserName(username); } if (password != null) { options.setPassword(password.toCharArray()); } options.setConnectionTimeout(timeout); options.setKeepAliveInterval(keepalive); MqttPushClient.setClient(client); try { //設置回調類 client.setCallback(pushCallback); //client.connect(options); IMqttToken iMqttToken = client.connectWithResult(options); boolean complete = iMqttToken.isComplete(); log.info("MQTT連接"+(complete?"成功":"失敗")); /** 訂閱主題 **/ for (String topic : topicList) { log.info("連接訂閱主題:{}", topic); //client.subscribe(topic, 0); client.subscribe(topic, mqttMessageListener); } } catch (Exception e) { e.printStackTrace(); } } catch (Exception e) { e.printStackTrace(); } } }
如果業(yè)務還使用了手動訂閱, 則也需要在訂閱的類上面注入MqttMessageListener , 并且在訂閱方法中作為參數使用. 但是我們需要將方法改成非靜態(tài)的, 因此在使用該方法時我們需要new該對象然后才能夠調用. 但是手動訂閱很少用到. 因此有無此步驟都可
import org.eclipse.paho.client.mqttv3.MqttClient; import org.eclipse.paho.client.mqttv3.MqttException; import org.eclipse.paho.client.mqttv3.MqttMessage; import org.eclipse.paho.client.mqttv3.MqttTopic; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Autowired; /** * @Author chy * @Date * @Description */ public class TopicOperation { private static final Logger log = LoggerFactory.getLogger(TopicOperation.class); //注入MqttMessageListener @Autowired private MqttMessageListener mqttMessageListener; /** * 訂閱主題 * @param topic 主題名稱 */ public void subscribe(String topic) { try { MqttClient client = MqttPushClient.getClient(); if (client == null) { return; }; //client.subscribe(topic, 0); //在訂閱方法中作為參數使用 client.subscribe(topic, mqttMessageListener); log.info("訂閱主題:{}",topic); } catch (MqttException e) { e.printStackTrace(); } } /** * 發(fā)布主題 * * @param topic * @param pushMessage */ public static void publish(String topic, String pushMessage) { log.info("SEND TO MQTT -- topic : {}, message : {}", topic, pushMessage); MqttMessage message = new MqttMessage(); message.setQos(0); // 非持久化 message.setRetained(false); message.setPayload(pushMessage.getBytes()); MqttClient client = MqttPushClient.getClient(); if (client == null) { return; }; MqttTopic mTopic = client.getTopic(topic); if (null == mTopic) { log.error("主題不存在:{}",mTopic); } try { mTopic.publish(message); } catch (Exception e) { log.error("mqtt發(fā)送消息異常:",e); } } }
通過注解開啟異步線程并配置處理方式 啟動類開啟 @EnableAsync(proxyTargetClass=true )
@SpringBootApplication @MapperScan(basePackages = "com.x.x.mapper") @EnableTransactionManagement @EnableAsync(proxyTargetClass=true ) public class XXApplication { public static void main(String[] args) { SpringApplication.run(XXApplication.class, args); } }
配置類配置線程池參數
@Slf4j @Configuration public class ExecutorConfig { @Bean public Executor asyncServiceExecutor() { log.info("start asyncServiceExecutor"); ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor(); //配置核心線程數 executor.setCorePoolSize(9); //配置最大線程數 executor.setMaxPoolSize(20); //配置隊列大小 executor.setQueueCapacity(200); //配置線程池中的線程的名稱前綴 executor.setThreadNamePrefix("sxd-async-service-"); // 設置拒絕策略:當pool已經達到max size的時候,如何處理新任務 // CALLER_RUNS:不在新線程中執(zhí)行任務,而是有調用者所在的線程來執(zhí)行 executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy()); //執(zhí)行初始化 executor.initialize(); return executor; } }
MqttMessageListener的實現方法messageArrived開啟@Async("asyncServiceExecutor")
@Slf4j @Component public class MqttMessageListener implements IMqttMessageListener { @Resource private BusinessService businessService; @Autowired private MqttTopic mqttTopic; @Autowired private ThreeCallmachineService threeCallmachineService; @Autowired private BusinessHallService businessHallService; @Autowired private BusinessMaterialService businessMaterialService; @Autowired private BusinessWaitService businessWaitService; @Autowired private AppointmentService appointmentService; @Override @Async("asyncServiceExecutor") public void messageArrived(String topic, MqttMessage message) throws Exception { String messageStr = new String(message.getPayload()); log.info("接收的主題:" + topic + ";接收到的信息:" + messageStr); System.out.println("線程名稱:【" + Thread.currentThread().getName() + "】"); //進行 業(yè)務處理 } }
到此這篇關于SpringBoot整合MQTT并實現異步線程調用的文章就介紹到這了,更多相關SpringBoot異步線程調用內容請搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關文章希望大家以后多多支持腳本之家!
相關文章
詳解基于java的Socket聊天程序——客戶端(附demo)
這篇文章主要介紹了詳解基于java的Socket聊天程序——客戶端(附demo),客戶端設計主要分成兩個部分,分別是socket通訊模塊設計和UI相關設計。有興趣的可以了解一下。2016-12-12SpringBoot應用啟動失?。憾丝谡加脤е耇omcat啟動失敗的問題分析與解決方法
在開發(fā)和運維過程中,應用程序啟動失敗是我們經常遇到的一個問題,尤其是在 Web 應用程序中,涉及到 Web 服務器的配置時,今天我們將探討一個常見的啟動錯誤,尤其是在使用 Spring Boot 和內嵌 Tomcat 服務器時,需要的朋友可以參考下2024-11-11