# 非阻塞式IO通信

## 一、简介

非阻塞IO（NIO）弥补了原来同步阻塞IO的不足，NIO有三个重要概念：

* 缓冲区Buffer：缓冲待读写处理的数据，NIO是读写数据操作的就是Buffer；
* 通道Channel：数据通过的双向通道；
* 多路复用器Selector：负责多路复用；

## 二、NIO服务端&客户端流程

下面是NIO服务端的时序图：

![](/files/-M7h5drW_IsFCxmcQ3Ei)

1. 打开ServerSocketChannel，用于监听客户端的连接，它是所有客户端连接的父管道（对应BIO的ServerSocket）；
2. 绑定监听端口，设置连接方式为非阻塞模式；
3. 创建Reactor线程，创建多路复用器Selector并启动线程；
4. 将ServerSocketChannel注册到Reactor线程的多路复用器Selector上，监听ACCEPT事件；
5. 多路复用器在线程run方法的无限循环体内轮询准备就绪的Key；
6. 多路复用器监听到有新的客户端接入，处理新的接入请求，完成TCP三次握手，建立物理链路；
7. 设置客户端链路为非阻塞模型；
8. 将新接入的客户端连接注册到Reactor线程的多路复用器上，监听读操作，读取客户端发送的网络消息；
9. 异步读取客户端消息到缓冲区；
10. 对ByteBuffer进行编解码，如果有半包消息指针reset，继续读取后续的报文，将解码成功的消息封装成Task，投递到业务线程池中，进行业务逻辑编排；
11. 将POJO对象encode成ByteBuffer，调用SocketChannel的异步write接口，将消息异步发送给客户端。

> 注意：如果发送区TCP缓冲区满，会导致写半包，此时，需要注册监听写操作位，循环写，直到整包消息写入TCP缓冲区。

下面是NIO客户端的时序图：

![](/files/-M7h5drYZ8PxR_fOL7Z7)

1. 打开SocketChannel，绑定客户端本地地址（可选，默认系统会随机分配一个可用的本地地址）；
2. 设置SocketChannel为非阻塞模式，同时设置客户端连接的TCP参数；
3. 异步连接服务端；
4. 判断是否连接成功，如果连接成功则直接注册读状态位到多路复用器中（步骤10），如果当前没有连接成功（异步连接，返回false，说明客户端已经发送sync包，服务端没有返回ack包，物理链路还没有建立），向多路复用器注册连接状态位（步骤5）；
5. 向Reactor线程的多路复用器注册OP\_CONNECT状态位，监听服务端的TCP ACK应答；
6. 创建Reactor线程，创建多路复用器并启动线程；
7. 多路复用器在线程run方法的无限循环体内轮询准备就绪的key；
8. 接收connect事件并进行处理；
9. 判断连接是否完成，如果完成执行步骤10；
10. 注册读事件到多路复用器；
11. 异步读客户端请求到缓冲区；
12. 对ByteBuffer进行编解码，如果有半包消息接收缓冲区Reset，继续读取后续的报文，将解码成功的消息封装成Task，投递到业务线程池中，进行业务逻辑编排。
13. 将POJO对象encode成ByteBuffer，调用SocketChannel的异步write接口，将消息异步发送给客户端。

## 三、代码实例

### 3.1 服务端

TimeServer.java

```java
package com.wangjun.io.nio;
/**
 * @author wangjun
 * @date 2020-03-30
 * @version 1.0
 */
public class TimeServer {

    public static void main(String[] args) {
        int port = 8080;

        MultiplexerTimeServer timeServer = new MultiplexerTimeServer(port);
        new Thread(timeServer, "NIO_MultiplexerTimeServer-001").start();
    }

}
```

MultiplexerTimeServer.java

```java
package com.wangjun.io.nio;

import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.util.Date;
import java.util.Iterator;
import java.util.Set;

/**
 * @author wangjun
 * @date 2020-03-30
 * @version 1.0
 */
public class MultiplexerTimeServer implements Runnable {

    private Selector selector;
    private ServerSocketChannel servChannel;
    private volatile boolean stop;

    /**
     * 初始化多路复用器，绑定监听端口
     * 
     * @param port
     */
    public MultiplexerTimeServer(int port) {
        try {
            selector = Selector.open();
            servChannel = ServerSocketChannel.open();
            servChannel.configureBlocking(false);
            servChannel.socket().bind(new InetSocketAddress(port), 1024);//???1024这个参数什么作用
            servChannel.register(selector, SelectionKey.OP_ACCEPT);
            System.out.println("The nio time server start in port:" + port);
        } catch (IOException e) {
            e.printStackTrace();
            System.exit(1);
        }
    }

    public void stop() {
        this.stop = true;
    }

    @Override
    public void run() {
        while (!stop) {
            try {
                //休眠时间为1秒，无论是否有读写事件，selector每个1s被唤醒一次
                selector.select(1000);
                Set<SelectionKey> selectedKeys = selector.selectedKeys();
                Iterator<SelectionKey> it = selectedKeys.iterator();
                SelectionKey key = null;
                while (it.hasNext()) {
                    key = it.next();
                    it.remove();
                    try {
                        handleInput(key);
                    } catch (Exception e) {
                        if (key != null) {
                            key.cancel();
                            if (key.channel() != null) {
                                key.channel().close();
                            }
                        }
                    }
                }
            } catch (Throwable t) {
                t.printStackTrace();
            }
        }
        //多路复用器关闭后，所有注册在上面的Channel和Pipe等资源都会被自动去注册并关闭，所有不需要重复释放资源
        if (selector != null) {
            try {
                selector.close();
            } catch (IOException e) {
                e.printStackTrace();
            }
        }
    }

    private void handleInput(SelectionKey key) throws IOException {
        if (key.isValid()) {
            // 处理新接入的请求消息
            if (key.isAcceptable()) {
                ServerSocketChannel ssc = (ServerSocketChannel) key.channel();
                SocketChannel sc = ssc.accept();
                sc.configureBlocking(false);
                sc.register(selector, SelectionKey.OP_READ);
            }
            if (key.isReadable()) {
                SocketChannel sc = (SocketChannel) key.channel();
                ByteBuffer readBuffer = ByteBuffer.allocate(1024);
                int readBytes = sc.read(readBuffer);
                if (readBytes > 0) {
                    readBuffer.flip();
                    byte[] bytes = new byte[readBuffer.remaining()];
                    readBuffer.get(bytes);
                    String body = new String(bytes, "UTF-8");
                    System.out.println("时间服务器收到的命令是:" + body);
                    String currentTime = "QUERY TIME ORDER".equalsIgnoreCase(body)
                            ? new Date(System.currentTimeMillis()).toString()
                            : "BAD ORDER";
                    doWrite(sc, currentTime);
                } else if (readBytes < 0) {
                    // 对端链路关闭
                    key.cancel();
                    sc.close();
                } else {
                    // 读到0字节，忽略
                }
            }
        }
    }

    private void doWrite(SocketChannel channel, String response) throws IOException {
        if(response != null && response.trim().length() > 0) {
            byte[] bytes = response.getBytes();
            ByteBuffer writeBuffer = ByteBuffer.allocate(bytes.length);
            writeBuffer.put(bytes);
            writeBuffer.flip();
            channel.write(writeBuffer);
        }
    }
}
```

### 3.2 客户端

TimeClient.java

```java
package com.wangjun.io.nio;

/**
 * @author wangjun
 * @date 2020-03-30
 * @version 1.0
 */
public class TimeClient {

    public static void main(String[] args) {
        int port = 8080;
        new Thread(new TimeClientHandle("127.0.0.1", port), "TimeClient-001").start();
    }

}
```

TimeClientHandle.java

```java
package com.wangjun.io.nio;

import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.SocketChannel;
import java.util.Iterator;
import java.util.Set;

/**
 * @author wangjun
 * @date 2020-03-30
 * @version 1.0
 */
public class TimeClientHandle implements Runnable {

    private String host;
    private int port;
    private Selector selector;
    private SocketChannel socketChannel;
    private volatile boolean stop;

    public TimeClientHandle(String host, int port) {
        this.host = host;
        this.port = port;
        try {
            selector = Selector.open();
            socketChannel = SocketChannel.open();
            socketChannel.configureBlocking(false);
        } catch (IOException e) {
            e.printStackTrace();
            System.exit(1);
        }
    }

    @Override
    public void run() {
        try {
            doConnect();
        } catch (IOException e) {
            e.printStackTrace();
            System.exit(1);
        }
        while(!stop) {
            try {
                selector.select(1000);
                Set<SelectionKey> selectedKeys = selector.selectedKeys();
                Iterator<SelectionKey> it = selectedKeys.iterator();
                SelectionKey key = null;
                while (it.hasNext()) {
                    key = it.next();
                    it.remove();
                    try {
                        handleInput(key);
                    } catch (Exception e) {
                        if (key != null) {
                            key.cancel();
                            if (key.channel() != null) {
                                key.channel().close();
                            }
                        }
                    }
                }
            } catch (Throwable t) {
                t.printStackTrace();
            }
        }
        //多路复用器关闭后，所有注册在上面的Channel和Pipe等资源都会被自动去注册并关闭，所有不需要重复释放资源
        if (selector != null) {
            try {
                selector.close();
            } catch (IOException e) {
                e.printStackTrace();
            }
        }
    }

    private void handleInput(SelectionKey key) throws IOException {
        if (key.isValid()) {
            // 判断连接是否成功
            SocketChannel sc = (SocketChannel) key.channel();
            if(key.isConnectable()) {
                if(sc.finishConnect()) {
                    sc.register(selector, SelectionKey.OP_READ);
                    doWrite(sc);
                }else {
                    System.exit(1);
                }
            }

            if (key.isReadable()) {
                ByteBuffer readBuffer = ByteBuffer.allocate(1024);
                int readBytes = sc.read(readBuffer);
                if (readBytes > 0) {
                    readBuffer.flip();
                    byte[] bytes = new byte[readBuffer.remaining()];
                    readBuffer.get(bytes);
                    String body = new String(bytes, "UTF-8");
                    System.out.println("现在的时间是:" + body);
                    this.stop = true;
                } else if (readBytes < 0) {
                    // 对端链路关闭
                    key.cancel();
                    sc.close();
                } else {
                    // 读到0字节，忽略
                }
            }
        }
    }

    private void doConnect() throws IOException {
        //如果直连成功，则注册到多路复用器上，发送请求消息，读应答
        if(socketChannel.connect(new InetSocketAddress(host, port))) {
            socketChannel.register(selector, SelectionKey.OP_READ);
            doWrite(socketChannel);
        }else {
            socketChannel.register(selector, SelectionKey.OP_CONNECT);
        }
    }

    private void doWrite(SocketChannel sc) throws IOException {
        byte[] req = "QUERY TIME ORDER".getBytes();
        ByteBuffer writeBuffer = ByteBuffer.allocate(req.length);
        writeBuffer.put(req);
        writeBuffer.flip();
        sc.write(writeBuffer);
        if(!writeBuffer.hasRemaining()) {
            System.out.println("send order to server succeed!");
        }
    }
}
```

### 3.3 运行服务端和客户端

服务端运行结果：

```
The nio time server start in port:8080
时间服务器收到的命令是:QUERY TIME ORDER
```

客户端运行结果：

```
send order to server succeed!
现在的时间是:Tue Mar 31 10:13:47 CST 2020
```

## 四、NIO的好处

1. 客户端发起的链接操作是异步的，可以通过多路复用器注册OP\_CONNECT等待后续结果，不需要像之前的客户端那样被同步阻塞；
2. SocketChannel的读写操作都是异步的，如果没有可读写的数据它不会同步等待，直接返回，这样IO通信线程就可以处理其他的链路，不需要同步等待这个链路可用；
3. 线程模型的优化：由于JDK的Selector在Linux等主流操作系统上通过epoll实现，它没有连接句柄数的限制（只受限于操作系统的最大句柄数或者单个进程的句柄数限制），这意味着一个Selector线程可以同时处理成千上万个客户端连接，而且性能不会随着客户端的增加而性能下降。因此，它非常适合做高性能、高负载的网络服务器。

> 参考：
>
> 《Netty权威指南》


---

# Agent Instructions: Querying This Documentation

If you need additional information that is not directly available in this page, you can query the documentation dynamically by asking a question.

Perform an HTTP GET request on the current page URL with the `ask` query parameter:

```
GET https://jun-wang.gitbook.io/learnjava/ji-shu-xue-xi/javaio/fei-zu-se-shi-io-tong-xin.md?ask=<question>
```

The question should be specific, self-contained, and written in natural language.
The response will contain a direct answer to the question and relevant excerpts and sources from the documentation.

Use this mechanism when the answer is not explicitly present in the current page, you need clarification or additional context, or you want to retrieve related documentation sections.
