Java NIO 与 Selector

本篇总结一下 Java NIO相关的知识点与源码

Java NIO 属于比较基础的 Java 知识点, 但是出于一些不可名状的原因, Java NIO 成了长期没有关注的盲点. 由于与 netty 息息相关, 在不远的未来将会用到, 所以在这里特意总结一下.

Channels 与 Buffers

在 Java 标准的输入输出控制时, 我们往往控制的是字节流或者字符流. 由于要满足 NIO (Non-block IO) 的需求, 所以我们需要使用 channels 和 buffers.

channel 和流提供的功能相类似, 但是区别在于:

i. channel 既可以用于读也可以用于写, 但是流一般只用于读或者写.

ii. channel 可以异步的读写, 这是 NIO 最大的要求

iii. channel 总是往 Buffer 中读, 或者从 Buffer 中写.

channel 有四种具体的实现:

i. FileChannel 主要用于传输文件流

ii. DatagramChannel 主要用于传输 UDP 报文

iii. SocketChannel 主要用于传输 TCP 套接字

iv. ServerSocketChannel 主要用于监听 TCP 连接, 并且生成 SocketChannel

以文件流为例, 将上述代码按照字节读取并打印出来:

RandomAccessFile file = new RandomAccessFile("java_notes_9.txt","rw");

FileChannel channel = file.getChannel();

ByteBuffer buffer = ByteBuffer.allocate(1024);

int bytesRead = channel.read(buffer);

while(bytesRead != -1){

System.out.println("Read " + bytesRead);

buffer.flip();

while(buffer.hasRemaining()){

System.out.print((char) buffer.get());

}

buffer.clear();

bytesRead = channel.read(buffer);

}

file.close();

在上面的代码中, Buffer 与 Channels 进行交互, 数据是往 Buffer 中读, 或者从 Buffer 中写.

读写一般遵循如下流程:

  1. 将数据写入 Buffer

  2. 调用 Buffer.flip()

  3. 数据从 Buffer 中读取出来

  4. 调用 Buffer.clear() 或者 Buffer.compact()

为了解释这些流程首先需要知道 Buffer 的数据结构, Buffer 有三个重要的变量

  1. position

  2. limit

  3. capacity

position 描述了当前读/写的位置, 取值范围为 (0, capacity – 1), limit 是当前允许可读/可写的最大位置, capacity 描述了 Buffer 的最大大小.

值得注意的是, 在写模式下 limit = capacity, 当由写模式 通过 flip() 转换为读模式时, 新的 limit 会被缩短到原写模式下最后停留的 position , 新的 poistion 被置为 0. 最多能读取到之前写入的那么多的数据

当由读模式通过 clear() 转化为写模式时, position 会被置为 0, limit 会被置为 capacity-1. 当由读模式通过 compact() 转化为写模式时, 会将未读的数据 (position ~ limit部分的数据) 自动复制到 Buffer 的初始位置, 并且会把 position 设置为这段数据之后 (limit – position), limit 会被设置为 capacity , 写入数据会从新的 position 开始. 原先保留的数据会在下一次 flip() 时重新被读到.

除了 flip(), clear(). compact(), Buffer 还有一些方法可能会被用到, 包括:

rewind(), 将 position 重新设置为 0

mark()/reset(), 通过 mark() 可以记录下当前的 position , 通过 reset() 可以将位置复位到 mark() 的位置

Channel/Buffer 实现了 Scatter 和 Gather 的功能, 在处理类似于 http 报文的字符流时, 可以这样使用:

ByteBuffer header = ByteBuffer.allocate(128);

ByteBuffer body = ByteBuffer.allocate(1024);

ByteBuffer[] bufferArray = { header, body };

channel.read(bufferArray);

一个 channel 同时读入多个 Buffer, 一个 Buffer 读入完了才会读下一个 Buffer. 这种情况下的 Buffer 都是固定大小, 不适合处理动态大小的数据. 这个特性叫 Scatter. (Buffer 由空到满)

当使用 channel.write(bufferArray), bufferArray 中从 positon 到 limit 的所有内容会被写入 channel, 这种情况下 bufferArray 是动态的大小. 这个特性叫做 Gather. (Buffer 由满到空)

Selector

Selector 是 Java NIO 的核心, 因为可以使用单一的线程维护一个 Seletctor 从而可以维护多个 Channel. 一般使用 open() 方法完成对 Selecor 的初始化.

Selector selector = Selector.open();

open() 方法相当于是一个静态工厂方法, 会返回一个 SelectorProvider, 在底层由各个平台对应的 jdk 实现, 在 windows 中是 WindowsSelectorProvider.

使用 register 方法可以注册 channel, channel 可以注册为 Connect, Accept, Read, Write.

因此,已成功连接到另一台服务器的通道是 “connect” 状态, 接受传入连接的服务器套接字通道是 “accept” 状态, 准备好要读取的数据的通道是 “read”状态, 准备好为您写入数据的通道是 “write” 状态. 例如, 我们可以注册一个读的 channel

SelectionKey key = channel.register(selector, SelectionKey.OP_ACCEPT);

ServerSocketChannel ssc = (ServerSocketChannel) key.channel();

SocketChannel clientChannel = ssc.accept();

ssc.configureBlocking(false);

当然, 如果对不只一个事件感兴趣, 也可以使用多个标记位监听多个感兴趣的事件.

int interestSet = SelectionKey.OP_READ|SelectionKey.OP_WRITE;

当注册了多个 channel 之后, 可以采用 Selector.select() 方法

int select();

int select(long timeout);

int selectNow();

select() 会产生阻塞直到至少一个 channel 准备好了注册的事件, 并返回准备好的 channel, 也可以传入延时, 或者无论也没有直接返回结果.

我们以 Linux 系统为例, 一次 IO 访问的读操作中, 报文一旦进入网卡驱动就会产生中断, 内核会把报文送入协议栈, 完成网络协议的自底向上的转换. 传递到 socket 的内核缓冲区, 用户进程会调用 recv 读接口将报文读出来.

接下来, 通过设置 socket 为 Non-blocking, 当对 Non-blocking 的 socket 读时, 如果数据并没有准备好, 会立刻返回 error. 用户通过轮询读, 并且判断是不是返回 error 或者是返回了一个 system call, 那么将数据拷贝到用户内存.

那么在 WindowsSelectorProvider 中, select() 的关键代码是 doSelector 中 subSelector.poll() 函数:

try {

...

begin();

try {

subSelector.poll();

} catch (IOException e) {

finishLock.setException(e); // Save this exception

}

// Main thread is out of poll(). Wakeup others and wait for them

if (threads.size() > 0)

finishLock.waitForHelperThreads();

} finally {

end();

}

...

}

poll() 会涉及到(笔者环境为windows)操作系统下的 poll 操作. 这涉及到了操作系统底层的 I/O 多路复用的知识.

IO 多路复用是指内核一旦发现一个进程指定的一个或者多个 IO 条件准备读取, 它就通知该进程, IO 多路复用适用于:

  1. 客户处理多个描述字 2. 一个客户同时处理多个套接口 3. 一个 TCP 服务器既要处理监听套接口, 又要处理已连接套接口 4. 一个服务器既要处理 TCP, 又要处理 UDP 5. 一个服务器要处理多个协议与服务.

IO 多路复用的实质和 Java NIO 一致的, 或者说 Java NIO 底层就是用 操作系统 IO 多路复用实现, selector 的概念也是由 select 借鉴而来.

在源码中和在操作系统中类似, 多路复用通过一种机制一个进程能同时等待多个文件描述符, 而这些文件描述符(套接字描述符)其中的任意一个进入读就绪状态, select()函数就可以返回, IO多路复用中, 对于每个socket都是非阻塞式的, 但是由于用户进程需要调用 Selector 的 select 方法, 如果所有注册在 selector 上的 socket 都 block 了(没有数据到达), 整个用户进程还是block.

select 函数监视的文件描述符分3类, 分别是writefds, readfds, exceptfds. 调用后select函数会阻塞, 直到有socket描述符就绪(有数据 可读, 可写, 或者有except), 或者超时(timeout指定等待时间,如果立即返回设为null即可), 函数返回. 当select函数返回后, 可以通过遍历 fdset, 来找到就绪的描述符. 在于单个进程能够监视的文件描述符的数量存在最大限制, 在Linux上一般为1024, 可以通过修改宏定义甚至重新编译内核的方式提升这一限制, 但是这样也会造成效率的降低.

与select不同, poll使用一个 pollfd的指针实现. pollfd 结构包含了要监视的 event 和发生的 event, 不再使用select”参数-值”传递的方式. 同时, pollfd并没有最大数量限制(但是数量过大后性能也是会下降). 和select函数一样, poll返回后, 需要轮询 pollfd 来获取就绪的描述符.

select 和 poll 都需要在返回后, 通过遍历文件描述符来获取已经就绪的socket. 事实上, 同时连接的大量客户端在一时刻可能只有很少的处于就绪状态, 因此随着监视的描述符数量的增长, 其效率也会线性下降.

相对于select和poll来说, epoll更加灵活, 没有 1024 个描述符限制. epoll使用一个文件描述符管理多个描述符, 将用户关系的文件描述符的事件存放到内核的一个事件表中, 这样在用户空间和内核空间的copy只需一次.

 

在 select/poll中, 进程只有在调用一定的方法后, 内核才对所有监视的文件描述符进行扫描, 而epoll事先通过 epoll_ctl() 来注册一个文件描述符, 一旦基于某个文件描述符就绪时, 内核会采用类似 callback 的回调机制(而不是再是轮询), 迅速激活这个文件描述符, 当进程调用 epoll_wait() 时便得到通知.

在使用完 slelect() 方法之后, 可以调用 selectedKeys() 方法获取其入口. 例如:

selector.select();

Set<selectionkey> keys = selector.selectedKeys();

Iterator</selectionkey><selectionkey> keyIterator = keys.iterator();

while (keyIterator.hasNext()) {

SelectionKey key = keyIterator.next();

keyIterator.remove();

if (key.isConnectable()) {

...

//do something

} else if (key.isWritable()) {

...

//do something

} else if (key.isReadable()){

...

//do something

}

}

</selectionkey>

通过获取其迭代器, 从而在不同的 channel 之间切换.

其余的 write()/read()/accept() 都由各自的 ServiceChannelImpl 实现. 是由底层的 IOUtil 的读写完成的, 不再深究

例如下面贴出的代码, 可以完成一个客户端实现两个线程反复与同一个服务器沟通, 服务器将发给其的内容返回给客户端.

NIOClient.java

import java.io.IOException;

import java.net.InetSocketAddress;

import java.nio.ByteBuffer;

import java.nio.channels.*;

import java.util.*;

import java.util.concurrent.locks.ReentrantLock;

/**

* @author rancho.ws

* @date 2018/7/12

*/

public class NIOClient {

public static void main(String[] args) throws IOException {

System.out.println("Client started...");

Selector selector = Selector.open();

ReentrantLock lock = new ReentrantLock();

for(int i = 0; i < 2; i ++){

NIOClientImpl nioClient = new NIOClientImpl(selector, i, 18088);

Thread t = new Thread(() -> {

try {

nioClient.setLock(lock);

nioClient.start();

}catch (IOException e){

e.printStackTrace();

}

});

t.start();

}

}

}

class NIOClientImpl{

ByteBuffer writeBuffer = ByteBuffer.allocate(4096);

ByteBuffer readBuffer = ByteBuffer.allocate(4096);

Selector selector;

private int id = 0;

private int port;

private ReentrantLock lock;

public NIOClientImpl(Selector selector, int id ,int port){

this.selector = selector;

this.id = id;

this.port = port;

}

public void setLock(ReentrantLock lock){

this.lock = lock;

}

public int getId(){

return this.id;

}

public void start() throws IOException {

if(lock == null){

System.out.println("No lock configured, quit.");

return;

}

try {

SocketChannel sc = SocketChannel.open();

sc.configureBlocking(false);

sc.connect(new InetSocketAddress("127.0.0.1", port));

sc.register(selector, SelectionKey.OP_CONNECT);

Scanner scanner = new Scanner(System.in);

while (true) {

lock.lock();

selector.select();

Set<selectionkey> keys = selector.selectedKeys();

Iterator</selectionkey><selectionkey> keyIterator = keys.iterator();

while (keyIterator.hasNext()) {

SelectionKey key = keyIterator.next();

keyIterator.remove();

if (key.isConnectable()) {

sc.finishConnect();//可连接状态则去完成连接

sc.register(selector, SelectionKey.OP_WRITE);//连接完改为写入模式

System.out.println("Server connected to client" +id +"...");

break;

} else if (key.isWritable()) {//如果是可写入模式

System.out.println("Please input (id:" + id+") message:");

String message = scanner.nextLine();//等待控制台输入, 由于只有一个 NIOClient, 此处也会产生标准的输入阻塞.

writeBuffer.clear();//清空等待输入

writeBuffer.put(message.getBytes());//放入buffer

writeBuffer.flip();//buffer 由写转读, 如上面所描述

sc.write(writeBuffer);//写入channel

sc.register(selector, SelectionKey.OP_READ);//注册改为读模式

} else if (key.isReadable()){//如果是可读模式

System.out.println("Message (id:" + id + ") received: ");

SocketChannel client = (SocketChannel) key.channel();

readBuffer.clear();//清空读buffer

int num = client.read(readBuffer);//读到readBuffer中

System.out.println(new String(readBuffer.array(),0, num));

sc.register(selector, SelectionKey.OP_WRITE);//注册改为写模式

}

}

lock.unlock();

Thread.sleep(3000);//确保锁可以释放

}

}catch (Exception e){

e.printStackTrace();

}finally {

lock.unlock();

}

}

}

</selectionkey>

NIOServer.java

import java.io.IOException;

import java.net.InetSocketAddress;

import java.nio.ByteBuffer;

import java.nio.channels.*;

import java.util.*;

/**

* @author rancho.ws

* @date 2018/7/12

*/

public class NIOServer {

private Selector selector;

private ByteBuffer readBuffer = ByteBuffer.allocate(4096);

private ByteBuffer sendBuffer = ByteBuffer.allocate(4096);

String str;

public void start() throws IOException {

ServerSocketChannel ssc = ServerSocketChannel.open();

ssc.configureBlocking(false);

ssc.bind(new InetSocketAddress("localhost", 18088));

selector = Selector.open();

ssc.register(selector, SelectionKey.OP_ACCEPT);

while (!Thread.currentThread().isInterrupted()) {

selector.select();

Set<selectionkey> keys = selector.selectedKeys();

Iterator</selectionkey><selectionkey> keyIterator = keys.iterator();

while (keyIterator.hasNext()) {

SelectionKey key = keyIterator.next();

if (!key.isValid()) {

continue;

}

if (key.isAcceptable()) {

accept(key);

} else if (key.isReadable()) {

read(key);

} else if (key.isWritable()) {

write(key);

}

keyIterator.remove();

}

}

}

private void write(SelectionKey key) throws IOException, ClosedChannelException {

SocketChannel channel = (SocketChannel) key.channel();

System.out.println("Write: "+str);

sendBuffer.clear();

sendBuffer.put(str.getBytes());

sendBuffer.flip();

channel.write(sendBuffer);

channel.register(selector, SelectionKey.OP_READ);

}

private void read(SelectionKey key) throws IOException {

SocketChannel socketChannel = (SocketChannel) key.channel();

this.readBuffer.clear();

int numRead;

try {

numRead = socketChannel.read(this.readBuffer);

} catch (IOException e) {

key.cancel();

socketChannel.close();

return;

}

str = new String(readBuffer.array(), 0, numRead);

System.out.println(str);

socketChannel.register(selector, SelectionKey.OP_WRITE);

}

private void accept(SelectionKey key) throws IOException {

ServerSocketChannel ssc = (ServerSocketChannel) key.channel();

SocketChannel clientChannel = ssc.accept();

clientChannel.configureBlocking(false);

clientChannel.register(selector, SelectionKey.OP_READ);

System.out.println("A new client connected, address: "+clientChannel.getRemoteAddress());

}

public static void main(String[] args) throws IOException {

System.out.println("Server started...");

new NIOServer().start();

}

}

</selectionkey>

Leave a Reply

Your email address will not be published. Required fields are marked *

This site uses Akismet to reduce spam. Learn how your comment data is processed.