springboot整合mqtt的詳細圖文教程
springboot 整合 mqtt
最近由于iot越來越火, 物聯(lián)網(wǎng)的需求越來越多, 那么理所當然的使用mqtt的場景也就越來越多,
接下來是我使用springboot整合mqtt的過程, 以及踩過的一些坑.
mqtt服務(wù)器使用的是 EMQX, 官網(wǎng) : 這里
搭建的時候如果你使用的是集群 記得開放以下端口:

好了, 搭建成功下一步就是我們的java程序要與mqtt連接, 這里有兩種方式(其實不止兩種)進行連接.
一是 直接使用 MQTT Java 客戶端庫,詳情可以查看官方的例子: MQTT Java 客戶端 我就跳過了
二是使用 spring integration mqtt也是比較推薦的一種,也是我們主講這種.
第一步 添加 maven dependency
<dependency>
<groupId>org.springframework.integration</groupId>
<artifactId>spring-integration-mqtt</artifactId>
<version>5.5.14</version>
</dependency>
第二步 添加配置
1 先寫好一些基本配置
mqtt:
username: test # 賬號
password: 123456 # 密碼
host-url: tcp://127.0.0.1:1883 # mqtt連接tcp地址
in-client-id: ${random.value} # 隨機值,使出入站 client ID 不同
out-client-id: ${random.value}
client-id: ${random.int} # 客戶端Id,不能相同,采用隨機數(shù) ${random.value}
default-topic: test/#,topic/+/+/up # 默認主題
timeout: 60 # 超時時間
keepalive: 60 # 保持連接
clearSession: true # 清除會話(設(shè)置為false,斷開連接,重連后使用原來的會話 保留訂閱的主題,能接收離線期間的消息)2.然后寫一個對應(yīng)的類MqttProperties
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Component;
/**
* MqttProperties
*
* @author hengzi
* @date 2022/8/23
*/
@Component
public class MqttProperties {
/**
* 用戶名
*/
@Value("${mqtt.username}")
private String username;
/**
* 密碼
*/
@Value("${mqtt.password}")
private String password;
/**
* 連接地址
*/
@Value("${mqtt.host-url}")
private String hostUrl;
/**
* 進-客戶Id
*/
@Value("${mqtt.in-client-id}")
private String inClientId;
/**
* 出-客戶Id
*/
@Value("${mqtt.out-client-id}")
private String outClientId;
/**
* 客戶Id
*/
@Value("${mqtt.client-id}")
private String clientId;
/**
* 默認連接話題
*/
@Value("${mqtt.default-topic}")
private String defaultTopic;
/**
* 超時時間
*/
@Value("${mqtt.timeout}")
private int timeout;
/**
* 保持連接數(shù)
*/
@Value("${mqtt.keepalive}")
private int keepalive;
/**是否清除session*/
@Value("${mqtt.clearSession}")
private boolean clearSession;
// ...getter and setter
}接下來就是配置一些亂七八糟的東西, 這里有很多概念性的東西 比如 管道channel, 適配器 adapter, 入站Inbound, 出站Outbound,等等等等, 看起來是非常頭痛的
好吧,那就一個一個來,
首先連接mqtt需要一個客戶端, 那么我們就開一個客戶端工廠, 這里可以產(chǎn)生很多很多的客戶端
@Bean
public MqttPahoClientFactory mqttPahoClientFactory(){
DefaultMqttPahoClientFactory factory = new DefaultMqttPahoClientFactory();
MqttConnectOptions options = new MqttConnectOptions();
options.setServerURIs(mqttProperties.getHostUrl().split(","));
options.setUserName(mqttProperties.getUsername());
options.setPassword(mqttProperties.getPassword().toCharArray());
factory.setConnectionOptions(options);
return factory;
}
然后再搞兩根管子(channel),一個出站,一個入站
//出站消息管道,
@Bean
public MessageChannel mqttOutboundChannel(){
return new DirectChannel();
}
// 入站消息管道
@Bean
public MessageChannel mqttInboundChannel(){
return new DirectChannel();
}
為了使這些管子能流通 就需要一個適配器(adapter)
// Mqtt 管道適配器
@Bean
public MqttPahoMessageDrivenChannelAdapter adapter(MqttPahoClientFactory factory){
return new MqttPahoMessageDrivenChannelAdapter(mqttProperties.getInClientId(),factory,mqttProperties.getDefaultTopic().split(","));
}然后定義消息生產(chǎn)者
// 消息生產(chǎn)者
@Bean
public MessageProducer mqttInbound(MqttPahoMessageDrivenChannelAdapter adapter){
adapter.setCompletionTimeout(5000);
adapter.setConverter(new DefaultPahoMessageConverter());
//入站投遞的通道
adapter.setOutputChannel(mqttInboundChannel());
adapter.setQos(1);
return adapter;
}
那我們收到消息去哪里處理呢,答案是這里:
@Bean
//使用ServiceActivator 指定接收消息的管道為 mqttInboundChannel,投遞到mqttInboundChannel管道中的消息會被該方法接收并執(zhí)行
@ServiceActivator(inputChannel = "mqttInboundChannel")
public MessageHandler handleMessage() {
// 這個 mqttMessageHandle 其實就是一個 MessageHandler 的實現(xiàn)類(這個類我放下面)
return mqttMessageHandle;
// 你也可以這樣寫
// return new MessageHandler() {
// @Override
// public void handleMessage(Message<?> message) throws MessagingException {
// // do something
// }
// };
到這里我們其實已經(jīng)可以接受到來自mqtt的消息了
接下來配置向mqtt發(fā)送消息
配置 出站處理器
// 出站處理器
@Bean
@ServiceActivator(inputChannel = "mqttOutboundChannel")
public MessageHandler mqttOutbound(MqttPahoClientFactory factory){
MqttPahoMessageHandler handler = new MqttPahoMessageHandler(mqttProperties.getOutClientId(),factory);
handler.setAsync(true);
handler.setConverter(new DefaultPahoMessageConverter());
handler.setDefaultTopic(mqttProperties.getDefaultTopic().split(",")[0]);
return handler;
}這個 出站處理器 在我看來就是讓別人 (MqttPahoMessageHandler)處理了, 我就不處理了,我只管我要發(fā)送什么,至于怎么發(fā)送,由MqttPahoMessageHandler來完成
接下來我們定義一個接口即可
import org.springframework.integration.annotation.MessagingGateway;
import org.springframework.integration.mqtt.support.MqttHeaders;
import org.springframework.messaging.handler.annotation.Header;
import org.springframework.stereotype.Component;
/**
* MqttGateway
*
* @author hengzi
* @date 2022/8/23
*/
@Component
@MessagingGateway(defaultRequestChannel = "mqttOutboundChannel")
public interface MqttGateway {
void sendToMqtt(@Header(MqttHeaders.TOPIC) String topic, String data);
void sendToMqtt(@Header(MqttHeaders.TOPIC) String topic, @Header(MqttHeaders.QOS) Integer Qos, String data);
}我們直接調(diào)用這個接口就可以向mqtt 發(fā)送數(shù)據(jù)
到目前為止,整個配置文件長這樣:
import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.integration.annotation.ServiceActivator;
import org.springframework.integration.channel.DirectChannel;
import org.springframework.integration.core.MessageProducer;
import org.springframework.integration.mqtt.core.DefaultMqttPahoClientFactory;
import org.springframework.integration.mqtt.core.MqttPahoClientFactory;
import org.springframework.integration.mqtt.inbound.MqttPahoMessageDrivenChannelAdapter;
import org.springframework.integration.mqtt.outbound.MqttPahoMessageHandler;
import org.springframework.integration.mqtt.support.DefaultPahoMessageConverter;
import org.springframework.messaging.Message;
import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.MessageHandler;
import org.springframework.messaging.MessagingException;
/**
* MqttConfig
*
* @author hengzi
* @date 2022/8/23
*/
@Configuration
public class MqttConfig {
/**
* 以下屬性將在配置文件中讀取
**/
@Autowired
private MqttProperties mqttProperties;
//Mqtt 客戶端工廠
@Bean
public MqttPahoClientFactory mqttPahoClientFactory(){
DefaultMqttPahoClientFactory factory = new DefaultMqttPahoClientFactory();
MqttConnectOptions options = new MqttConnectOptions();
options.setServerURIs(mqttProperties.getHostUrl().split(","));
options.setUserName(mqttProperties.getUsername());
options.setPassword(mqttProperties.getPassword().toCharArray());
factory.setConnectionOptions(options);
return factory;
}
// Mqtt 管道適配器
@Bean
public MqttPahoMessageDrivenChannelAdapter adapter(MqttPahoClientFactory factory){
return new MqttPahoMessageDrivenChannelAdapter(mqttProperties.getInClientId(),factory,mqttProperties.getDefaultTopic().split(","));
}
// 消息生產(chǎn)者
@Bean
public MessageProducer mqttInbound(MqttPahoMessageDrivenChannelAdapter adapter){
adapter.setCompletionTimeout(5000);
adapter.setConverter(new DefaultPahoMessageConverter());
//入站投遞的通道
adapter.setOutputChannel(mqttInboundChannel());
adapter.setQos(1);
return adapter;
}
// 出站處理器
@Bean
@ServiceActivator(inputChannel = "mqttOutboundChannel")
public MessageHandler mqttOutbound(MqttPahoClientFactory factory){
MqttPahoMessageHandler handler = new MqttPahoMessageHandler(mqttProperties.getOutClientId(),factory);
handler.setAsync(true);
handler.setConverter(new DefaultPahoMessageConverter());
handler.setDefaultTopic(mqttProperties.getDefaultTopic().split(",")[0]);
return handler;
}
@Bean
//使用ServiceActivator 指定接收消息的管道為 mqttInboundChannel,投遞到mqttInboundChannel管道中的消息會被該方法接收并執(zhí)行
@ServiceActivator(inputChannel = "mqttInboundChannel")
public MessageHandler handleMessage() {
return mqttMessageHandle;
}
//出站消息管道,
@Bean
public MessageChannel mqttOutboundChannel(){
return new DirectChannel();
}
// 入站消息管道
@Bean
public MessageChannel mqttInboundChannel(){
return new DirectChannel();
}
}處理消息的 MqttMessageHandle
@Component
public class MqttMessageHandle implements MessageHandler {
@Override
public void handleMessage(Message<?> message) throws MessagingException {
}
}
在進一步了解之后,發(fā)現(xiàn)可以優(yōu)化的地方,比如channel 的類型是有很多種的, 這里使用的DirectChannel,是Spring Integration默認的消息通道,它將消息發(fā)送給為一個訂閱者,然后阻礙發(fā)送直到消息被接收,傳輸方式都是同步的方式,都是由一個線程來運行的.
這里我們可以將入站channel改成 ExecutorChannel一個可以使用多線程的channel
@Bean
public ThreadPoolTaskExecutor mqttThreadPoolTaskExecutor()
{
ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
// 最大可創(chuàng)建的線程數(shù)
int maxPoolSize = 200;
executor.setMaxPoolSize(maxPoolSize);
// 核心線程池大小
int corePoolSize = 50;
executor.setCorePoolSize(corePoolSize);
// 隊列最大長度
int queueCapacity = 1000;
executor.setQueueCapacity(queueCapacity);
// 線程池維護線程所允許的空閑時間
int keepAliveSeconds = 300;
executor.setKeepAliveSeconds(keepAliveSeconds);
// 線程池對拒絕任務(wù)(無線程可用)的處理策略
executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
return executor;
}
// 入站消息管道
@Bean
public MessageChannel mqttInboundChannel(){
// 用線程池
return new ExecutorChannel(mqttThreadPoolTaskExecutor());
}到這里其實可以運行了.
但是這樣配置其實還是有點多, 有點亂, 于是我查找官網(wǎng), f發(fā)現(xiàn)一種更簡單的配置方法 叫 Java DSL, 官網(wǎng)連接: Configuring with the Java DSL
我們參考官網(wǎng),稍微改一下,使用 DSL的方式進行配置:
import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.integration.channel.ExecutorChannel;
import org.springframework.integration.dsl.IntegrationFlow;
import org.springframework.integration.dsl.IntegrationFlows;
import org.springframework.integration.mqtt.core.DefaultMqttPahoClientFactory;
import org.springframework.integration.mqtt.core.MqttPahoClientFactory;
import org.springframework.integration.mqtt.inbound.MqttPahoMessageDrivenChannelAdapter;
import org.springframework.integration.mqtt.outbound.MqttPahoMessageHandler;
import org.springframework.integration.mqtt.support.DefaultPahoMessageConverter;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
import java.util.concurrent.ThreadPoolExecutor;
/**
* MqttConfigV2
*
* @author hengzi
* @date 2022/8/24
*/
@Configuration
public class MqttConfigV2 {
@Autowired
private MqttProperties mqttProperties;
@Autowired
private MqttMessageHandle mqttMessageHandle;
//Mqtt 客戶端工廠 所有客戶端從這里產(chǎn)生
@Bean
public MqttPahoClientFactory mqttPahoClientFactory(){
DefaultMqttPahoClientFactory factory = new DefaultMqttPahoClientFactory();
MqttConnectOptions options = new MqttConnectOptions();
options.setServerURIs(mqttProperties.getHostUrl().split(","));
options.setUserName(mqttProperties.getUsername());
options.setPassword(mqttProperties.getPassword().toCharArray());
factory.setConnectionOptions(options);
return factory;
}
// Mqtt 管道適配器
@Bean
public MqttPahoMessageDrivenChannelAdapter adapter(MqttPahoClientFactory factory){
return new MqttPahoMessageDrivenChannelAdapter(mqttProperties.getInClientId(),factory,mqttProperties.getDefaultTopic().split(","));
}
// 消息生產(chǎn)者 (接收,處理來自mqtt的消息)
@Bean
public IntegrationFlow mqttInbound(MqttPahoMessageDrivenChannelAdapter adapter) {
adapter.setCompletionTimeout(5000);
adapter.setQos(1);
return IntegrationFlows.from( adapter)
.channel(new ExecutorChannel(mqttThreadPoolTaskExecutor()))
.handle(mqttMessageHandle)
.get();
}
@Bean
public ThreadPoolTaskExecutor mqttThreadPoolTaskExecutor()
{
ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
// 最大可創(chuàng)建的線程數(shù)
int maxPoolSize = 200;
executor.setMaxPoolSize(maxPoolSize);
// 核心線程池大小
int corePoolSize = 50;
executor.setCorePoolSize(corePoolSize);
// 隊列最大長度
int queueCapacity = 1000;
executor.setQueueCapacity(queueCapacity);
// 線程池維護線程所允許的空閑時間
int keepAliveSeconds = 300;
executor.setKeepAliveSeconds(keepAliveSeconds);
// 線程池對拒絕任務(wù)(無線程可用)的處理策略
executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
return executor;
}
// 出站處理器 (向 mqtt 發(fā)送消息)
@Bean
public IntegrationFlow mqttOutboundFlow(MqttPahoClientFactory factory) {
MqttPahoMessageHandler handler = new MqttPahoMessageHandler(mqttProperties.getOutClientId(),factory);
handler.setAsync(true);
handler.setConverter(new DefaultPahoMessageConverter());
handler.setDefaultTopic(mqttProperties.getDefaultTopic().split(",")[0]);
return IntegrationFlows.from( "mqttOutboundChannel").handle(handler).get();
}
}這樣看起來真的簡單多了, 頭也沒那么大了, 我要是早知道多好.
好了以上就是配置相關(guān)的, 到這里其實是已經(jīng)完成springboot 與 mqtt 的整合了.
但其實我一直有個想法, 就是我們接收的消息 都是在 handleMessage這個方法里面執(zhí)行的,
@Override
public void handleMessage(Message<?> message) throws MessagingException {
}
所以我就有了一個想法, 能不能根據(jù) 我訂閱的主題,在不同的方法執(zhí)行, 對于這個問題,其實你用if ... else ...也能實現(xiàn), 但很明顯,如果我訂閱的主題很多的話, 那寫起來就很頭痛了.
對于這個問題,有兩種思路, 一個是添加Spring Integration的路由 router,根據(jù)不同topic路由到不同的channel, 這個我也知道能不能實現(xiàn), 我這里就不討論了.
第二種是, 我也不知道名字改如何叫, 我是參考了 spring的 @Controller的設(shè)計, 暫且叫他注解模式.
眾所周知,我們的接口都是在類上加 @Controller這個注解, 就代表這個類是 http 接口, 再在方法加上 @RequestMapping就能實現(xiàn)不同的 url 調(diào)用不同的方法.
參數(shù)這個設(shè)計 我們在類上面加 @MqttService就代表這個類是專門處理mqtt消息的服務(wù)類
同時 在這個類的方法上 加上 @MqttTopic就代表 這個主題由這個方法處理.
OK, 理論有了,接下來就是 實踐.
先定義 兩個注解
import org.springframework.core.annotation.AliasFor;
import org.springframework.stereotype.Component;
import java.lang.annotation.*;
@Documented
@Target({ElementType.TYPE})
@Retention(RetentionPolicy.RUNTIME)
@Component
public @interface MqttService {
@AliasFor(
annotation = Component.class
)
String value() default "";
}加上 @Component注解 spring就會掃描, 并注冊到IOC容器里
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
@Target(ElementType.METHOD)
@Retention(RetentionPolicy.RUNTIME)
public @interface MqttTopic {
/**
* 主題名字
*/
String value() default "";
}參考 @RequestMapping我們使用起來應(yīng)該是這樣的:
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.messaging.Message;
/**
* MqttTopicHandle
*
* @author hengzi
* @date 2022/8/24
*/
@MqttService
public class MqttTopicHandle {
public static final Logger log = LoggerFactory.getLogger(MqttTopicHandle.class);
// 這里的 # 號是通配符
@MqttTopic("test/#")
public void test(Message<?> message){
log.info("test="+message.getPayload());
}
// 這里的 + 號是通配符
@MqttTopic("topic/+/+/up")
public void up(Message<?> message){
log.info("up="+message.getPayload());
}
// 注意 你必須先訂閱
@MqttTopic("topic/1/2/down")
public void down(Message<?> message){
log.info("down="+message.getPayload());
}
}OK 接下來就是實現(xiàn)這樣的使用
分析 :
當我們收到消息時, 我們從IOC容器中 找到所有 帶 @MqttService注解的類
然后 遍歷這些類, 找到帶有 @MqttTopic的方法
接著 把 @MqttTopic注解的的值 與 接受到的topic 進行對比
如果一致則執(zhí)行這個方法
廢話少說, 上代碼
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.messaging.Message;
import org.springframework.messaging.MessageHandler;
import org.springframework.messaging.MessagingException;
import org.springframework.stereotype.Component;
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
import java.util.Map;
/**
* MessageHandleService
*
* @author hengzi
* @date 2022/8/24
*/
@Component
public class MqttMessageHandle implements MessageHandler {
public static final Logger log = LoggerFactory.getLogger(MqttMessageHandle.class);
// 包含 @MqttService注解 的類(Component)
public static Map<String, Object> mqttServices;
/**
* 所有mqtt到達的消息都會在這里處理
* 要注意這個方法是在線程池里面運行的
* @param message message
*/
@Override
public void handleMessage(Message<?> message) throws MessagingException {
getMqttTopicService(message);
}
public Map<String, Object> getMqttServices(){
if(mqttServices==null){
mqttServices = SpringUtils.getBeansByAnnotation(MqttService.class);
}
return mqttServices;
}
public void getMqttTopicService(Message<?> message){
// 在這里 我們根據(jù)不同的 主題 分發(fā)不同的消息
String receivedTopic = message.getHeaders().get("mqtt_receivedTopic",String.class);
if(receivedTopic==null || "".equals(receivedTopic)){
return;
}
for(Map.Entry<String, Object> entry : getMqttServices().entrySet()){
// 把所有帶有 @MqttService 的類遍歷
Class<?> clazz = entry.getValue().getClass();
// 獲取他所有方法
Method[] methods = clazz.getDeclaredMethods();
for ( Method method: methods ){
if (method.isAnnotationPresent(MqttTopic.class)){
// 如果這個方法有 這個注解
MqttTopic handleTopic = method.getAnnotation(MqttTopic.class);
if(isMatch(receivedTopic,handleTopic.value())){
// 并且 這個 topic 匹配成功
try {
method.invoke(SpringUtils.getBean(clazz),message);
return;
} catch (IllegalAccessException e) {
e.printStackTrace();
log.error("代理炸了");
} catch (InvocationTargetException e) {
log.error("執(zhí)行 {} 方法出現(xiàn)錯誤",handleTopic.value(),e);
}
}
}
}
}
}
/**
* mqtt 訂閱的主題與我實際的主題是否匹配
* @param topic 是實際的主題
* @param pattern 是我訂閱的主題 可以是通配符模式
* @return 是否匹配
*/
public static boolean isMatch(String topic, String pattern){
if((topic==null) || (pattern==null) ){
return false;
}
if(topic.equals(pattern)){
// 完全相等是肯定匹配的
return true;
}
if("#".equals(pattern)){
// # 號代表所有主題 肯定匹配的
return true;
}
String[] splitTopic = topic.split("/");
String[] splitPattern = pattern.split("/");
boolean match = true;
// 如果包含 # 則只需要判斷 # 前面的
for (int i = 0; i < splitPattern.length; i++) {
if(!"#".equals(splitPattern[i])){
// 不是# 號 正常判斷
if(i>=splitTopic.length){
// 此時長度不相等 不匹配
match = false;
break;
}
if(!splitTopic[i].equals(splitPattern[i]) && !"+".equals(splitPattern[i])){
// 不相等 且不等于 +
match = false;
break;
}
}
else {
// 是# 號 肯定匹配的
break;
}
}
return match;
}
}工具類 SpringUtils
import org.springframework.aop.framework.AopContext;
import org.springframework.beans.BeansException;
import org.springframework.beans.factory.NoSuchBeanDefinitionException;
import org.springframework.beans.factory.config.BeanFactoryPostProcessor;
import org.springframework.beans.factory.config.ConfigurableListableBeanFactory;
import org.springframework.context.ApplicationContext;
import org.springframework.context.ApplicationContextAware;
import org.springframework.stereotype.Component;
import java.util.Map;
/**
* spring工具類 方便在非spring管理環(huán)境中獲取bean
*
*/
@Component
public final class SpringUtils implements BeanFactoryPostProcessor, ApplicationContextAware
{
/** Spring應(yīng)用上下文環(huán)境 */
private static ConfigurableListableBeanFactory beanFactory;
private static ApplicationContext applicationContext;
public static Map<String, Object> getBeansByAnnotation(Class clsName) throws BeansException{
return beanFactory.getBeansWithAnnotation(clsName);
}
@Override
public void postProcessBeanFactory(ConfigurableListableBeanFactory beanFactory) throws BeansException
{
SpringUtils.beanFactory = beanFactory;
}
@Override
public void setApplicationContext(ApplicationContext applicationContext) throws BeansException
{
SpringUtils.applicationContext = applicationContext;
}
/**
* 獲取對象
*
* @param name
* @return Object 一個以所給名字注冊的bean的實例
* @throws org.springframework.beans.BeansException
*
*/
@SuppressWarnings("unchecked")
public static <T> T getBean(String name) throws BeansException
{
return (T) beanFactory.getBean(name);
}
/**
* 獲取類型為requiredType的對象
*
* @param clz
* @return
* @throws org.springframework.beans.BeansException
*
*/
public static <T> T getBean(Class<T> clz) throws BeansException
{
T result = (T) beanFactory.getBean(clz);
return result;
}
/**
* 如果BeanFactory包含一個與所給名稱匹配的bean定義,則返回true
*
* @param name
* @return boolean
*/
public static boolean containsBean(String name)
{
return beanFactory.containsBean(name);
}
/**
* 判斷以給定名字注冊的bean定義是一個singleton還是一個prototype。 如果與給定名字相應(yīng)的bean定義沒有被找到,將會拋出一個異常(NoSuchBeanDefinitionException)
*
* @param name
* @return boolean
* @throws org.springframework.beans.factory.NoSuchBeanDefinitionException
*
*/
public static boolean isSingleton(String name) throws NoSuchBeanDefinitionException
{
return beanFactory.isSingleton(name);
}
/**
* @param name
* @return Class 注冊對象的類型
* @throws org.springframework.beans.factory.NoSuchBeanDefinitionException
*
*/
public static Class<?> getType(String name) throws NoSuchBeanDefinitionException
{
return beanFactory.getType(name);
}
/**
* 如果給定的bean名字在bean定義中有別名,則返回這些別名
*
* @param name
* @return
* @throws org.springframework.beans.factory.NoSuchBeanDefinitionException
*
*/
public static String[] getAliases(String name) throws NoSuchBeanDefinitionException
{
return beanFactory.getAliases(name);
}
/**
* 獲取aop代理對象
*
* @param invoker
* @return
*/
@SuppressWarnings("unchecked")
public static <T> T getAopProxy(T invoker)
{
return (T) AopContext.currentProxy();
}
/**
* 獲取當前的環(huán)境配置,無配置返回null
*
* @return 當前的環(huán)境配置
*/
public static String[] getActiveProfiles()
{
return applicationContext.getEnvironment().getActiveProfiles();
}
}OK, 大功告成. 終于舒服了, 終于不用寫if...else...了, 個人感覺這樣處理起來會更加優(yōu)雅. 寫代碼最重要是什么, 是優(yōu)雅~
以上!
參考文章:
附:
動態(tài)添加主題方式:
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.integration.mqtt.inbound.MqttPahoMessageDrivenChannelAdapter;
import org.springframework.stereotype.Service;
import java.util.Arrays;
/**
* MqttService
*
* @author hengzi
* @date 2022/8/25
*/
@Service
public class MqttService {
@Autowired
private MqttPahoMessageDrivenChannelAdapter adapter;
public void addTopic(String topic) {
addTopic(topic, 1);
}
public void addTopic(String topic,int qos) {
String[] topics = adapter.getTopic();
if(!Arrays.asList(topics).contains(topic)){
adapter.addTopic(topic,qos);
}
}
public void removeTopic(String topic) {
adapter.removeTopic(topic);
}
}
直接調(diào)用就行
總結(jié)
到此這篇關(guān)于springboot整合mqtt的文章就介紹到這了,更多相關(guān)springboot整合mqtt內(nèi)容請搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持腳本之家!
相關(guān)文章
SpringBoot集成Hadoop對HDFS的文件操作方法
這篇文章主要介紹了SpringBoot集成Hadoop對HDFS的文件操作方法,本文給大家介紹的非常詳細,感興趣的朋友跟隨小編一起看看吧2024-07-07
springboot結(jié)合redis實現(xiàn)搜索欄熱搜功能及文字過濾
本文主要介紹了springboot結(jié)合redis實現(xiàn)搜索欄熱搜功能及文字過濾,文中通過示例代碼介紹的非常詳細,具有一定的參考價值,感興趣的小伙伴們可以參考一下2022-02-02
spring mvc利用ajax向controller傳遞對象的方法示例
這篇文章主要給大家介紹了關(guān)于spring mvc利用ajax向controller傳遞對象的相關(guān)資料,文中通過示例代碼將步驟介紹的非常詳細,對大家具有一定的參考學習價值,需要的朋友們下面來跟著小編一起學習學習吧。2017-07-07
Java如何實現(xiàn)支付寶電腦支付基于servlet版本
這篇文章主要介紹了Java如何實現(xiàn)支付寶電腦支付基于servlet版本,文中通過示例代碼介紹的非常詳細,對大家的學習或者工作具有一定的參考學習價值,需要的朋友可以參考下2019-11-11
CountDownLatch和Atomic原子操作類源碼解析
這篇文章主要為大家介紹了CountDownLatch和Atomic原子操作類的源碼解析以及理解應(yīng)用,有需要的朋友可以借鑒參考下,希望能夠有所幫助,祝大家多多進步2022-03-03

