Netty中的心跳檢測機制詳解
Netty心跳檢測機制
1 心跳檢測使用場景
長連接的應用場景非常的廣泛,比如監(jiān)控系統(tǒng),IM系統(tǒng),即時報價系統(tǒng),推送服務等等。像這些場景都是比較注重實時性,如果每次發(fā)送數(shù)據(jù)都要進行一次DNS解析,建立連接的過程肯定是極其影響體驗。
而長連接的維護必然需要一套機制來控制。比如 HTTP/1.0 通過在 header 頭中添加 Connection:Keep-Alive參數(shù),如果當前請求需要?;顒t添加該參數(shù)作為標識,否則服務端就不會保持該連接的狀態(tài),發(fā)送完數(shù)據(jù)之后就關閉連接。HTTP/1.1以后 Keep-Alive 是默認打開的。
Netty 是 基于 TCP 協(xié)議開發(fā)的,在四層協(xié)議 TCP 協(xié)議的實現(xiàn)中也提供了 keepalive 報文用來探測對端是否可用。TCP 層將在定時時間到后發(fā)送相應的 KeepAlive 探針以確定連接可用性。
Netty 中提供了 tcp-keepalive 的設置:

.childOption(ChannelOption.SO_KEEPALIVE,true) 表示打開 TCP 的 keepAlive 設置。
2 Netty心跳檢測機制
Netty 中提供了 IdleStateHandler 類專門用于處理心跳。構造函數(shù)如下:
public IdleStateHandler(long readerIdleTime, long writerIdleTime, long allIdleTime,TimeUnit unit) {
this(false, readerIdleTime, writerIdleTime, allIdleTime, unit);
}參數(shù)說明:
- readerIdleTime 隔多久檢查一下讀事件是否發(fā)生,如果 channelRead() 方法超過 readerIdleTime 時間未被調用則會觸發(fā)超時事件調用 userEventTrigger() 方法
- writerIdleTime 隔多久檢查一下寫事件是否發(fā)生,如果 write() 方法超過 writerIdleTime 時間未被調用則會觸發(fā)超時事件調用 userEventTrigger() 方法;
- allIdleTime 隔多久檢查讀寫事件是否發(fā)生
- unit 時間單位
可以分別控制讀,寫,讀寫超時的時間,如果設置為0表示不檢測,所以如果全是0,則相當于沒添加這個 IdleStateHandler,連接是個普通的短連接。
2.1 代碼演示
服務端
public class TestHeartServer {
public static void main(String[] args) throws InterruptedException {
EventLoopGroup bossGroup = new NioEventLoopGroup();
EventLoopGroup workerGroup = new NioEventLoopGroup();
try {
ServerBootstrap bootstrap=new ServerBootstrap();
bootstrap.group(bossGroup,workerGroup)
.channel(NioServerSocketChannel.class)
.option(ChannelOption.SO_BACKLOG,128)
.childOption(ChannelOption.SO_KEEPALIVE,true)
.handler(new LoggingHandler(LogLevel.INFO))//bossGroup處理handler
.childHandler(new ChannelInitializer<SocketChannel>() {//workergroup處理handler
@Override
protected void initChannel(SocketChannel ch) throws Exception {
ChannelPipeline pipeline = ch.pipeline();
//每隔5s檢查一下是否有讀事件發(fā)生
pipeline.addLast(new IdleStateHandler(5,0,0, TimeUnit.SECONDS));
pipeline.addLast(new TestHeartServerHandler());
}
});
ChannelFuture channelFuture = bootstrap.bind(9999).sync();
channelFuture.channel().closeFuture().sync();
}finally {
bossGroup.shutdownGracefully();
workerGroup.shutdownGracefully();
}
}
}服務端handler
public class TestHeartServerHandler extends ChannelInboundHandlerAdapter {
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
System.out.println("channelActive");
}
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
ByteBuf buf=(ByteBuf) msg;
System.out.println("客戶端消息:"+buf.toString(StandardCharsets.UTF_8));
//向客戶端發(fā)送消息
//ctx.writeAndFlush(Unpooled.copiedBuffer("heart",StandardCharsets.UTF_8));
}
/**
*如果5s沒有讀請求,則向客戶端發(fā)送心跳
* @param ctx
* @param evt
* @throws Exception
*/
@Override
public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
IdleStateEvent event = (IdleStateEvent) evt;
switch (event.state()) {
case READER_IDLE: //讀空閑
//如果5s沒有讀請求,則向客戶端發(fā)送心跳
ctx.writeAndFlush("server send Heartbeat").addListener(ChannelFutureListener.CLOSE_ON_FAILURE);
break;
case WRITER_IDLE://寫空閑
break;
case ALL_IDLE://讀寫空閑
break;
}
}
}客戶端
public class TestHeartClient {
public static void main(String[] args) {
EventLoopGroup eventExecutors=new NioEventLoopGroup();
try {
Bootstrap bootstrap=new Bootstrap();
bootstrap.group(eventExecutors)
.channel(NioSocketChannel.class)
.handler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel ch) throws Exception {
ChannelPipeline pipeline = ch.pipeline();
//每隔4s檢查一下是否有寫事件
pipeline.addLast(new IdleStateHandler(0,4,0, TimeUnit.SECONDS));
pipeline.addLast(new TestHeartClientHandler());
}
});
ChannelFuture channelFuture = bootstrap.connect("127.0.0.1", 9999).sync();
//向服務端發(fā)送消息
channelFuture.channel().writeAndFlush(Unpooled.copiedBuffer("Hello server, i'm online", StandardCharsets.UTF_8));
channelFuture.channel().closeFuture().sync();
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
eventExecutors.shutdownGracefully();
}
}
}客戶端Handler
public class TestHeartClientHandler extends ChannelInboundHandlerAdapter {
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
ByteBuf buf=(ByteBuf) msg;
System.out.println("服務端發(fā)送的消息:"+buf.toString(StandardCharsets.UTF_8));
}
/**
*
* @param ctx
* @param evt
* @throws Exception
*/
@Override
public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
IdleStateEvent event = (IdleStateEvent) evt;
String eventType = null;
switch (event.state()) {
//讀空閑
case READER_IDLE:
break;
case WRITER_IDLE://寫空閑
//如果4s沒有收到寫請求,則向服務端發(fā)送心跳請求
ctx.writeAndFlush(Unpooled.copiedBuffer("client send Heartbeat",StandardCharsets.UTF_8)).addListener(ChannelFutureListener.CLOSE_ON_FAILURE) ;
break;
case ALL_IDLE://讀寫空閑
break;
}
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
cause.printStackTrace();
ctx.close();
}
}解釋一下代碼的邏輯:
服務端添加了:
Copypipeline.addLast(new IdleStateHandler(5, 0, 0, TimeUnit.SECONDS));
每隔5s檢查一下是否有讀事件發(fā)生,如果沒有就觸發(fā) handler 中的 userEventTriggered(ChannelHandlerContext ctx, Object evt)邏輯。
客戶端添加了:
Copynew IdleStateHandler(0, 4, 0, TimeUnit.SECONDS)
每隔4s檢查一下是否有寫事件,如果沒有就觸發(fā) handler 中的 userEventTriggered(ChannelHandlerContext ctx, Object evt)邏輯。
到此這篇關于Netty中的心跳檢測機制詳解的文章就介紹到這了,更多相關Netty心跳檢測機制內容請搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關文章希望大家以后多多支持腳本之家!
相關文章
Java多種方式實現(xiàn)生產(chǎn)者消費者模式
這篇文章主要介紹了Java多種方式實現(xiàn)生產(chǎn)者消費者模式,文中通過示例代碼介紹的非常詳細,對大家的學習或者工作具有一定的參考學習價值,需要的朋友可以參考下2020-07-07
基于java web獲取網(wǎng)頁訪問次數(shù)代碼實例
這篇文章主要介紹了基于java web獲取網(wǎng)頁訪問次數(shù)代碼實例,文中通過示例代碼介紹的非常詳細,對大家的學習或者工作具有一定的參考學習價值,需要的朋友可以參考下2020-02-02
Java編譯錯誤問題:需要class,interface或enum
這篇文章主要介紹了Java編譯錯誤問題:需要class,interface或enum,具有很好的參考價值,希望對大家有所幫助,如有錯誤或未考慮完全的地方,望不吝賜教2024-02-02
通過prometheus監(jiān)控springboot程序運行狀態(tài)的操作流程
jmx_exporter用于從Java應用程序中提取JMX指標,適用于SpringBoot應用,通過下載jar包和配置文件,可以抓取JVM基礎指標,要獲取應用級別指標,需要集成Prometheus客戶端庫并自定義指標,本文給大家介紹了如何通過prometheus監(jiān)控springboot程序運行狀態(tài)2025-02-02

