Netty的心跳檢測解析
一、網(wǎng)絡(luò)連接假死現(xiàn)象
客戶端的心跳檢測對于任何長連接的應(yīng)用來說,都是一個非?;A(chǔ)的功能。
要理解心跳的重要性,首先需要從網(wǎng)絡(luò)連接假死的現(xiàn)象說起。
什么是連接假死呢?如果底層的TCP連接已經(jīng)斷開,但是服務(wù)器端并沒有正常地關(guān)閉套接字,認為這條連接仍然是存在的。
連接假死的具體表現(xiàn)如下:
- 在服務(wù)器端,會有一些處于TCP_ESTABLISHED狀態(tài)的正常連接
- 在客戶端,TCP客戶端已經(jīng)顯示連接已經(jīng)斷開
- 客戶端此時雖然可以進行斷線重連操作,但是上一次連接狀態(tài)依然被服務(wù)器端認為有效,并且服務(wù)器端的資源得不到正確釋放,包括套接字上下文以及接受/發(fā)送緩沖區(qū)
連接假死的情況雖然不常見,但是確實存在。服務(wù)器端長時間運行后,會面臨大量假死連接得不到釋放的情況。由于每個連接都會消耗CPU和內(nèi)存資源,因此大量假死的連接會逐漸耗光服務(wù)器的資源,使得服務(wù)器越來越慢,IO處理效率越來越低,最終導(dǎo)致服務(wù)器崩潰。
連接假死通常是由多個原因造成的:
- 應(yīng)用程序出現(xiàn)線程堵塞,無法進行連接的讀寫
- 網(wǎng)絡(luò)相關(guān)的設(shè)別出現(xiàn)故障
- 網(wǎng)絡(luò)丟包
解決假死的有效手段是客戶端定時進行心跳檢測,服務(wù)端定時進行空閑檢測。
二、服務(wù)器端的空閑檢測
想解決假死問題,服務(wù)器端的有效手段是空閑檢測。所謂空閑檢測就是每隔一段時間監(jiān)測子通道是否有數(shù)據(jù)讀寫,如果有則子通道是正常的,如果沒有則判定為假死,關(guān)閉子通道。
服務(wù)器端實現(xiàn)空閑檢測只需要使用Netty自帶的IdleStateHandler空閑狀態(tài)處理器就可以實現(xiàn)這個功能。
@Slf4j
public class HeartBeatServerHandler extends IdleStateHandler {
private static final int READ_IDLE_GAP = 150; // 最大空閑時間(s)
public HeartBeatServerHandler() {
super(READ_IDLE_GAP, 0, 0, TimeUnit.SECONDS);
}
@Override
protected void channelIdle(ChannelHandlerContext ctx, IdleStateEvent evt) throws Exception {
log.info("{}秒內(nèi)未讀到數(shù)據(jù),關(guān)閉連接", READ_IDLE_GAP);
// 其他處理,如關(guān)閉會話
}
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
// 判斷消息實例
if (!(msg instanceof MessageProtos.Message message)) {
super.channelRead(ctx, msg);
return;
}
if (message.getType() == MessageProtos.HeadType.HEART_BEAT) {
if (ctx.channel().isActive()) {
// 將心跳數(shù)據(jù)包直接回給客戶端
ctx.writeAndFlush(msg);
}
}
super.channelRead(ctx, msg);
}
}在HeartBeatServerHandler的構(gòu)造函數(shù)中,調(diào)用了基類IdleStateHandler的構(gòu)造函數(shù),傳遞了四個參數(shù):
- 入站空閑檢測時長:指的是一段時間內(nèi)如果沒有消息入站就判定為連接假死
- 出站空閑檢測時長:指的是一段時間內(nèi)如果沒有數(shù)據(jù)出站就判定為連接假死
- 出/入站檢測時長:表示在一段時間內(nèi)如果沒有出站或者入站就判定為連接假死
- 時間單位
判定為假死之后IdleStateHandler會回調(diào)自己的channelIdle()方法,一般在這個方法中去進行一些連接的關(guān)閉。
HeartBeatServerHandler實現(xiàn)的主要功能是空閑檢測,需要客戶端定時發(fā)送心跳數(shù)據(jù)包(或報文、消息)進行配合,而且客戶端發(fā)送心跳數(shù)據(jù)包的時間間隔需要遠遠小于服務(wù)器端的空閑檢測時間間隔。
收到客戶端的心跳數(shù)據(jù)包之后可以直接回復(fù)客戶端,讓客戶端也能進行類似的空閑檢測。由于IdleStateHandler本身是一個入站處理器,只需要重寫這個子類的channelRead方法,然后將心跳數(shù)據(jù)包直接寫回給客戶端即可。
如果HeartBeatServerHandler要重寫channelRead方法,一定要調(diào)用積累的channelRead方法,不然IdleStateHandler的入站空閑檢測會無效。
三、客戶端的心跳報文
與服務(wù)器端的空閑檢測相配合,客戶端需要定期發(fā)送數(shù)據(jù)包到服務(wù)器端,通常這個數(shù)據(jù)包稱為心跳數(shù)據(jù)包。
@Slf4j
public class HeartBeatClientHandler extends ChannelInboundHandlerAdapter {
// 心跳的時間間隔,單位為秒
private static final int HEART_BEAT_INTERVAL = 50;
// 在Handler業(yè)務(wù)處理器被加入到流水線時開始發(fā)送心跳數(shù)據(jù)包
@Override
public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
ClientSession session = ctx.channel().attr(ClientSession.CLIENT_SESSION).get();
MessageProtos.MessageHeartBeat heartBeat =
MessageProtos.MessageHeartBeat.newBuilder()
.setSeq(0)
.setJson("{\"from\":\"client\"}")
.setUid(session.getUserDTO().getUserId())
.build();
MessageProtos.Message message = MessageProtos.Message.newBuilder()
.setType(MessageProtos.HeadType.HEART_BEAT)
.setSessionId(session.getSessionId())
.setMessageHeartBeat(heartBeat)
.build();
heartBeat(ctx, message);
super.handlerAdded(ctx);
}
private void heartBeat(ChannelHandlerContext ctx, MessageProtos.Message message) {
// 提交在給定延遲后啟用的一次性任務(wù)。
ctx.executor().schedule(() -> {
if (ctx.channel().isActive()) {
log.info("發(fā)送心跳消息給服務(wù)端");
ctx.writeAndFlush(message);
// 遞歸調(diào)用,發(fā)送下一次的心跳
heartBeat(ctx, message);
}
}, HEART_BEAT_INTERVAL, TimeUnit.SECONDS);
}
// 接收到服務(wù)器的心跳回寫
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
if (!(msg instanceof MessageProtos.Message message)) {
super.channelRead(ctx, msg);
return;
}
if (message.getType() == MessageProtos.HeadType.HEART_BEAT) {
log.info("收到會寫的心跳信息");
} else {
super.channelRead(ctx, msg);
}
}
}在HeartBeatClientHandler實例被加入到流水線時,它重寫的handlerAdded方法被回調(diào)。在handlerAdded方法中開始調(diào)用heartBeat方法發(fā)送心跳數(shù)據(jù)包。heartBeat是一個不斷遞歸調(diào)用的方法,它使用了ctx.executor()獲取當前通道綁定的Reactor反應(yīng)器NIO線,然后通過NIO現(xiàn)線程的schedule定時調(diào)度方法,在50s后觸發(fā)這個方法的執(zhí)行,再之后遞歸調(diào)用同樣延時50s后繼續(xù)發(fā)送。
客戶端的心跳間隔要比服務(wù)器端的空閑檢測時間間隔要短,一般來說要比它的一半要短一些,可以直接定義為空閑檢測時間間隔的1/3,以防止公網(wǎng)偶發(fā)的秒級抖動。
HeartBeatClientHandler實例并不是一開始就裝配到流水線中的,它裝配的實際實在登錄成功之后。
HeartBeatClientHandler實際上也可以集成IdleStateHandler類在客戶端進行空閑檢測,這樣客戶端也可以對服務(wù)器進行假死判定,在服務(wù)器假死的情況下,客戶端可以發(fā)起重連。
相關(guān)文章
如果淘寶的七天自動確認收貨讓你設(shè)計你用Java怎么實現(xiàn)
SpringCloud turbine監(jiān)控實現(xiàn)過程解析

