netty中的IO、NIO、AIO使用詳解
BIO——同步阻塞IO
看這個名稱大家可能會有點陌生,我們直接上例子:
服務(wù)端:
public static void main(String[] args) throws IOException {
//1.創(chuàng)建服務(wù)端Socket 并綁定端口
ServerSocket serverSocket = new ServerSocket(8080);
//2.等待客戶端連接 阻塞的
Socket accept = serverSocket.accept();
System.out.println(accept.getRemoteSocketAddress() + " 客戶端已連接");
//3.獲取輸入、輸出流
InputStream inputStream = accept.getInputStream();
OutputStream outputStream = accept.getOutputStream();
//4.接收客戶端信息
byte[] bytes = new byte[1024];
inputStream.read(bytes);
String data = new String(bytes);
System.out.println("來自" + accept.getRemoteSocketAddress() + "的信息:" + data);
//5.返回信息
outputStream.write(data.getBytes());
accept.shutdownOutput();
//6.關(guān)閉資源
inputStream.close();
outputStream.close();
accept.close();
serverSocket.close();
}客戶端:
public static void main(String[] args) throws IOException {
//1.創(chuàng)建客戶端Socket
Socket socket = new Socket("127.0.0.1",8080);
//2.獲取輸入、輸出流
InputStream inputStream = socket.getInputStream();
OutputStream outputStream = socket.getOutputStream();
//3.給服務(wù)端發(fā)送信息
outputStream.write("你好".getBytes());
socket.shutdownOutput();
//4.獲取服務(wù)端返回信息
byte[] data = new byte[1024];
inputStream.read(data);
System.out.println("來自服務(wù)端的信息:" + new String(data));
//6.關(guān)閉資源
inputStream.close();
outputStream.close();
socket.close();
}這就是我們熟知的Socket連接,也是Java最早的網(wǎng)絡(luò)通信IO,為什么這種叫同步阻塞IO:
因為在做read操作、accept操作的時候會阻塞沒法往下執(zhí)行,說白了就是串行的,就因為這個服務(wù)端和客戶端只能1對1通信,這合理嘛?肯定不合理啊,所以進(jìn)階的有了偽異步IO
偽異步阻塞IO
看完上面的,很多人就有想法了,你說同步的只能1對1通信,那我直接把服務(wù)端改成多線程版本不就好了嘛,不就可以1對多通信了嘛,沒錯這版本確實是這樣,如下:
服務(wù)端:
public static void main(String[] args) throws IOException {
//1.創(chuàng)建服務(wù)端Socket 并綁定端口
ServerSocket serverSocket = new ServerSocket(8080);
//2.等待客戶端連接 多線程模式 (開線程異步等待)
new Thread(()->{
while (true){
try {
Socket accept = serverSocket.accept();
System.out.println(accept.getRemoteSocketAddress() + " 客戶端已連接");
// 開線程異步處理客戶端連接任務(wù)
new Thread(new AcceptHandler(accept)).start();
} catch (IOException e) {
e.printStackTrace();
}
}
}).start();
// 阻塞防止程序退出
while (true){}
}
private static class AcceptHandler implements Runnable{
private Socket accept;
private InputStream inputStream = null;
private OutputStream outputStream =null;
public AcceptHandler(Socket accept){
this.accept=accept;
}
@Override
public void run() {
try {
//3.獲取輸入、輸出流
inputStream = accept.getInputStream();
outputStream = accept.getOutputStream();
//4.接收客戶端信息
byte[] bytes = new byte[1024];
inputStream.read(bytes);
String data = new String(bytes);
if(data!=null){
System.out.println("來自" + accept.getRemoteSocketAddress() + "的信息:" + data);
//5.返回信息
outputStream.write(data.getBytes());
accept.shutdownOutput();
}
} catch (IOException e) {
System.out.println(accept.getRemoteSocketAddress() + "發(fā)送異常斷開連接");
closeSource();
}finally {
System.out.println(accept.getRemoteSocketAddress() + "斷開連接");
closeSource();
}
}
private void closeSource(){
//6.關(guān)閉資源
try {
if(inputStream!=null){inputStream.close();}
if(outputStream!=null){outputStream.close();}
accept.close();
} catch (IOException ioException) {
ioException.printStackTrace();
}
}
}客戶端不變,服務(wù)端我們做了三個改動:
一:在等待客戶端連接的時候我們開啟一個線程,并死循環(huán)等待連接,這樣可以保證不阻塞主線程的運(yùn)行,同時可以不斷的和客戶端建立連接
二:和客戶端建立連接后又開啟一個線程來單獨(dú)處理與客戶端的通信
三:最后加了個死循環(huán)防止程序退出,因為現(xiàn)在是異步的了
這樣處理不就是異步的了嗎?為什么叫偽異步阻塞IO呢?
雖然現(xiàn)在不會阻塞主線程了,但是阻塞并沒有解決,該阻塞的地方依舊還是會阻塞,所以本質(zhì)上來說只是解決了1對1連接通信的問題
但是新的問題又來了,現(xiàn)在雖然是1對多通信,但是有一個客戶端連接就新建一個線程,1萬個客戶端就1萬個線程,這合理嗎?這明顯不合理啊,用線程池管理?那也不行啊,這連接一多還要排隊嗎?極端情況下,隊列不一樣會爆?
那怎么辦?有沒有可能一個線程監(jiān)聽多個連接呢?于是有了NIO
NIO——同步非阻塞IO
NIO的引入同時引入了三個概念ByteBuffer緩沖區(qū)、Channel通道和Selector多路復(fù)用器
- Channel的作用:就是一個通道,數(shù)據(jù)讀取和寫入的通道,根據(jù)功能可以分為不同的通道如:網(wǎng)絡(luò)通道ServerSocketChannel和SocketChannel、文件操作通道FileChannel等等
- Selector的作用:是輪詢Channel上面的事件,如讀事件、寫事件、連接事件、接受連接事件
- ByteBuffer緩沖區(qū):就是向Channel讀取或?qū)懭霐?shù)據(jù)的對象,本質(zhì)就是個字節(jié)數(shù)組
怎么理解這三個呢?說白了以傳統(tǒng)IO為例:服務(wù)端accept就是接受連接事件、客戶端connect就是連接事件、發(fā)送消息就是寫事件、讀取消息就是讀事件 Selector就是監(jiān)聽這些事件的工具 ServerSocketChannel是服務(wù)端接受連接的通道,所以只能注冊監(jiān)聽連接事件 SocketChannel是服務(wù)端與客戶端連接建立后的通道,所以可以注冊讀寫事件、連接事件 ByteBuffer就是Channel讀取或?qū)懭霐?shù)據(jù)的單位對象
下面搞個例子看看,注釋全有:
服務(wù)端:
public static void main(String[] args) throws IOException {
// 開啟服務(wù)端Socket通道
ServerSocketChannel serverSocketChannel = ServerSocketChannel.open();
// 設(shè)置為非阻塞
serverSocketChannel.configureBlocking(false);
// 綁定端口
serverSocketChannel.socket().bind(new InetSocketAddress(8080));
// 打開多路復(fù)用器 并將其注冊到通道上 監(jiān)聽連接請求事件
Selector selector = Selector.open();
// 為服務(wù)端Socket通道 注冊一個接受連接的事件
// 假設(shè)有客戶端要連接 下面輪詢的時候就會觸發(fā)這個事件 我們就可以去與客戶端建立連接了
serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT);
while (true) {
// 這段時間沒獲取到任何事件,則跳過下面操作
// 不同于IO和BIO的阻塞 多路復(fù)用器會一直輪詢 如果長時間無事件 這里會一直空循環(huán)
// 所以這里在查詢事件的時候加了個時間 這樣無事件的情況下 1s才會循環(huán)一次
if (selector.select(1000) == 0) {
continue;
}
// 獲取到本次輪詢所獲取到的全部事件
Iterator<SelectionKey> selectorKeys = selector.selectedKeys().iterator();
// 輪詢獲取到的事件,并處理
while (selectorKeys.hasNext()) {
SelectionKey selectorKey = selectorKeys.next();
//這個已經(jīng)處理的事件Key一定要移除。如果不移除,就會一直存在在selector.selectedKeys集合中
//待到下一次selector.select() > 0時,這個Key又會被處理一次
selectorKeys.remove();
try {
// 事件key處理 也就是事件處理
selectorKeyHandler(selectorKey, selector);
} catch (Exception e) {
SocketChannel channel = (SocketChannel) selectorKey.channel();
System.out.println(channel.getRemoteAddress() + "客戶端已斷開連接");
if (selectorKey != null) {
selectorKey.cancel();
if (selectorKey.channel() != null) {
selectorKey.channel().close();
}
}
}
}
}
}
// 事件處理方法 按照事件類型處理不同的事件
public static void selectorKeyHandler(SelectionKey selectorKey, Selector selector) throws IOException {
// 連接事件 代表有客戶端連接 所以需要去處理這個連接請求
if (selectorKey.isAcceptable()) {
acceptHandler(selectorKey, selector);
}
// 讀事件 可以去讀取信息
if (selectorKey.isReadable()) {
readHandler(selectorKey, selector);
}
// 寫事件 可以向客戶端發(fā)送信息
if (selectorKey.isWritable()) {
SocketChannel socketChannel = (SocketChannel) selectorKey.channel();
writeHandler(socketChannel);
// 寫事件完成后要取消寫事件不然會一直寫 我這里就干脆注冊了個讀事件
socketChannel.register(selector,SelectionKey.OP_READ);
}
}
// 連接事件處理 這個有客戶端要建立連接了 所以accept與客戶端建立連接
public static void acceptHandler(SelectionKey selectorKey, Selector selector) throws IOException {
ServerSocketChannel channel = (ServerSocketChannel) selectorKey.channel();
SocketChannel accept = channel.accept();
// 建立連接后 客戶端和服務(wù)端就等于形成了一個數(shù)據(jù)交互的通道 SocketChannel
// 這個通道也要設(shè)置為非阻塞
accept.configureBlocking(false);
// 為這個通道注冊一個讀事件 表示我先讀取客戶端信息
accept.register(selector, SelectionKey.OP_READ);
System.out.println(accept.getRemoteAddress() + "客戶端已連接");
}
// 讀事件處理 讀取客戶端的信息
public static void readHandler(SelectionKey selectorKey, Selector selector) throws IOException {
SocketChannel channel = (SocketChannel) selectorKey.channel();
ByteBuffer allocate = ByteBuffer.allocate(1024);
int read = channel.read(allocate);
if (read > 0) {
allocate.flip();
byte[] bytes = new byte[allocate.remaining()];
allocate.get(bytes);
System.out.println(channel.getRemoteAddress() + "發(fā)來消息:" + new String(bytes));
}
if(read<0){
System.out.println(channel.getRemoteAddress() + "斷開連接");
}
// 讀完信息后要給客戶端發(fā)送信息 所以這個再注冊一個寫的事件
channel.register(selector, SelectionKey.OP_WRITE);
}
// 寫事件處理
public static void writeHandler(SocketChannel socketChannel) throws IOException {
byte[] bytes = "你好".getBytes();
ByteBuffer allocate = ByteBuffer.allocate(bytes.length);
allocate.put(bytes);
allocate.flip();
socketChannel.write(allocate);
}客戶端:
public static void main(String[] args) throws IOException {
// 開啟一個Socket通道
SocketChannel clientChannel = SocketChannel.open();
// 設(shè)置非阻塞
clientChannel.configureBlocking(false);
// 允許端口復(fù)用
clientChannel.socket().setReuseAddress(true);
// 連接地址
clientChannel.connect(new InetSocketAddress("127.0.0.1", 8080));
// 開啟多路復(fù)用器
Selector selector = Selector.open();
// 為這個通道注冊一個連接事件
clientChannel.register(selector, SelectionKey.OP_CONNECT);
while (true) {
// 這段時間沒獲取到任何事件,則跳過下面操作
// 不同于IO和BIO的阻塞 多路復(fù)用器會一直輪詢 如果長時間無事件 這里會一直空循環(huán)
// 所以這里在查詢事件的時候加了個時間 這樣無事件的情況下 1s才會循環(huán)一次
if (selector.select(1000) == 0) {
continue;
}
// 獲取到本次輪詢所獲取到的全部事件
Iterator<SelectionKey> selectorKeys = selector.selectedKeys().iterator();
// 輪詢獲取到的事件,并處理
while (selectorKeys.hasNext()) {
SelectionKey selectorKey = selectorKeys.next();
//這個已經(jīng)處理的事件Key一定要移除。如果不移除,就會一直存在在selector.selectedKeys集合中
//待到下一次selector.select() > 0時,這個Key又會被處理一次
selectorKeys.remove();
try {
// 事件key處理
selectorKeyHandler(selectorKey, selector);
} catch (Exception e) {
if (selectorKey != null) {
selectorKey.cancel();
if (selectorKey.channel() != null) {
selectorKey.channel().close();
}
}
}
}
}
}
// 事件處理方法
public static void selectorKeyHandler(SelectionKey selectorKey, Selector selector) throws IOException {
// 連接事件 判斷是否連接成功
if (selectorKey.isValid()) {
SocketChannel channel = (SocketChannel) selectorKey.channel();
if (selectorKey.isConnectable() && channel.finishConnect()) {
System.out.println("連接成功........");
// 連接成功注冊寫事件 向服務(wù)端發(fā)送信息
channel.register(selector,SelectionKey.OP_WRITE);
}
}
// 讀事件 可以去讀取信息
if (selectorKey.isReadable()) {
readHandler(selectorKey, selector);
}
// 寫事件 可以向客戶端發(fā)送信息
if (selectorKey.isWritable()) {
SocketChannel channel = (SocketChannel) selectorKey.channel();
writeHandler(channel);
// 寫事件完成后要取消寫事件不然會一直寫 我這里就干脆注冊了個讀事件
channel.register(selector,SelectionKey.OP_READ);
}
}
// 讀事件處理 就是處理服務(wù)端發(fā)來的消息
public static void readHandler(SelectionKey selectorKey, Selector selector) throws IOException {
SocketChannel channel = (SocketChannel) selectorKey.channel();
ByteBuffer allocate = ByteBuffer.allocate(1024);
int read = channel.read(allocate);
if (read > 0) {
allocate.flip();
byte[] bytes = new byte[allocate.remaining()];
allocate.get(bytes);
System.out.println("服務(wù)端發(fā)來消息:" + new String(bytes));
}
if(read<0){
System.out.println("與服務(wù)端斷開連接");
}
}
// 寫事件處理 就是像服務(wù)端發(fā)送消息
public static void writeHandler(SocketChannel socketChannel) throws IOException {
byte[] bytes = "你好".getBytes();
ByteBuffer allocate = ByteBuffer.allocate(bytes.length);
allocate.put(bytes);
allocate.flip();
socketChannel.write(allocate);
}可以看到寫法和傳統(tǒng)的IO完全不一樣了,操作的對象都是Channel,讀寫對象都是ByteBuffer,那到底是什么引起了這種改變呢?因為系統(tǒng)內(nèi)核的優(yōu)化,說白了這種操作都是API,底層都是需要系統(tǒng)支持的,系統(tǒng)在這塊也有一個模型優(yōu)化,簡單介紹三種模型區(qū)別:
- select: 每有一個連接的產(chǎn)生會打開一個Socket描述符(下面簡稱FD),select會把這些FD保存在一個數(shù)組中,因為是數(shù)組所以就代表有了容量的上限意味了連接數(shù)量的上限,每次調(diào)用,都會遍歷這個數(shù)組,1w個連接就算只有一個事件,也會遍歷這1w個連接,效率極低
- poll: 和select不同,這個底層結(jié)構(gòu)是鏈表,所有沒了連接數(shù)量的上限,但是每次調(diào)用依舊會遍歷所有的
- epoll: 底層結(jié)構(gòu)是紅黑樹,同樣沒有連接數(shù)量的上限,而且有一個就緒的事件列表,這意味著不再需要遍歷所有的連接了
JDK中采用的就是epoll模型,但盡管這樣也依舊是同步的,因為還是需要主動去獲取結(jié)果,只是從方式阻塞等待變成了輪詢,有沒有什么方式在結(jié)果產(chǎn)生的時候異步的回調(diào)呢?于是有了AIO
AIO——異步IO
這種方式同樣需要系統(tǒng)的支持,目前主流還是NIO,這塊就不多介紹了,提供個例子:
服務(wù)端:
public static void main(String[] args) throws IOException {
AsynchronousServerSocketChannel serverSocketChannel = AsynchronousServerSocketChannel.open();
serverSocketChannel.bind(new InetSocketAddress(8080));
// 接收連接的時候 提供連接處理類
serverSocketChannel.accept(serverSocketChannel, new ServerSocketHandler());
// 異步的 防止程序退出
while (true) {
}
}
// 連接處理
public static class ServerSocketHandler implements CompletionHandler<AsynchronousSocketChannel, AsynchronousServerSocketChannel> {
@Override
public void completed(AsynchronousSocketChannel result, AsynchronousServerSocketChannel attachment) {
// 繼續(xù)接受連接
attachment.accept(attachment, this);
try {
System.out.println(result.getRemoteAddress() + " 已連接");
} catch (IOException e) {
e.printStackTrace();
}
new Thread(() -> {
// 異步讀
readHandler(result);
}).start();
// 寫數(shù)據(jù)處理
writeHandler(result, "你好");
}
@Override
public void failed(Throwable exc, AsynchronousServerSocketChannel attachment) {
System.out.println("發(fā)生異常");
}
public void readHandler(AsynchronousSocketChannel socketChannel) {
ByteBuffer allocate = ByteBuffer.allocate(1024);
socketChannel.read(allocate, allocate, new CompletionHandler<Integer, ByteBuffer>() {
@Override
public void completed(Integer result, ByteBuffer attachment) {
try {
if (result > 0) {
attachment.flip();
byte[] bytes = new byte[attachment.remaining()];
attachment.get(bytes);
System.out.println(socketChannel.getRemoteAddress() + " 客戶端消息: " + new String(bytes));
readHandler(socketChannel);
}
} catch (IOException e) {
e.printStackTrace();
}
}
@Override
public void failed(Throwable exc, ByteBuffer attachment) {
System.out.println();
try {
System.out.println(socketChannel.getRemoteAddress() + " 已下線");
socketChannel.close();
} catch (IOException e) {
e.printStackTrace();
}
}
});
}
public void writeHandler(AsynchronousSocketChannel socketChannel, String data) {
byte[] bytes = data.getBytes();
ByteBuffer allocate = ByteBuffer.allocate(bytes.length);
allocate.put(bytes);
allocate.flip();
socketChannel.write(allocate, allocate, new CompletionHandler<Integer, ByteBuffer>() {
@Override
public void completed(Integer result, ByteBuffer attachment) {
if (attachment.hasRemaining()) {
socketChannel.write(attachment, attachment, this);
}
}
@Override
public void failed(Throwable exc, ByteBuffer attachment) {
try {
socketChannel.close();
} catch (IOException e) {
e.printStackTrace();
}
}
});
}
}客戶端:
public static void main(String[] args) throws IOException {
AsynchronousSocketChannel socketChannel=AsynchronousSocketChannel.open();
socketChannel.connect(new InetSocketAddress("127.0.0.1", 8080), null, new AsyncClientHandler(socketChannel));
while (true){}
}
public static class AsyncClientHandler implements CompletionHandler<Void, AsyncClientHandler>{
private AsynchronousSocketChannel socketChannel;
public AsyncClientHandler(AsynchronousSocketChannel socketChannel){
this.socketChannel=socketChannel;
}
@Override
public void completed(Void result, AsyncClientHandler attachment) {
new Thread(()->{
// 異步 一秒發(fā)送一次消息
while (true){
writeHandler("你好");
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}).start();
// 讀處理
readHandler();
}
@Override
public void failed(Throwable exc, AsyncClientHandler attachment) {
}
public void readHandler() {
ByteBuffer allocate = ByteBuffer.allocate(1024);
socketChannel.read(allocate, allocate, new CompletionHandler<Integer, ByteBuffer>() {
@Override
public void completed(Integer result, ByteBuffer attachment) {
attachment.flip();
byte[] bytes = new byte[attachment.remaining()];
attachment.get(bytes);
System.out.println(" 服務(wù)端消息: " + new String(bytes));
}
@Override
public void failed(Throwable exc, ByteBuffer attachment) {
try {
socketChannel.close();
} catch (IOException e) {
e.printStackTrace();
}
}
});
}
public void writeHandler( String data) {
byte[] bytes = data.getBytes();
ByteBuffer allocate = ByteBuffer.allocate(bytes.length);
allocate.put(bytes);
allocate.flip();
socketChannel.write(allocate, allocate, new CompletionHandler<Integer, ByteBuffer>() {
@Override
public void completed(Integer result, ByteBuffer attachment) {
if (attachment.hasRemaining()) {
socketChannel.write(attachment, attachment, this);
}
}
@Override
public void failed(Throwable exc, ByteBuffer attachment) {
try {
socketChannel.close();
} catch (IOException e) {
e.printStackTrace();
}
}
});
}
}總結(jié)
| BIO | 偽異步IO | NIO | AIO | |
| 線程:客戶端 | 1:1 | N:M (M可以大于N) | 1:N (一個線程處理多個) | 0:M (無需額外線程,異步回調(diào)) |
| I/O類型 | 同步阻塞 | 偽異步阻塞 | 同步非阻塞 | 異步非阻塞 |
| 可靠性 | 非常差 | 差 | 高 | 高 |
| 難度 | 簡單 | 簡單 | 復(fù)雜 | 復(fù)雜 |
| 性能 | 低 | 中 | 高 | 高 |
到此這篇關(guān)于netty中的IO、NIO、AIO使用詳解的文章就介紹到這了,更多相關(guān)netty的IO、NIO、AIO內(nèi)容請搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持腳本之家!
相關(guān)文章
Java inputstream和outputstream使用詳解
這篇文章主要介紹了Java inputstream和outputstream使用詳解,本篇文章通過簡要的案例,講解了該項技術(shù)的了解與使用,以下就是詳細(xì)內(nèi)容,需要的朋友可以參考下2021-08-08
解決springboot文件配置端口不起作用(默認(rèn)8080)
這篇文章主要介紹了解決springboot文件配置端口不起作用(默認(rèn)8080),具有很好的參考價值,希望對大家有所幫助。如有錯誤或未考慮完全的地方,望不吝賜教2021-08-08
Java函數(shù)式編程(三):列表的轉(zhuǎn)化
這篇文章主要介紹了Java函數(shù)式編程(二):列表的轉(zhuǎn)化,lambda表達(dá)式不僅能幫助我們遍歷集合,并且可以進(jìn)行集合的轉(zhuǎn)化,需要的朋友可以參考下2014-09-09

