SpringBoot 集成MQTT實現(xiàn)消息訂閱的詳細(xì)代碼
1、引入依賴
<!--MQTT start-->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-integration</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.integration</groupId>
<artifactId>spring-integration-mqtt</artifactId>
<version>5.4.4</version>
</dependency>
<!--MQTT end-->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-configuration-processor</artifactId>
<optional>true</optional>
</dependency>2、增加yml配置
spring:
mqtt:
username: test
password: test
url: tcp://127.0.0.1:8080
subClientId: singo_sub_client_id_888 #訂閱 客戶端id
pubClientId: singo_pub_client_id_888 #發(fā)布 客戶端id
connectionTimeout: 30
keepAlive: 603、資源配置類
@Data
@ConfigurationProperties(prefix = "spring.mqtt")
public class MqttConfigurationProperties {
private String username;
private String password;
private String url;
private String subClientId;
private String pubClientId;
private int connectionTimeout;
private int keepAlive;
}注意啟動類需要增加注解
@EnableConfigurationProperties(MqttConfigurationProperties.class)
4、MQTT配置類
@Configuration
public class MqttConfig {
@Autowired
private MqttConfigurationProperties mqttConfigurationProperties;
/**
* 連接參數(shù)
*
* @return
*/
@Bean
public MqttConnectOptions mqttConnectOptions() {
MqttConnectOptions options = new MqttConnectOptions();
options.setUserName(mqttConfigurationProperties.getUsername());
options.setPassword(mqttConfigurationProperties.getPassword().toCharArray());
options.setServerURIs(new String[]{mqttConfigurationProperties.getUrl()});
options.setConnectionTimeout(mqttConfigurationProperties.getConnectionTimeout());
options.setKeepAliveInterval(mqttConfigurationProperties.getKeepAlive());
options.setCleanSession(true); // 設(shè)置為false以便斷線重連后恢復(fù)會話
options.setAutomaticReconnect(true);
return options;
}
/**
* 連接工廠
*
* @param options
* @return
*/
@Bean
public MqttPahoClientFactory mqttClientFactory(MqttConnectOptions options) {
DefaultMqttPahoClientFactory factory = new DefaultMqttPahoClientFactory();
factory.setConnectionOptions(options);
return factory;
}
/**
* 消息輸入通道
* 每次只有一個消息處理器可以消費消息。
* 當(dāng)前消息的處理完成之前,新消息需要排隊等待,無法并行處理。
* 默認(rèn)是:單線程、順序執(zhí)行的
* @return
*/
// @Bean
// public DirectChannel mqttInputChannel() {
// return new DirectChannel();
// }
/**
* 支持多線程并發(fā)處理消息的輸入通道
*
* @return
*/
@Bean
public ExecutorChannel mqttInputChannel() {
return new ExecutorChannel(Executors.newFixedThreadPool(10)); // 線程池大小可以調(diào)整
}
/**
* 配置入站適配器
*
* @param mqttClientFactory
* @return
*/
@Bean
public MqttPahoMessageDrivenChannelAdapter messageDrivenChannelAdapter(MqttPahoClientFactory mqttClientFactory) {
MqttPahoMessageDrivenChannelAdapter adapter = new MqttPahoMessageDrivenChannelAdapter(mqttConfigurationProperties.getSubClientId(), mqttClientFactory);
// adapter.addTopic("pub/300119110099"); 訂閱主題,也可以放在初始化動態(tài)配置
adapter.setOutputChannel(mqttInputChannel());
return adapter;
}
/**
* 配置消息處理器
*
* @return
*/
@Bean
@ServiceActivator(inputChannel = "mqttInputChannel") // 指定通道
public MessageHandler messageHandler() {
return new MqttReceiverMessageHandler();
}
}5、消息處理器配置
@Slf4j
@Component
public class MqttReceiverMessageHandler implements MessageHandler {
@Autowired
private MqttMessageProcessingService mqttMessageProcessingService;
@Override
public void handleMessage(Message<?> message) throws MessagingException {
MessageHeaders headers = message.getHeaders();
log.info("線程名稱:{},收到消息,主題:{},消息:{}", Thread.currentThread().getName(), headers.get("mqtt_receivedTopic").toString(), message.getPayload());
// log.info("收到消息主題:{}", headers.get("mqtt_receivedTopic").toString());
// log.info("收到消息:{}", message.getPayload());
// 消息保存到內(nèi)存隊列里面,定時批量入庫,也可以在這里直接入庫
mqttMessageProcessingService.addMessage(message.getPayload().toString());
}
}6、消息主題緩存對象
@Component
public class MqttTopicStore {
private final ConcurrentHashMap<String, String> topics = new ConcurrentHashMap<>();
public ConcurrentHashMap<String, String> getTopics() {
return topics;
}
}7、動態(tài)訂閱數(shù)據(jù)庫主題配置
@Slf4j
@Component
public class MqttInit {
@Autowired
private MqttPahoMessageDrivenChannelAdapter messageDrivenChannelAdapter;
@Autowired
private MqttTopicStore mqttTopicStore;
@PostConstruct
public void init() {
subscribeAllTopics();
}
public void subscribeAllTopics() {
// List<MqttTopicConfig> topics = topicConfigMapper.findAllEnabled();
// for (MqttTopicConfig topic : topics) {
// subscribeTopic(topic);
// }
log.info("===================>從數(shù)據(jù)庫里獲取并初始化訂閱所有主題");
List<String> topics = ListUtil.list(false, "pub/300119110099", "pub1/3010230209810018992", "pub1/30102302098100");
topics.stream().forEach(t -> {
messageDrivenChannelAdapter.addTopic(t);
// 同時往MqttTopicStore.topics中增加一條記錄用于緩存
});
}
}8、消息處理服務(wù)
@Service
public class MqttMessageProcessingService {
@Autowired
private MqttPahoMessageDrivenChannelAdapter messageDrivenChannelAdapter;
@Autowired
private MqttTopicStore mqttTopicStore;
// 內(nèi)存隊列,用于暫存消息
private final BlockingQueue<String> messageQueue = new LinkedBlockingQueue<>();
// 添加消息到隊列
public void addMessage(String message) {
messageQueue.add(message);
}
/**
* 可以放到定時任務(wù)里面去,注入后取隊列方便維護(hù)
* 定時任務(wù),每5秒執(zhí)行一次 ,建議2分鐘一次 理想的觸發(fā)間隔應(yīng)略小于數(shù)據(jù)到達(dá)間隔,以確保及時處理和插入
* 如果每 5 分鐘收到一條數(shù)據(jù),可以設(shè)置任務(wù)執(zhí)行周期為4 分鐘或更短,以便任務(wù)有足夠的時間處理數(shù)據(jù),同時減少積壓的可能性。
*/
@Scheduled(fixedRate = 1 * 60 * 1000)
public void batchInsertToDatabase() {
System.out.println("定時任務(wù)執(zhí)行中,當(dāng)前隊列大?。? + messageQueue.size());
List<String> batch = new ArrayList<>();
messageQueue.drainTo(batch, 500); // 一次性取最多500條消息
if (!batch.isEmpty()) {
// 批量插入數(shù)據(jù)庫
saveMessagesToDatabase(batch);
}
}
private void saveMessagesToDatabase(List<String> messages) {
// 假設(shè)這是批量插入邏輯
System.out.println("批量插入數(shù)據(jù)庫,條數(shù):" + messages.size());
for (String message : messages) {
System.out.println("插入消息:" + message);
}
// 實際數(shù)據(jù)庫操作代碼
}
/**
* 訂閱與取消訂閱定時任務(wù)
*/
public void subscribeAndUnsubscribeTask() {
// 從數(shù)據(jù)庫獲取所有主題,正常狀態(tài)、刪除狀態(tài)
// 正常狀態(tài):判斷mqttTopicStore.topics中是否存在,不存在則訂閱,并在mqttTopicStore.topics中增加
// 刪除狀態(tài): 判斷mqttTopicStore.topics中是否存在,存在則取消訂閱,并在mqttTopicStore.topics中刪除
// messageDrivenChannelAdapter.addTopic(t);
}
}以上是簡單的對接步驟,部分類、方法可以根據(jù)實際情況進(jìn)行合并處理!?。。?/p>
9、定時任務(wù)
@Slf4j
@Configuration
@EnableScheduling
public class MqttJob {
@Value("${schedule.enable}")
private boolean enable;
@Autowired
private MqttMessageProcessingService mqttMessageProcessingService;
/**
* 定時訂閱與取消訂閱主題,從共享主題對象MqttTopicStore里面取出主題列表,然后進(jìn)行訂閱或取消訂閱
* 每分鐘一次
*/
public void subscribeAndUnsubscribe() {
if (!enable) return;
mqttMessageProcessingService.subscribeAndUnsubscribeTask();
}
/**
* 定時處理隊列里面的訂閱消息,會有丟失風(fēng)險,宕機時會丟失隊列里面的消息
* 每分鐘一次 要考慮一次消息處理的時間;也可先不使用隊列,每次收到消息直接實時入庫,有性能問題時在啟用
*/
public void batchSaveSubscribeMessage() {
}
}到此這篇關(guān)于SpringBoot 集成MQTT實現(xiàn)消息訂閱的文章就介紹到這了,更多相關(guān)SpringBoot MQTT消息訂閱內(nèi)容請搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持腳本之家!
相關(guān)文章
java中申請不定長度數(shù)組ArrayList的方法
今天小編就為大家分享一篇java中申請不定長度數(shù)組ArrayList的方法,具有很好的參考價值,希望對大家有所幫助。一起跟隨小編過來看看吧2018-07-07
java 分轉(zhuǎn)元與元轉(zhuǎn)分實現(xiàn)操作
這篇文章主要介紹了java 分轉(zhuǎn)元與元轉(zhuǎn)分實現(xiàn)操作,具有很好的參考價值,希望對大家有所幫助。一起跟隨小編過來看看吧2021-02-02
Java多線程Thread基礎(chǔ)學(xué)習(xí)
每一個正在執(zhí)行的程序都是一個進(jìn)程,資源只有一塊,所以在同一時間段會有多個程序同時執(zhí)行,但是在一個時間點上,只能由一個程序執(zhí)行,多線程是在一個進(jìn)程的基礎(chǔ)之上的進(jìn)一步劃分,需要的朋友可以參考下2023-04-04
redisson.tryLock()參數(shù)的使用及理解
這篇文章主要介紹了redisson.tryLock()參數(shù)的使用,具有很好的參考價值,希望對大家有所幫助,如有錯誤或未考慮完全的地方,望不吝賜教2024-04-04
多線程計數(shù),怎么保持計數(shù)準(zhǔn)確的方法
這篇文章主要介紹了多線程計數(shù)的方法,有需要的朋友可以參考一下2014-01-01

