RocketMQ獲取指定消息的實(shí)現(xiàn)方法(源碼)
概要
消息查詢是什么?
消息查詢就是根據(jù)用戶提供的msgId從MQ中取出該消息
RocketMQ如果有多個(gè)節(jié)點(diǎn)如何查詢?
問(wèn)題:RocketMQ分布式結(jié)構(gòu)中,數(shù)據(jù)分散在各個(gè)節(jié)點(diǎn),即便是同一Topic的數(shù)據(jù),也未必都在一個(gè)broker上??蛻舳嗽趺粗罃?shù)據(jù)該去哪個(gè)節(jié)點(diǎn)上查?
猜想1:逐個(gè)訪問(wèn)broker節(jié)點(diǎn)查詢數(shù)據(jù)
猜想2:有某種數(shù)據(jù)中心存在,該中心知道所有消息存儲(chǔ)的位置,只要向該中心查詢即可得到消息具體位置,進(jìn)而取得消息內(nèi)容
實(shí)際:
1.消息Id中含有消息所在的broker的地址信息(IP\Port)以及該消息在CommitLog中的偏移量。
2.客戶端實(shí)現(xiàn)會(huì)從msgId字符串中解析出broker地址,向指定broker節(jié)查詢消息。
問(wèn)題:CommitLog文件有多個(gè),只有偏移量估計(jì)不能確定在哪個(gè)文件吧?
實(shí)際:?jiǎn)蝹€(gè)Broker節(jié)點(diǎn)內(nèi)offset是全局唯一的,不是每個(gè)CommitLog文件的偏移量都是從0開(kāi)始的。單個(gè)節(jié)點(diǎn)內(nèi)所有CommitLog文件共用一套偏移量,每個(gè)文件的文件名為其第一個(gè)消息的偏移量。所以可以根據(jù)偏移量和文件名確定CommitLog文件。
源碼閱讀
0.使用方式
MessageExt msg = consumer.viewMessage(msgId);
1.消息ID解析
這個(gè)了解下就可以了
public class MessageId { private SocketAddress address; private long offset; public MessageId(SocketAddress address, long offset) { this.address = address; this.offset = offset; } //get-set } //from MQAdminImpl.java public MessageExt viewMessage( String msgId) throws RemotingException, MQBrokerException, InterruptedException, MQClientException { MessageId messageId = null; try { //從msgId字符串中解析出address和offset //address = ip:port //offset為消息在CommitLog文件中的偏移量 messageId = MessageDecoder.decodeMessageId(msgId); } catch (Exception e) { throw new MQClientException(ResponseCode.NO_MESSAGE, "query message by id finished, but no message."); } return this.mQClientFactory.getMQClientAPIImpl().viewMessage(RemotingUtil.socketAddress2String(messageId.getAddress()), messageId.getOffset(), timeoutMillis); } //from MessageDecoder.java public static MessageId decodeMessageId(final String msgId) throws UnknownHostException { SocketAddress address; long offset; //ipv4和ipv6的區(qū)別 //如果msgId總長(zhǎng)度超過(guò)32字符,則為ipv6 int ipLength = msgId.length() == 32 ? 4 * 2 : 16 * 2; byte[] ip = UtilAll.string2bytes(msgId.substring(0, ipLength)); byte[] port = UtilAll.string2bytes(msgId.substring(ipLength, ipLength + 8)); ByteBuffer bb = ByteBuffer.wrap(port); int portInt = bb.getInt(0); address = new InetSocketAddress(InetAddress.getByAddress(ip), portInt); // offset byte[] data = UtilAll.string2bytes(msgId.substring(ipLength + 8, ipLength + 8 + 16)); bb = ByteBuffer.wrap(data); offset = bb.getLong(0); return new MessageId(address, offset); }
2.長(zhǎng)連接客戶端RPC實(shí)現(xiàn)
要發(fā)請(qǐng)求首先得先建立連接,這里方法可以看到創(chuàng)建連接相關(guān)的操作。值得注意的是,第一次訪問(wèn)的時(shí)候可能連接還沒(méi)建立,建立連接需要消耗一段時(shí)間。代碼中對(duì)這個(gè)時(shí)間也做了判斷,如果連接建立完成后,發(fā)現(xiàn)已經(jīng)超時(shí),則不再發(fā)出請(qǐng)求。目的應(yīng)該是盡可能減少請(qǐng)求線程的阻塞時(shí)間。
//from NettyRemotingClient.java @Override public RemotingCommand invokeSync(String addr, final RemotingCommand request, long timeoutMillis) throws InterruptedException, RemotingConnectException, RemotingSendRequestException, RemotingTimeoutException { long beginStartTime = System.currentTimeMillis(); //這里會(huì)先檢查有無(wú)該地址的通道,有則返回,無(wú)則創(chuàng)建 final Channel channel = this.getAndCreateChannel(addr); if (channel != null && channel.isActive()) { try { //前置鉤子 doBeforeRpcHooks(addr, request); //判斷通道建立完成時(shí)是否已到達(dá)超時(shí)時(shí)間,如果超時(shí)直接拋出異常。不發(fā)請(qǐng)求 long costTime = System.currentTimeMillis() - beginStartTime; if (timeoutMillis < costTime) { throw new RemotingTimeoutException("invokeSync call timeout"); } //同步調(diào)用 RemotingCommand response = this.invokeSyncImpl(channel, request, timeoutMillis - costTime); //后置鉤子 doAfterRpcHooks(RemotingHelper.parseChannelRemoteAddr(channel), request, response); //后置鉤子 return response; } catch (RemotingSendRequestException e) { log.warn("invokeSync: send request exception, so close the channel[{}]", addr); this.closeChannel(addr, channel); throw e; } catch (RemotingTimeoutException e) { if (nettyClientConfig.isClientCloseSocketIfTimeout()) { this.closeChannel(addr, channel); log.warn("invokeSync: close socket because of timeout, {}ms, {}", timeoutMillis, addr); } log.warn("invokeSync: wait response timeout exception, the channel[{}]", addr); throw e; } } else { this.closeChannel(addr, channel); throw new RemotingConnectException(addr); } }
下一步看看它的同步調(diào)用做了什么處理。注意到它會(huì)構(gòu)建一個(gè)Future對(duì)象加入待響應(yīng)池,發(fā)出請(qǐng)求報(bào)文后就掛起線程,然后等待喚醒(waitResponse內(nèi)部使用CountDownLatch等待)。
//from NettyRemotingAbstract.javapublic RemotingCommand invokeSyncImpl(final Channel channel, final RemotingCommand request, final long timeoutMillis) throws InterruptedException, RemotingSendRequestException, RemotingTimeoutException { //請(qǐng)求id final int opaque = request.getOpaque(); try { //請(qǐng)求存根 final ResponseFuture responseFuture = new ResponseFuture(channel, opaque, timeoutMillis, null, null); //加入待響應(yīng)的請(qǐng)求池 this.responseTable.put(opaque, responseFuture); final SocketAddress addr = channel.remoteAddress(); //將請(qǐng)求發(fā)出,成功發(fā)出時(shí)更新?tīng)顟B(tài) channel.writeAndFlush(request).addListener(new ChannelFutureListener() { @Override public void operationComplete(ChannelFuture f) throws Exception { if (f.isSuccess()) { //若成功發(fā)出,更新請(qǐng)求狀態(tài)為“已發(fā)出” responseFuture.setSendRequestOK(true); return; } else { responseFuture.setSendRequestOK(false); } //若發(fā)出失敗,則從池中移除(沒(méi)用了,釋放資源) responseTable.remove(opaque); responseFuture.setCause(f.cause()); //putResponse的時(shí)候會(huì)喚醒等待的線程 responseFuture.putResponse(null); log.warn("send a request command to channel <" + addr + "> failed."); } }); //只等待一段時(shí)間,不會(huì)一直等下去 //若正常響應(yīng),則收到響應(yīng)后,此線程會(huì)被喚醒,繼續(xù)執(zhí)行下去 //若超時(shí),則到達(dá)該時(shí)間后線程蘇醒,繼續(xù)執(zhí)行 RemotingCommand responseCommand = responseFuture.waitResponse(timeoutMillis); if (null == responseCommand) { if (responseFuture.isSendRequestOK()) { throw new RemotingTimeoutException(RemotingHelper.parseSocketAddressAddr(addr), timeoutMillis, responseFuture.getCause()); } else { throw new RemotingSendRequestException(RemotingHelper.parseSocketAddressAddr(addr), responseFuture.getCause()); } } return responseCommand; } finally { //正常響應(yīng)完成時(shí),將future釋放(正常邏輯) //超時(shí)時(shí),將future釋放。這個(gè)請(qǐng)求已經(jīng)作廢了,后面如果再收到響應(yīng),就可以直接丟棄了(由于找不到相關(guān)的響應(yīng)鉤子,就不處理了) this.responseTable.remove(opaque); } }
好,我們?cè)賮?lái)看看收到報(bào)文的時(shí)候是怎么處理的。我們都了解JDK中的Future的原理,大概就是將這個(gè)任務(wù)提交給其他線程處理,該線程處理完畢后會(huì)將結(jié)果寫(xiě)入到Future對(duì)象中,寫(xiě)入時(shí)如果有線程在等待該結(jié)果,則喚醒這些線程。這里也差不多,只不過(guò)執(zhí)行線程在服務(wù)端,服務(wù)執(zhí)行完畢后會(huì)將結(jié)果通過(guò)長(zhǎng)連接發(fā)送給客戶端,客戶端收到后根據(jù)報(bào)文中的ID信息從待響應(yīng)池中找到Future對(duì)象,然后就是類似的處理了。
class NettyClientHandler extends SimpleChannelInboundHandler<RemotingCommand> { //底層解碼完畢得到RemotingCommand的報(bào)文 @Override protected void channelRead0(ChannelHandlerContext ctx, RemotingCommand msg) throws Exception { processMessageReceived(ctx, msg); } } public void processMessageReceived(ChannelHandlerContext ctx, RemotingCommand msg) throws Exception { final RemotingCommand cmd = msg; if (cmd != null) { //判斷類型 switch (cmd.getType()) { case REQUEST_COMMAND: processRequestCommand(ctx, cmd); break; case RESPONSE_COMMAND: processResponseCommand(ctx, cmd); break; default: break; } } } public void processResponseCommand(ChannelHandlerContext ctx, RemotingCommand cmd) { //取得消息id final int opaque = cmd.getOpaque(); //從待響應(yīng)池中取得對(duì)應(yīng)請(qǐng)求 final ResponseFuture responseFuture = responseTable.get(opaque); if (responseFuture != null) { //將響應(yīng)值注入到ResponseFuture對(duì)象中,等待線程可從這個(gè)對(duì)象獲取結(jié)果 responseFuture.setResponseCommand(cmd); //請(qǐng)求已處理完畢,釋放該請(qǐng)求 responseTable.remove(opaque); //如果有回調(diào)函數(shù)的話則回調(diào)(由當(dāng)前線程處理) if (responseFuture.getInvokeCallback() != null) { executeInvokeCallback(responseFuture); } else { //沒(méi)有的話,則喚醒等待線程(由等待線程做處理) responseFuture.putResponse(cmd); responseFuture.release(); } } else { log.warn("receive response, but not matched any request, " + RemotingHelper.parseChannelRemoteAddr(ctx.channel())); log.warn(cmd.toString()); } }
總結(jié)一下,客戶端的處理時(shí)序大概是這樣的:
結(jié)構(gòu)大概是這樣的:
3.服務(wù)端的處理
//todo 服務(wù)端待補(bǔ)充CommitLog文件映射相關(guān)內(nèi)容
class NettyServerHandler extends SimpleChannelInboundHandler<RemotingCommand> { @Override protected void channelRead0(ChannelHandlerContext ctx, RemotingCommand msg) throws Exception { processMessageReceived(ctx, msg); } } //from NettyRemotingAbscract.java public void processMessageReceived(ChannelHandlerContext ctx, RemotingCommand msg) throws Exception { final RemotingCommand cmd = msg; if (cmd != null) { switch (cmd.getType()) { case REQUEST_COMMAND: //服務(wù)端走這里 processRequestCommand(ctx, cmd); break; case RESPONSE_COMMAND: processResponseCommand(ctx, cmd); break; default: break; } } } //from NettyRemotingAbscract.java public void processRequestCommand(final ChannelHandlerContext ctx, final RemotingCommand cmd) { //查看有無(wú)該請(qǐng)求code相關(guān)的處理器 final Pair<NettyRequestProcessor, ExecutorService> matched = this.processorTable.get(cmd.getCode()); //如果沒(méi)有,則使用默認(rèn)處理器(可能沒(méi)有默認(rèn)處理器) final Pair<NettyRequestProcessor, ExecutorService> pair = null == matched ? this.defaultRequestProcessor : matched; final int opaque = cmd.getOpaque(); if (pair != null) { Runnable run = new Runnable() { @Override public void run() { try { doBeforeRpcHooks(RemotingHelper.parseChannelRemoteAddr(ctx.channel()), cmd); final RemotingResponseCallback callback = new RemotingResponseCallback() { @Override public void callback(RemotingCommand response) { doAfterRpcHooks(RemotingHelper.parseChannelRemoteAddr(ctx.channel()), cmd, response); if (!cmd.isOnewayRPC()) { if (response != null) { //不為null,則由本類將響應(yīng)值寫(xiě)會(huì)給請(qǐng)求方 response.setOpaque(opaque); response.markResponseType(); try { ctx.writeAndFlush(response); } catch (Throwable e) { log.error("process request over, but response failed", e); log.error(cmd.toString()); log.error(response.toString()); } } else { //為null,意味著processor內(nèi)部已經(jīng)將響應(yīng)處理了,這里無(wú)需再處理。 } } } }; if (pair.getObject1() instanceof AsyncNettyRequestProcessor) {//QueryMessageProcessor為異步處理器 AsyncNettyRequestProcessor processor = (AsyncNettyRequestProcessor)pair.getObject1(); processor.asyncProcessRequest(ctx, cmd, callback); } else { NettyRequestProcessor processor = pair.getObject1(); RemotingCommand response = processor.processRequest(ctx, cmd); doAfterRpcHooks(RemotingHelper.parseChannelRemoteAddr(ctx.channel()), cmd, response); callback.callback(response); } } catch (Throwable e) { log.error("process request exception", e); log.error(cmd.toString()); if (!cmd.isOnewayRPC()) { final RemotingCommand response = RemotingCommand.createResponseCommand(RemotingSysResponseCode.SYSTEM_ERROR, RemotingHelper.exceptionSimpleDesc(e)); response.setOpaque(opaque); ctx.writeAndFlush(response); } } } }; if (pair.getObject1().rejectRequest()) { final RemotingCommand response = RemotingCommand.createResponseCommand(RemotingSysResponseCode.SYSTEM_BUSY, "[REJECTREQUEST]system busy, start flow control for a while"); response.setOpaque(opaque); ctx.writeAndFlush(response); return; } try { final RequestTask requestTask = new RequestTask(run, ctx.channel(), cmd); pair.getObject2().submit(requestTask); } catch (RejectedExecutionException e) { if ((System.currentTimeMillis() % 10000) == 0) { log.warn(RemotingHelper.parseChannelRemoteAddr(ctx.channel()) + ", too many requests and system thread pool busy, RejectedExecutionException " + pair.getObject2().toString() + " request code: " + cmd.getCode()); } if (!cmd.isOnewayRPC()) { final RemotingCommand response = RemotingCommand.createResponseCommand(RemotingSysResponseCode.SYSTEM_BUSY, "[OVERLOAD]system busy, start flow control for a while"); response.setOpaque(opaque); ctx.writeAndFlush(response); } } } else { String error = " request type " + cmd.getCode() + " not supported"; final RemotingCommand response = RemotingCommand.createResponseCommand(RemotingSysResponseCode.REQUEST_CODE_NOT_SUPPORTED, error); response.setOpaque(opaque); ctx.writeAndFlush(response); log.error(RemotingHelper.parseChannelRemoteAddr(ctx.channel()) + error); } } //from QueryMessageProcesor.java @Override public RemotingCommand processRequest(ChannelHandlerContext ctx, RemotingCommand request) throws RemotingCommandException { switch (request.getCode()) { case RequestCode.QUERY_MESSAGE: return this.queryMessage(ctx, request); case RequestCode.VIEW_MESSAGE_BY_ID: //通過(guò)msgId查詢消息 return this.viewMessageById(ctx, request); default: break; } return null; } public RemotingCommand viewMessageById(ChannelHandlerContext ctx, RemotingCommand request) throws RemotingCommandException { final RemotingCommand response = RemotingCommand.createResponseCommand(null); final ViewMessageRequestHeader requestHeader = (ViewMessageRequestHeader) request.decodeCommandCustomHeader(ViewMessageRequestHeader.class); response.setOpaque(request.getOpaque()); //getMessagetStore得到當(dāng)前映射到內(nèi)存中的CommitLog文件,然后根據(jù)偏移量取得數(shù)據(jù) final SelectMappedBufferResult selectMappedBufferResult = this.brokerController.getMessageStore().selectOneMessageByOffset(requestHeader.getOffset()); if (selectMappedBufferResult != null) { response.setCode(ResponseCode.SUCCESS); response.setRemark(null); //將響應(yīng)通過(guò)socket寫(xiě)回給客戶端 try { //response對(duì)象的數(shù)據(jù)作為header //消息內(nèi)容作為body FileRegion fileRegion = new OneMessageTransfer(response.encodeHeader(selectMappedBufferResult.getSize()), selectMappedBufferResult); ctx.channel().writeAndFlush(fileRegion).addListener(new ChannelFutureListener() { @Override public void operationComplete(ChannelFuture future) throws Exception { selectMappedBufferResult.release(); if (!future.isSuccess()) { log.error("Transfer one message from page cache failed, ", future.cause()); } } }); } catch (Throwable e) { log.error("", e); selectMappedBufferResult.release(); } return null; //如果有值,則直接寫(xiě)回給請(qǐng)求方。這里返回null是不需要由外層處理響應(yīng)。 } else { response.setCode(ResponseCode.SYSTEM_ERROR); response.setRemark("can not find message by the offset, " + requestHeader.getOffset()); } return response; }
總結(jié)
到此這篇關(guān)于RocketMQ獲取指定消息的文章就介紹到這了,更多相關(guān)RocketMQ獲取指定消息內(nèi)容請(qǐng)搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持腳本之家!
相關(guān)文章
java實(shí)現(xiàn)Img與PDF相互轉(zhuǎn)換
這篇文章主要為大家詳細(xì)介紹了java實(shí)現(xiàn)Img與PDF相互轉(zhuǎn)換的方法,具有一定的參考價(jià)值,感興趣的小伙伴們可以參考一下2018-05-05Springboot教程之如何設(shè)置springboot熱重啟
這篇文章主要介紹了Springboot教程之如何設(shè)置springboot熱重啟,本文通過(guò)實(shí)例圖文相結(jié)合給大家介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或工作具有一定的參考借鑒價(jià)值,需要的朋友可以參考下2020-07-07SpringBoot多文件分布式上傳功能實(shí)現(xiàn)
本文詳細(xì)介紹了如何在SpringBoot中實(shí)現(xiàn)多文件分布式上傳,并用代碼給出了相應(yīng)的實(shí)現(xiàn)思路和實(shí)現(xiàn)步驟,感興趣的朋友跟隨小編一起看看吧2023-06-06ocp開(kāi)閉原則_動(dòng)力節(jié)點(diǎn)Java學(xué)院整理
這篇文章主要為大家詳細(xì)介紹了ocp開(kāi)閉原則的相關(guān)資料,ocp開(kāi)閉原則指導(dǎo)我們?nèi)绾谓⒁粋€(gè)穩(wěn)定的、靈活的系統(tǒng),具有一定的參考價(jià)值,感興趣的小伙伴們可以參考一下2017-08-08解決springboot+activemq啟動(dòng)報(bào)注解錯(cuò)誤的問(wèn)題
這篇文章主要介紹了解決springboot+activemq啟動(dòng)報(bào)注解錯(cuò)誤的問(wèn)題,具有很好的參考價(jià)值,希望對(duì)大家有所幫助。如有錯(cuò)誤或未考慮完全的地方,望不吝賜教2021-07-07SpringMVC利用dropzone組件實(shí)現(xiàn)圖片上傳
這篇文章主要介紹了SpringMVC利用dropzone組件實(shí)現(xiàn)圖片上傳,文中通過(guò)示例代碼介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友們下面隨著小編來(lái)一起學(xué)習(xí)學(xué)習(xí)吧2020-02-02