hadoop client與datanode的通信協(xié)議分析
本文主要分析了hadoop客戶端read和write block的流程. 以及client和datanode通信的協(xié)議, 數(shù)據(jù)流格式等.
hadoop客戶端與namenode通信通過(guò)RPC協(xié)議, 但是client 與datanode通信并沒(méi)有使用RPC, 而是直接使用socket, 其中讀寫時(shí)的協(xié)議也不同, 本文分析了hadoop 0.20.2版本的(0.19版本也是一樣的)client與datanode通信的原理與通信協(xié)議. 另外需要強(qiáng)調(diào)的是0.23及以后的版本中client與datanode的通信協(xié)議有所變化, 使用了protobuf作為序列化方式.
Write block
1. 客戶端首先通過(guò)namenode.create, 向namenode請(qǐng)求創(chuàng)建文件, 然后啟動(dòng)dataStreamer線程
2. client包括三個(gè)線程, main線程負(fù)責(zé)把本地?cái)?shù)據(jù)讀入內(nèi)存, 并封裝為Package對(duì)象, 放到隊(duì)列dataQueue中.
3. dataStreamer線程檢測(cè)隊(duì)列dataQueue是否有package, 如果有, 則先創(chuàng)建BlockOutPutStream對(duì)象(一個(gè)block創(chuàng)建一次, 一個(gè)block可能包括多個(gè)package), 創(chuàng)建的時(shí)候會(huì)和相應(yīng)的datanode通信, 發(fā)送DATA_TRANSFER_HEADER信息并獲取返回. 然后創(chuàng)建ResponseProcessor線程, 負(fù)責(zé)接收datanode的返回ack確認(rèn)信息, 并進(jìn)行錯(cuò)誤處理.
4. dataStreamer從dataQueue中拿出Package對(duì)象, 發(fā)送給datanode. 然后繼續(xù)循環(huán)判斷dataQueue是否有數(shù)據(jù)…..
下圖展示了write block的流程.
下圖是報(bào)文的格式
Read block
主要在BlockReader類中實(shí)現(xiàn).
初始化newBlockReader時(shí),
1. 通過(guò)傳入?yún)?shù)sock創(chuàng)建new SocketOutputStream(socket, timeout), 然后寫通信信息, 與寫block的header不大一樣.
//write the header.
out.writeShort( DataTransferProtocol.DATA_TRANSFER_VERSION );
out.write( DataTransferProtocol.OP_READ_BLOCK );
out.writeLong( blockId );
out.writeLong( genStamp );
out.writeLong( startOffset );
out.writeLong( len );
Text.writeString(out, clientName);
out.flush();
2. 創(chuàng)建輸入流 new SocketInputStream(socket, timeout)
3. 判斷返回消息 in.readShort() != DataTransferProtocol.OP_STATUS_SUCCESS
4. 根據(jù)輸入流創(chuàng)建checksum : DataChecksum checksum = DataChecksum.newDataChecksum( in )
5. 讀取第一個(gè)Chunk的位置: long firstChunkOffset = in.readLong()
注: 512個(gè)字節(jié)為一個(gè)chunk計(jì)算checksum(4個(gè)字節(jié))
6. 接下來(lái)在BlockReader的read方法中讀取具體數(shù)據(jù): result = readBuffer(buf, off, realLen)
7. 一個(gè)一個(gè)chunk的讀取
int packetLen = in.readInt();
long offsetInBlock = in.readLong();
long seqno = in.readLong();
boolean lastPacketInBlock = in.readBoolean();
int dataLen = in.readInt();
IOUtils.readFully(in, checksumBytes.array(), 0,
checksumBytes.limit());
IOUtils.readFully(in, buf, offset, chunkLen);
8. 讀取數(shù)據(jù)后checksum驗(yàn)證; FSInputChecker.verifySum(chunkPos)
相關(guān)文章
MyBatis通過(guò)JDBC數(shù)據(jù)驅(qū)動(dòng)生成的執(zhí)行語(yǔ)句問(wèn)題
這篇文章主要介紹了MyBatis通過(guò)JDBC數(shù)據(jù)驅(qū)動(dòng)生成的執(zhí)行語(yǔ)句問(wèn)題的相關(guān)資料,非常不錯(cuò),具有參考借鑒價(jià)值,需要的朋友可以參考下2016-08-08Spring?Data?Jpa返回自定義對(duì)象的3種方法實(shí)例
在使用Spring Data Jpa框架時(shí),根據(jù)業(yè)務(wù)需求我們通常需要進(jìn)行復(fù)雜的數(shù)據(jù)庫(kù)查詢,下面這篇文章主要給大家介紹了關(guān)于Spring?Data?Jpa返回自定義對(duì)象的3種方法,文中通過(guò)實(shí)例代碼介紹的非常詳細(xì),需要的朋友可以參考下2022-08-08java組件commons-fileupload文件上傳示例
這篇文章主要為大家詳細(xì)介紹了java組件commons-fileupload實(shí)現(xiàn)文件上傳,具有一定的參考價(jià)值,感興趣的小伙伴們可以參考一下2016-10-10SpringBoot啟動(dòng)后立即執(zhí)行的幾種方法小結(jié)
在項(xiàng)目開(kāi)發(fā)中某些場(chǎng)景必須要用到啟動(dòng)項(xiàng)目后立即執(zhí)行方式的功能,本文主要介紹了SpringBoot啟動(dòng)后立即執(zhí)行的幾種方法小結(jié),具有一定的參考價(jià)值,感興趣的可以了解一下2023-05-05Java中Dijkstra算法求解最短路徑的實(shí)現(xiàn)
Dijkstra算法是一種解決最短路徑問(wèn)題的常用算法,本文主要介紹了Java中Dijkstra算法求解最短路徑的實(shí)現(xiàn),具有一定的參考價(jià)值,感興趣的可以了解一下2023-09-09Service層異常拋到Controller層處理還是直接處理問(wèn)題分析
這篇文章主要為大家介紹了Service層異常拋到Controller層處理還是直接處理的問(wèn)題分析,有需要的朋友可以借鑒參考下,希望能夠有所幫助,祝大家多多進(jìn)步,早日升職加薪2023-09-09