RocketMQ中的通信模塊詳解
通信機制
RocketMQ消息隊列集群主要包括NameServer、Broker(Master/Slave)、Producer、Consumer4個角色,基本通訊流程如下:
(1) Broker啟動后需要完成一次將自己注冊至NameServer的操作;隨后每隔30s時間定時向NameServer上報Topic路由信息。
(2) 消息生產者Producer作為客戶端發(fā)送消息時候,需要根據(jù)消息的Topic從本地緩存的TopicPublishInfoTable獲取路由信息。如果沒有則更新路由信息會從NameServer上重新拉取,同時Producer會默認每隔30s向NameServer拉取一次路由信息。
(3) 消息生產者Producer根據(jù)2)中獲取的路由信息選擇一個隊列(MessageQueue)進行消息發(fā)送;Broker作為消息的接收者接收消息并落盤存儲。
(4) 消息消費者Consumer根據(jù)2)中獲取的路由信息,并再完成客戶端的負載均衡后,選擇其中的某一個或者某幾個消息隊列來拉取消息并進行消費。
從上面1)~3)中可以看出在消息生產者, Broker和NameServer之間都會發(fā)生通信(這里只說了MQ的部分通信),因此如何設計一個良好的網絡通信模塊在MQ中至關重要,它將決定RocketMQ集群整體的消息傳輸能力與最終的性能。
rocketmq-remoting 模塊是 RocketMQ消息隊列中負責網絡通信的模塊,它幾乎被其他所有需要網絡通信的模塊(諸如rocketmq-client、rocketmq-broker、rocketmq-namesrv)所依賴和引用。為了實現(xiàn)客戶端與服務器之間高效的數(shù)據(jù)請求與接收,RocketMQ消息隊列自定義了通信協(xié)議并在Netty的基礎之上擴展了通信模塊。
Remoting通信類
通信類結構:

NettyRemotingServer
NettyRemotingServer為服務端實現(xiàn)類,在NamesrvController中被構造。
NettyRemotingServer構造時主要工作是初始化下列屬性,構造時判斷useEpoll來決定EventLoopGroup的實現(xiàn)。
private final ServerBootstrap serverBootstrap;
private final EventLoopGroup eventLoopGroupSelector;
private final EventLoopGroup eventLoopGroupBoss;
private final NettyServerConfig nettyServerConfig;
private final ExecutorService publicExecutor;
private final ChannelEventListener channelEventListener;NettyRemotingServer啟動
@Override
public void start() {
this.defaultEventExecutorGroup = new DefaultEventExecutorGroup(
nettyServerConfig.getServerWorkerThreads(),
new ThreadFactory() {
private AtomicInteger threadIndex = new AtomicInteger(0);
@Override
public Thread newThread(Runnable r) {
return new Thread(r, "NettyServerCodecThread_" + this.threadIndex.incrementAndGet());
}
});
prepareSharableHandlers();
ServerBootstrap childHandler =
this.serverBootstrap.group(this.eventLoopGroupBoss, this.eventLoopGroupSelector)
.channel(useEpoll() ? EpollServerSocketChannel.class : NioServerSocketChannel.class)
.option(ChannelOption.SO_BACKLOG, 1024)
.option(ChannelOption.SO_REUSEADDR, true)
.option(ChannelOption.SO_KEEPALIVE, false)
.childOption(ChannelOption.TCP_NODELAY, true)
.childOption(ChannelOption.SO_SNDBUF, nettyServerConfig.getServerSocketSndBufSize())
.childOption(ChannelOption.SO_RCVBUF, nettyServerConfig.getServerSocketRcvBufSize())
.localAddress(new InetSocketAddress(this.nettyServerConfig.getListenPort()))
.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
public void initChannel(SocketChannel ch) throws Exception {
//添加handler,握手、編解碼、idle檢測、連接管理、消息處理
ch.pipeline()
.addLast(defaultEventExecutorGroup, HANDSHAKE_HANDLER_NAME, handshakeHandler)
.addLast(defaultEventExecutorGroup,
encoder,
new NettyDecoder(),
new IdleStateHandler(0, 0, nettyServerConfig.getServerChannelMaxIdleTimeSeconds()),
connectionManageHandler,
serverHandler
);
}
});
if (nettyServerConfig.isServerPooledByteBufAllocatorEnable()) {
childHandler.childOption(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT);
}
try {
ChannelFuture sync = this.serverBootstrap.bind().sync();
InetSocketAddress addr = (InetSocketAddress) sync.channel().localAddress();
this.port = addr.getPort();
} catch (InterruptedException e1) {
throw new RuntimeException("this.serverBootstrap.bind().sync() InterruptedException", e1);
}
if (this.channelEventListener != null) {
this.nettyEventExecutor.start();
}
this.timer.scheduleAtFixedRate(new TimerTask() {
@Override
public void run() {
try {
NettyRemotingServer.this.scanResponseTable();
} catch (Throwable e) {
log.error("scanResponseTable exception", e);
}
}
}, 1000 * 3, 1000);
}主要關注initChannel方法中添加的handler,NettyConnectManageHandler
//消息處理核心類
@ChannelHandler.Sharable
class NettyServerHandler extends SimpleChannelInboundHandler<RemotingCommand> {
@Override
protected void channelRead0(ChannelHandlerContext ctx, RemotingCommand msg) throws Exception {
processMessageReceived(ctx, msg);
}
}
//連接管理處理類
@ChannelHandler.Sharable
class NettyConnectManageHandler extends ChannelDuplexHandler {
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
final String remoteAddress = RemotingHelper.parseChannelRemoteAddr(ctx.channel());
log.info("NETTY SERVER PIPELINE: channelActive, the channel[{}]", remoteAddress);
super.channelActive(ctx);
if (NettyRemotingServer.this.channelEventListener != null) {
NettyRemotingServer.this.putNettyEvent(new NettyEvent(NettyEventType.CONNECT, remoteAddress, ctx.channel()));
}
}
@Override
public void channelInactive(ChannelHandlerContext ctx) throws Exception {
final String remoteAddress = RemotingHelper.parseChannelRemoteAddr(ctx.channel());
log.info("NETTY SERVER PIPELINE: channelInactive, the channel[{}]", remoteAddress);
super.channelInactive(ctx);
if (NettyRemotingServer.this.channelEventListener != null) {
NettyRemotingServer.this.putNettyEvent(new NettyEvent(NettyEventType.CLOSE, remoteAddress, ctx.channel()));
}
}
@Override
public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
if (evt instanceof IdleStateEvent) {
IdleStateEvent event = (IdleStateEvent) evt;
if (event.state().equals(IdleState.ALL_IDLE)) {
final String remoteAddress = RemotingHelper.parseChannelRemoteAddr(ctx.channel());
log.warn("NETTY SERVER PIPELINE: IDLE exception [{}]", remoteAddress);
RemotingUtil.closeChannel(ctx.channel());
if (NettyRemotingServer.this.channelEventListener != null) {
NettyRemotingServer.this
.putNettyEvent(new NettyEvent(NettyEventType.IDLE, remoteAddress, ctx.channel()));
}
}
}
ctx.fireUserEventTriggered(evt);
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
final String remoteAddress = RemotingHelper.parseChannelRemoteAddr(ctx.channel());
log.warn("NETTY SERVER PIPELINE: exceptionCaught {}", remoteAddress);
log.warn("NETTY SERVER PIPELINE: exceptionCaught exception.", cause);
if (NettyRemotingServer.this.channelEventListener != null) {
NettyRemotingServer.this.putNettyEvent(new NettyEvent(NettyEventType.EXCEPTION, remoteAddress, ctx.channel()));
}
RemotingUtil.closeChannel(ctx.channel());
}
}到此這篇關于RocketMQ中的通信模塊詳解的文章就介紹到這了,更多相關RocketMQ通信模塊內容請搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關文章希望大家以后多多支持腳本之家!
相關文章
SpringBoot結果封裝和異常攔截的實現(xiàn)示例
SpringBoot 項目中,我們通常需要將結果數(shù)據(jù)封裝成特定的格式,以方便客戶端進行處理,本文主要介紹了SpringBoot?優(yōu)雅的結果封裝和異常攔截,感興趣的可以了解一下2023-08-08
踩坑之spring事務,非事務方法與事務方法執(zhí)行相互調用方式
這篇文章主要介紹了踩坑之spring事務,非事務方法與事務方法執(zhí)行相互調用方式,具有很好的參考價值,希望對大家有所幫助。如有錯誤或未考慮完全的地方,望不吝賜教2023-07-07
Spring中ApplicationContextAware的使用方法詳解
ApplicationContextAware?通過它Spring容器會自動把上下文環(huán)境對象調用ApplicationContextAware接口中的setApplicationContext方法,這篇文章主要介紹了Spring中ApplicationContextAware的作用,需要的朋友可以參考下2023-03-03
Springboot四種事件監(jiān)聽的實現(xiàn)方式詳解
這篇文章主要介紹了Springboot四種事件監(jiān)聽的實現(xiàn)方式,事件監(jiān)聽是一種機制,可以定義和觸發(fā)自定義的事件,以及在應用程序中注冊監(jiān)聽器來響應這些事件,需要的朋友可以參考下2022-06-06
Java實現(xiàn)入?yún)?shù)據(jù)批量數(shù)據(jù)校驗詳解
在業(yè)務處理中一般入?yún)⑹菃螚l數(shù)據(jù),這樣數(shù)據(jù)校驗比較容易,但是這種方法對于集合數(shù)據(jù)的校驗不適用,下面我們就來看看如何對入?yún)?shù)據(jù)進行批量數(shù)據(jù)校驗吧2024-02-02

