SpringBoot整合Netty開發(fā)MQTT服務(wù)端
Netty認知
Netty是一款基于NIO(Nonblocking I/O,非阻塞IO)開發(fā)的網(wǎng)絡(luò)通信框架,相比傳統(tǒng)Socket,在并發(fā)性方面有著很大的提升。關(guān)于NIO,BIO,AIO之間的區(qū)別,可以參考這篇博客:Java中AIO、BIO、NIO應(yīng)用場景及區(qū)別
MQTT服務(wù)端實現(xiàn)
首先我們啟動一個tcp服務(wù),這里我用到了Redis與RabbitMQ,主要是與分布式WEB平臺之間好對接
@Component
public class ApplicationEventListener implements CommandLineRunner {
@Value("${spring.application.name}")
private String nodeName;
@Value("${gnss.mqttserver.tcpPort}")
private int tcpPort;
@Override
public void run(String... args) throws Exception {
//啟動TCP服務(wù)
startTcpServer();
//清除Redis所有此節(jié)點的在線終端
RedisService redisService = SpringBeanService.getBean(RedisService.class);
redisService.deleteAllOnlineTerminals(nodeName);
//將所有此節(jié)點的終端設(shè)置為離線
RabbitMessageSender messageSender = SpringBeanService.getBean(RabbitMessageSender.class);
messageSender.noticeAllOffline(nodeName);
}
/**
* 啟動TCP服務(wù)
*
* @throws Exception
*/
private void startTcpServer() throws Exception {
//計數(shù)器,必須等到所有服務(wù)啟動成功才能進行后續(xù)的操作
final CountDownLatch countDownLatch = new CountDownLatch(1);
//啟動TCP服務(wù)
TcpServer tcpServer = new TcpServer(tcpPort, ProtocolEnum.MqttCommon, countDownLatch);
tcpServer.start();
//等待啟動完成
countDownLatch.await();
}
}接下來我們編寫一個TcpServer類實現(xiàn)TCP服務(wù)
@Slf4j
public class TcpServer extends Thread{
private int port;
private ProtocolEnum protocolType;
private EventLoopGroup bossGroup;
private EventLoopGroup workerGroup;
private ServerBootstrap serverBootstrap = new ServerBootstrap();
private CountDownLatch countDownLatch;
public TcpServer(int port, ProtocolEnum protocolType, CountDownLatch countDownLatch) {
this.port = port;
this.protocolType = protocolType;
this.countDownLatch = countDownLatch;
bossGroup = new NioEventLoopGroup(1);
workerGroup = SpringBeanService.getBean("workerGroup", EventLoopGroup.class);
final EventExecutorGroup executorGroup = SpringBeanService.getBean("executorGroup", EventExecutorGroup.class);
serverBootstrap.group(bossGroup, workerGroup)
.channel(NioServerSocketChannel.class)
.option(ChannelOption.SO_BACKLOG, 1024)
.childOption(ChannelOption.SO_KEEPALIVE, true)
.childOption(ChannelOption.TCP_NODELAY, true)
.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel ch) throws Exception {
ch.pipeline().addLast(new IdleStateHandler(MqttConstant.READER_IDLE_TIME, 0, 0, TimeUnit.SECONDS));
ch.pipeline().addLast("encoder", MqttEncoder.INSTANCE);
ch.pipeline().addLast("decoder", new MqttDecoder());
ch.pipeline().addLast(executorGroup, MqttBusinessHandler.INSTANCE);
}
});
}
@Override
public void run() {
bind();
}
/**
* 綁定端口啟動服務(wù)
*/
private void bind() {
serverBootstrap.bind(port).addListener(future -> {
if (future.isSuccess()) {
log.info("{} MQTT服務(wù)器啟動,端口:{}", protocolType, port);
countDownLatch.countDown();
} else {
log.error("{} MQTT服務(wù)器啟動失敗,端口:{}", protocolType, port, future.cause());
System.exit(-1);
}
});
}
/**
* 關(guān)閉服務(wù)端
*/
public void shutdown() {
workerGroup.shutdownGracefully();
bossGroup.shutdownGracefully();
log.info("{} TCP服務(wù)器關(guān)閉,端口:{}", protocolType, port);
}
}
編寫一個解碼器MqttBusinessHandler,實現(xiàn)對MQTT消息接收與處理
@Slf4j
@ChannelHandler.Sharable
public class MqttBusinessHandler extends SimpleChannelInboundHandler<Object> {
public static final MqttBusinessHandler INSTANCE = new MqttBusinessHandler();
private MqttMsgBack mqttMsgBack;
private MqttBusinessHandler() {
mqttMsgBack= MqttMsgBack.INSTANCE;
}
/**
* 接收到消息后處理
* @param ctx
* @param msg
* @throws Exception
*/
@Override
protected void channelRead0(ChannelHandlerContext ctx, Object msg) throws Exception {
if (null != msg) {
MqttMessage mqttMessage = (MqttMessage) msg;
MqttFixedHeader mqttFixedHeader = mqttMessage.fixedHeader();
Channel channel = ctx.channel();
if(mqttFixedHeader.messageType().equals(MqttMessageType.CONNECT)){
//在一個網(wǎng)絡(luò)連接上,客戶端只能發(fā)送一次CONNECT報文。服務(wù)端必須將客戶端發(fā)送的第二個CONNECT報文當(dāng)作協(xié)議違規(guī)處理并斷開客戶端的連接
//建議connect消息單獨處理,用來對客戶端進行認證管理等 這里直接返回一個CONNACK消息
mqttMsgBack.connectionAck(ctx, mqttMessage);
}
switch (mqttFixedHeader.messageType()){
//客戶端發(fā)布消息
case PUBLISH:
mqttMsgBack.publishAck(ctx, mqttMessage);
break;
//發(fā)布釋放
case PUBREL:
mqttMsgBack.publishComp(ctx, mqttMessage);
break;
//訂閱主題
case SUBSCRIBE:
mqttMsgBack.subscribeAck(ctx, mqttMessage);
break;
//取消訂閱主題
case UNSUBSCRIBE:
mqttMsgBack.unsubscribeAck(ctx, mqttMessage);
break;
//客戶端發(fā)送心跳報文
case PINGREQ:
mqttMsgBack.pingResp(ctx, mqttMessage);
break;
//客戶端主動斷開連接
case DISCONNECT:
break;
default:
break;
}
}
}
@Override
public void channelInactive(ChannelHandlerContext ctx) throws Exception {
log.info("終端關(guān)閉連接,IP信息:{}", CommonUtil.getClientAddress(ctx));
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
ctx.close();
log.error("終端連接異常,IP信息:{}", CommonUtil.getClientAddress(ctx), cause);
}
/**
* 服務(wù)端當(dāng)讀超時時會調(diào)用這個方法
*/
@Override
public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception, IOException {
ctx.close();
log.error("讀超時,IP信息:{}", CommonUtil.getClientAddress(ctx), evt);
}我們對接收到的消息進行業(yè)務(wù)處理
@Slf4j
public class MqttMsgBack {
public static final MqttMsgBack INSTANCE = new MqttMsgBack();
private RedisService redisService;
private RabbitMessageSender messageSender;
private Environment environment;
private MessageServiceProvider messageServiceProvider;
private MqttMsgBack() {
redisService = SpringBeanService.getBean(RedisService.class);
messageSender = SpringBeanService.getBean(RabbitMessageSender.class);
environment = SpringBeanService.getBean(Environment.class);
messageServiceProvider = SpringBeanService.getBean(MessageServiceProvider.class);
}
/**
* 確認連接請求
* @param ctx
* @param mqttMessage
*/
public void connectionAck (ChannelHandlerContext ctx, MqttMessage mqttMessage) {
MqttConnectMessage mqttConnectMessage = (MqttConnectMessage) mqttMessage;
MqttFixedHeader mqttFixedHeaderInfo = mqttConnectMessage.fixedHeader();
MqttConnectVariableHeader mqttConnectVariableHeaderInfo = mqttConnectMessage.variableHeader();
//構(gòu)建返回報文, 可變報頭
MqttConnAckVariableHeader mqttConnAckVariableHeaderBack = new MqttConnAckVariableHeader(MqttConnectReturnCode.CONNECTION_ACCEPTED, mqttConnectVariableHeaderInfo.isCleanSession());
//構(gòu)建返回報文, 固定報頭
MqttFixedHeader mqttFixedHeaderBack = new MqttFixedHeader(MqttMessageType.CONNACK,mqttFixedHeaderInfo.isDup(), MqttQoS.AT_MOST_ONCE, mqttFixedHeaderInfo.isRetain(), 0x02);
//構(gòu)建連接回復(fù)消息體
MqttConnAckMessage connAck = new MqttConnAckMessage(mqttFixedHeaderBack, mqttConnAckVariableHeaderBack);
ctx.writeAndFlush(connAck);
//獲取連接者的ClientId
String clientIdentifier = mqttConnectMessage.payload().clientIdentifier();
//查詢終端號碼有無在平臺注冊
TerminalProto terminalInfo = redisService.getTerminalInfoByTerminalNum(clientIdentifier);
if (terminalInfo == null) {
log.error("終端登錄失敗,未找到終端信息,終端號:{},IP信息:{}", clientIdentifier, CommonUtil.getClientAddress(ctx));
ctx.close();
return;
}
//設(shè)置節(jié)點名
terminalInfo.setNodeName(environment.getProperty("spring.application.name"));
//保存終端信息和消息流水號到上下文屬性中
Session session = new Session(terminalInfo);
ChannelHandlerContext oldCtx = SessionUtil.bindSession(session, ctx);
if (oldCtx == null) {
log.info("終端登錄成功,終端ID:{},終端號:{},IP信息:{}", terminalInfo.getTerminalStrId(), clientIdentifier, CommonUtil.getClientAddress(ctx));
} else {
log.info("終端重復(fù)登錄關(guān)閉上一個連接,終端ID:{},終端號:{},IP信息:{}", terminalInfo.getTerminalStrId(), clientIdentifier, CommonUtil.getClientAddress(ctx));
oldCtx.close();
}
//通知上線
messageSender.noticeOnline(terminalInfo);
log.info("終端登錄成功,終端號:{},IP信息:{}", clientIdentifier, CommonUtil.getClientAddress(ctx));
}
/**
* 根據(jù)qos發(fā)布確認
* @param ctx
* @param mqttMessage
*/
public void publishAck (ChannelHandlerContext ctx, MqttMessage mqttMessage) {
MqttPublishMessage mqttPublishMessage = (MqttPublishMessage) mqttMessage;
MqttFixedHeader mqttFixedHeaderInfo = mqttPublishMessage.fixedHeader();
MqttQoS qos = (MqttQoS) mqttFixedHeaderInfo.qosLevel();
//得到主題
String topicName = mqttPublishMessage.variableHeader().topicName();
//獲取消息體
ByteBuf msgBodyBuf = mqttPublishMessage.payload();
log.info("收到:{}", ByteBufUtil.hexDump(msgBodyBuf));
MqttCommonMessage msg=new MqttCommonMessage();
msg.setTerminalNum(SessionUtil.getTerminalInfo(ctx).getTerminalNum());
msg.setStrMsgId(topicName);
//根據(jù)主題獲取對應(yīng)的主題消息處理器
BaseMessageService messageService = messageServiceProvider.getMessageService(topicName);
try {
Object result = messageService.process(ctx, msg, msgBodyBuf);
log.info("收到{}({}),終端ID:{},內(nèi)容:{}", messageService.getDesc(), topicName,msg.getTerminalNum(), msg.getMsgBodyItems());
} catch (Exception e) {
log.error("收到{}({}),消息異常,終端ID:{},消息體:{}", messageService.getDesc(), topicName,msg.getTerminalNum(),ByteBufUtil.hexDump(msgBodyBuf), e);
}
switch (qos) {
//至多一次
case AT_MOST_ONCE:
break;
//至少一次
case AT_LEAST_ONCE:
//構(gòu)建返回報文, 可變報頭
MqttMessageIdVariableHeader mqttMessageIdVariableHeaderBack = MqttMessageIdVariableHeader.from(mqttPublishMessage.variableHeader().packetId());
//構(gòu)建返回報文, 固定報頭
MqttFixedHeader mqttFixedHeaderBack = new MqttFixedHeader(MqttMessageType.PUBACK,mqttFixedHeaderInfo.isDup(), MqttQoS.AT_MOST_ONCE, mqttFixedHeaderInfo.isRetain(), 0x02);
//構(gòu)建PUBACK消息體
MqttPubAckMessage pubAck = new MqttPubAckMessage(mqttFixedHeaderBack, mqttMessageIdVariableHeaderBack);
log.info("Qos:AT_LEAST_ONCE:{}",pubAck.toString());
ctx.writeAndFlush(pubAck);
break;
//剛好一次
case EXACTLY_ONCE:
//構(gòu)建返回報文,固定報頭
MqttFixedHeader mqttFixedHeaderBack2 = new MqttFixedHeader(MqttMessageType.PUBREC,false, MqttQoS.AT_LEAST_ONCE,false,0x02);
//構(gòu)建返回報文,可變報頭
MqttMessageIdVariableHeader mqttMessageIdVariableHeaderBack2 = MqttMessageIdVariableHeader.from(mqttPublishMessage.variableHeader().packetId());
MqttMessage mqttMessageBack = new MqttMessage(mqttFixedHeaderBack2,mqttMessageIdVariableHeaderBack2);
log.info("Qos:EXACTLY_ONCE回復(fù):{}"+mqttMessageBack.toString());
ctx.writeAndFlush(mqttMessageBack);
break;
default:
break;
}
}
/**
* 發(fā)布完成 qos2
* @param ctx
* @param mqttMessage
*/
public void publishComp (ChannelHandlerContext ctx, MqttMessage mqttMessage) {
MqttMessageIdVariableHeader messageIdVariableHeader = (MqttMessageIdVariableHeader) mqttMessage.variableHeader();
//構(gòu)建返回報文, 固定報頭
MqttFixedHeader mqttFixedHeaderBack = new MqttFixedHeader(MqttMessageType.PUBCOMP,false, MqttQoS.AT_MOST_ONCE,false,0x02);
//構(gòu)建返回報文, 可變報頭
MqttMessageIdVariableHeader mqttMessageIdVariableHeaderBack = MqttMessageIdVariableHeader.from(messageIdVariableHeader.messageId());
MqttMessage mqttMessageBack = new MqttMessage(mqttFixedHeaderBack,mqttMessageIdVariableHeaderBack);
log.info("發(fā)布完成回復(fù):{}"+mqttMessageBack.toString());
ctx.writeAndFlush(mqttMessageBack);
}
/**
* 訂閱確認
* @param ctx
* @param mqttMessage
*/
public void subscribeAck(ChannelHandlerContext ctx, MqttMessage mqttMessage) {
MqttSubscribeMessage mqttSubscribeMessage = (MqttSubscribeMessage) mqttMessage;
MqttMessageIdVariableHeader messageIdVariableHeader = mqttSubscribeMessage.variableHeader();
//構(gòu)建返回報文, 可變報頭
MqttMessageIdVariableHeader variableHeaderBack = MqttMessageIdVariableHeader.from(messageIdVariableHeader.messageId());
Set<String> topics = mqttSubscribeMessage.payload().topicSubscriptions().stream().map(mqttTopicSubscription -> mqttTopicSubscription.topicName()).collect(Collectors.toSet());
List<Integer> grantedQoSLevels = new ArrayList<>(topics.size());
for (int i = 0; i < topics.size(); i++) {
grantedQoSLevels.add(mqttSubscribeMessage.payload().topicSubscriptions().get(i).qualityOfService().value());
}
// 構(gòu)建返回報文 有效負載
MqttSubAckPayload payloadBack = new MqttSubAckPayload(grantedQoSLevels);
// 構(gòu)建返回報文 固定報頭
MqttFixedHeader mqttFixedHeaderBack = new MqttFixedHeader(MqttMessageType.SUBACK, false, MqttQoS.AT_MOST_ONCE, false, 2+topics.size());
// 構(gòu)建返回報文 訂閱確認
MqttSubAckMessage subAck = new MqttSubAckMessage(mqttFixedHeaderBack,variableHeaderBack, payloadBack);
log.info("訂閱回復(fù):{}", subAck.toString());
ctx.writeAndFlush(subAck);
}
/**
* 取消訂閱確認
* @param ctx
* @param mqttMessage
*/
public void unsubscribeAck(ChannelHandlerContext ctx, MqttMessage mqttMessage) {
MqttMessageIdVariableHeader messageIdVariableHeader = (MqttMessageIdVariableHeader) mqttMessage.variableHeader();
// 構(gòu)建返回報文 可變報頭
MqttMessageIdVariableHeader variableHeaderBack = MqttMessageIdVariableHeader.from(messageIdVariableHeader.messageId());
// 構(gòu)建返回報文 固定報頭
MqttFixedHeader mqttFixedHeaderBack = new MqttFixedHeader(MqttMessageType.UNSUBACK, false, MqttQoS.AT_MOST_ONCE, false, 2);
// 構(gòu)建返回報文 取消訂閱確認
MqttUnsubAckMessage unSubAck = new MqttUnsubAckMessage(mqttFixedHeaderBack,variableHeaderBack);
log.info("取消訂閱回復(fù):{}",unSubAck.toString());
ctx.writeAndFlush(unSubAck);
}
/**
* 心跳響應(yīng)
* @param ctx
* @param mqttMessage
*/
public void pingResp (ChannelHandlerContext ctx, MqttMessage mqttMessage) {
MqttFixedHeader fixedHeader = new MqttFixedHeader(MqttMessageType.PINGRESP, false, MqttQoS.AT_MOST_ONCE, false, 0);
MqttMessage mqttMessageBack = new MqttMessage(fixedHeader);
log.info("心跳回復(fù):{}", mqttMessageBack.toString());
ctx.writeAndFlush(mqttMessageBack);
}
}我們可以根據(jù)客戶端發(fā)布消息的主題匹配不同的處理器

最后,我們在對應(yīng)的處理器里面實現(xiàn)對主題消息的處理邏輯,比如:定位消息,指令消息等等,比如簡單實現(xiàn)對定位數(shù)據(jù)Location主題的消息處理
@Slf4j
@MessageService(strMessageId = "Location", desc = "定位")
public class LocationMessageService extends BaseMessageService<MqttCommonMessage> {
@Autowired
private RabbitMessageSender messageSender;
@Override
public Object process(ChannelHandlerContext ctx, MqttCommonMessage msg, ByteBuf msgBodyBuf) throws Exception {
byte[] msgByteArr = new byte[msgBodyBuf.readableBytes()];
msgBodyBuf.readBytes(msgByteArr);
String data = new String(msgByteArr);
msg.putMessageBodyItem("位置", data);
return null;
}
}后續(xù)
目前僅僅是實現(xiàn)MQTT服務(wù)端消息接收與消息回復(fù),后續(xù)可以根據(jù)接入的物聯(lián)網(wǎng)設(shè)備進行對應(yīng)主題消息的業(yè)務(wù)處理
到此這篇關(guān)于SpringBoot整合Netty開發(fā)MQTT服務(wù)端的文章就介紹到這了,更多相關(guān)SpringBoot MQTT服務(wù)端內(nèi)容請搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持腳本之家!
相關(guān)文章
Springboot項目平滑關(guān)閉及自動化關(guān)閉腳本
這篇文章主要為大家詳細介紹了Springboot項目平滑關(guān)閉及自動化關(guān)閉腳本,具有一定的參考價值,感興趣的小伙伴們可以參考一下2019-05-05
Java并發(fā)Map面試線程安全數(shù)據(jù)結(jié)構(gòu)全面分析
本文將探討如何在Java中有效地應(yīng)對這些挑戰(zhàn),介紹一種強大的工具并發(fā)Map,它能夠幫助您管理多線程環(huán)境下的共享數(shù)據(jù),確保數(shù)據(jù)的一致性和高性能,深入了解Java中的并發(fā)Map實現(xiàn),包括ConcurrentHashMap和ConcurrentSkipListMap,及相關(guān)知識點2023-09-09
Java?實戰(zhàn)項目之學(xué)生信息管理系統(tǒng)的實現(xiàn)流程
讀萬卷書不如行萬里路,只學(xué)書上的理論是遠遠不夠的,只有在實戰(zhàn)中才能獲得能力的提升,本篇文章手把手帶你用java+SSM+jsp+mysql+maven實現(xiàn)學(xué)生信息管理系統(tǒng),大家可以在過程中查缺補漏,提升水平2021-11-11
Java 中的 NoSuchMethodException 異常及解決思路(最新推薦)
NoSuchMethodException異常是Java中使用反射機制時常見的錯誤,它通常由方法名或參數(shù)不匹配、訪問權(quán)限問題、方法簽名不匹配等原因引發(fā),解決方法包括核實方法名及其參數(shù)類型、確認方法訪問權(quán)限、檢查方法簽名和重載問題、確保方法存在于正確的類中,感興趣的朋友一起看看吧2025-01-01
Java聊天室之實現(xiàn)接收和發(fā)送Socket
這篇文章主要為大家詳細介紹了Java簡易聊天室之實現(xiàn)接收和發(fā)送Socket功能,文中的示例代碼講解詳細,具有一定的借鑒價值,需要的可以了解一下2022-10-10
簡化API提升開發(fā)效率RestTemplate與HttpClient?OkHttp關(guān)系詳解
這篇文章主要為大家介紹了簡化API,提升開發(fā)效率,RestTemplate與HttpClient?OkHttp關(guān)系介紹,有需要的朋友可以借鑒參考下,希望能夠有所幫助,祝大家多多進步,早日升職加薪2023-10-10
Springboot集成JWT實現(xiàn)登錄注冊的示例代碼
本文主要介紹了Springboot集成JWT實現(xiàn)登錄注冊的示例代碼,文中通過示例代碼介紹的非常詳細,對大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價值,需要的朋友們下面隨著小編來一起學(xué)習(xí)學(xué)習(xí)吧2024-06-06

