个人成长博客

纸上得来终觉浅,绝知此事要躬行

0%

Java IO

概述

IO的方式通常分为几种,同步阻塞的BIO、同步非阻塞的NIO、异步非阻塞的AIO。Java中的BIO、NIO和 AIO 可以理解为是 Java 语言对操作系统的各种 IO 模型的封装。程序员在使用这些 API 的时候,不需要关心操作系统层面的知识,也不需要根据不同操作系统编写不同的代码,只需要使用Java的API就可以了。

BIO

BIO 就是传统的 java.io 包,它是基于流模型实现的,交互的方式是同步、阻塞方式,也就是说在读入输入流或者输出流时,在读写动作完成之前,线程会一直阻塞在那里,它们之间的调用是可靠的线性顺序。它的优点就是代码比较简单、直观;缺点就是 IO 的效率和扩展性很低,容易成为应用性能瓶颈。Block-IO:InputStream和OutputStream,Reader和Writer。BIO的调用模型如下:

Bio优化演进

Bio是一个阻塞式的io,不能够支持并发请求访问,所以可以使用多线程优化代码。这种方式也存在缺点:如果每个请求过来都使用一个线程,这时候非常浪费CPU的资源。因此可以使用线程池来优化代码实现,线程池还可以让线程的创建和回收成本相对较低,使用FixedThreadPool 可以有效的控制了线程的最大数量,保证了系统有限的资源的控制。采用线程池和任务队列可以实现一种叫做伪异步的 I/O 通信框架。当有新的客户端接入时,将客户端的 Socket 封装成一个Task投递到后端的线程池中进行处理,JDK 的线程池维护一个消息队列和 N 个活跃线程,对消息队列中的任务进行处理。由于线程池可以设置消息队列的大小和最大线程数,因此,它的资源占用是可控的,无论多少个客户端并发访问,都不会导致资源的耗尽和宕机。

伪异步I/O通信框架采用了线程池实现,因此避免了为每个请求都创建一个独立线程造成的线程资源耗尽问题。不过因为它的底层任然是同步阻塞的BIO模型,因此无法从根本上解决并发访问量增加的问题。当面对十万甚至百万级连接的时候,传统的 BIO 模型是无能为力的。因此,我们需要一种更高效的 I/O 处理模型来应对更高的并发量,所以nio出现了。

BIO示例

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
public void server(int port) throws IOException {
//将ServerSocket绑定到指定的端口里
final ServerSocket socket = new ServerSocket(port);
while (true) {
//阻塞直到收到新的客户端连接
final Socket clientSocket = socket.accept();
System.out.println("Accepted connection from " + clientSocket);
//创建一个子线程去处理客户端的请求
new Thread(new Runnable() {
@Override
public void run() {
try (BufferedReader reader = new BufferedReader(new InputStreamReader(clientSocket.getInputStream()))) {
PrintWriter writer = new PrintWriter(clientSocket.getOutputStream(), true);
//从客户端读取数据并原封不动回写回去
while (true) {
writer.println(reader.readLine());
writer.flush();
}
} catch (IOException e) {
e.printStackTrace();
}
}
}).start();
}
}

利用线程池优化:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
public void threadPoolServer(int port) throws IOException {
//将ServerSocket绑定到指定的端口里
final ServerSocket socket = new ServerSocket(port);
//创建一个线程池
ExecutorService executorService = Executors.newFixedThreadPool(6);
while (true) {
//阻塞直到收到新的客户端连接
final Socket clientSocket = socket.accept();
System.out.println("Accepted connection from " + clientSocket);
//将请求提交给线程池去执行
executorService.execute(() -> {
try (BufferedReader reader = new BufferedReader(new InputStreamReader(clientSocket.getInputStream()))) {
PrintWriter writer = new PrintWriter(clientSocket.getOutputStream(), true);
//从客户端读取数据并原封不动回写回去
while (true) {
writer.println(reader.readLine());
writer.flush();
}
} catch (IOException e) {
e.printStackTrace();
}
});
}
}

NIO

NIO 是 Java 1.4 引入的 java.nio 包,提供了 Channel、Selector、Buffer 等新的抽象,可以构建多路复用的、同步非阻塞 IO 程序,同时提供了更接近操作系统底层高性能的数据操作方式。NonBlock-IO:构建多路复用的、同步非阻塞的IO操作。NIO的调用模型如下:

客户端发送的请求,都会注册到多路复用器上。多路复用器上发现一个io连接请求时,才会处理。NIO的特点是程序会不断询问内核,数据是否准备好。

NIO核心组件

  1. 通道(Channel):通常我们nio所有的操作都是通过通道开始的,所有的通道都会注册到统一个选择器(Selector)上实现管理,在通过选择器将数据统一写入到 buffer中
  2. 缓冲区(Buffer):Buffer本质上就是一块内存区,可以用来读取数据,也就先将数据写入到缓冲区中、在统一的写入到硬盘上
  3. 选择器(Selector):Selector可以称做为选择器,也可以把它叫做多路复用器,可以在单线程的情况下可以去维护多个Channel,也可以去维护多个连接

IO多路复用

IO多路复用中,IO实际指的就是网络的IO,多路也就是多个不同的tcp连接,复用也就是指使用同一个线程合并处理多个不同的IO操作,这样的话可以减少CPU资源。(单个线程可以同时处理多个不同的io操作,应用场景非常广泛:redis原理、Mysql连接原理)。

NIO的IO多路复用,主要还是依赖于操作系统。操作系统提供一种机制(poll、select、epoll),允许注册IO请求,当有任何一个请求被触发,会有反馈。poll、select每次都要遍历所有的注册,并且轮询。epoll只会返回对应被触发的注册时间,允许有条件的获取数据,并轮询。

在windows操作系统中使用select实现轮训机制时间复杂度是为 o(n),而且这种情况也会存在空轮训的情况,效率非常低、其次默认对我们的轮训有一定限制,所以这样的话很难支持上万tcp连接。所以在这时候linux操作就出现epoll实现事件驱动回调形式通知,不会存在空轮训的情况,只是对活跃的socket实现主动回调,这样的性能有很大的提升 所以时间复杂度为是o(1)。windows操作系统没有epoll、只有linux操作系统有。poll、select、epoll主要区别如下:

NIO示例

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
public void server(int port) throws IOException {
System.out.println("Listening for connections on port " + port);
ServerSocketChannel serverChannel = ServerSocketChannel.open();
ServerSocket ss = serverChannel.socket();
InetSocketAddress address = new InetSocketAddress(port);
//将ServerSocket绑定到指定的端口里
ss.bind(address);
serverChannel.configureBlocking(false);
Selector selector = Selector.open();
//将channel注册到Selector里,并说明让Selector关注的点,这里是关注建立连接这个事件
serverChannel.register(selector, SelectionKey.OP_ACCEPT);
while (true) {
try {
//阻塞等待就绪的Channel,即没有与客户端建立连接前就一直轮询
selector.select();
} catch (IOException ex) {
ex.printStackTrace();
//代码省略的部分是结合业务,正确处理异常的逻辑
break;
}
//获取到Selector里所有就绪的SelectedKey实例,每将一个channel注册到一个selector就会产生一个SelectedKey
Set<SelectionKey> readyKeys = selector.selectedKeys();
Iterator<SelectionKey> iterator = readyKeys.iterator();
while (iterator.hasNext()) {
SelectionKey key = (SelectionKey) iterator.next();
//将就绪的SelectedKey从Selector中移除,因为马上就要去处理它,防止重复执行
iterator.remove();
try {
//若SelectedKey处于Acceptable状态
if (key.isAcceptable()) {
ServerSocketChannel server = (ServerSocketChannel) key.channel();
//接受客户端的连接
SocketChannel client = server.accept();
System.out.println("Accepted connection from " + client);
client.configureBlocking(false);
//像selector注册socketchannel,主要关注读写,并传入一个ByteBuffer实例供读写缓存
client.register(selector, SelectionKey.OP_WRITE | SelectionKey.OP_READ, ByteBuffer.allocate(100));
}
//若SelectedKey处于可读状态
if (key.isReadable()) {
SocketChannel client = (SocketChannel) key.channel();
ByteBuffer output = (ByteBuffer) key.attachment();
//从channel里读取数据存入到ByteBuffer里面
client.read(output);
}
//若SelectedKey处于可写状态
if (key.isWritable()) {
SocketChannel client = (SocketChannel) key.channel();
ByteBuffer output = (ByteBuffer) key.attachment();
output.flip();
//将ByteBuffer里的数据写入到channel里
client.write(output);
output.compact();
}
} catch (IOException ex) {
key.cancel();
try {
key.channel().close();
} catch (IOException cex) {
}
}
}
}
}

AIO

AIO 是 Java 1.7 之后引入的包,是 NIO 的升级版本,提供了异步非堵塞的 IO 操作方式,所以人们叫它 AIO(Asynchronous IO),异步 IO 是基于事件和回调机制实现的,也就是应用操作之后会直接返回,不会堵塞在那里,当后台处理完成,操作系统会通知相应的线程进行后续的操作。Asynchronous IO:基于事件和回调机制。AIO的调用模型如下:

AIO结果处理

  1. 基于回调:实现CompletionHandler接口,调用时触发回调函数
  2. 返回Future:通过isDone()查看是否准备好,通过get()等待返回数据

AIO示例

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
public void server(int port) throws IOException {
System.out.println("Listening for connections on port " + port);
final AsynchronousServerSocketChannel serverChannel = AsynchronousServerSocketChannel.open();
InetSocketAddress address = new InetSocketAddress(port);
// 将ServerSocket绑定到指定的端口里
serverChannel.bind(address);
final CountDownLatch latch = new CountDownLatch(1);
// 开始接收新的客户端请求. 一旦一个客户端请求被接收, CompletionHandler 就会被调用.
serverChannel.accept(null, new CompletionHandler<AsynchronousSocketChannel, Object>() {
@Override
public void completed(final AsynchronousSocketChannel channel, Object attachment) {
// 一旦完成处理,再次接收新的客户端请求
serverChannel.accept(null, this);
ByteBuffer buffer = ByteBuffer.allocate(100);
//在channel里植入一个读操作EchoCompletionHandler,一旦buffer有数据写入,EchoCompletionHandler便会被唤醒
channel.read(buffer, buffer, new EchoCompletionHandler(channel));
}

@Override
public void failed(Throwable throwable, Object attachment) {
try {
// 若遇到异常,关闭channel
serverChannel.close();
} catch (IOException e) {
// ingnore on close
} finally {
latch.countDown();
}
}
});
try {
latch.await();
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}

回调事件

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
private final class EchoCompletionHandler implements CompletionHandler<Integer, ByteBuffer> {
private final AsynchronousSocketChannel channel;

EchoCompletionHandler(AsynchronousSocketChannel channel) {
this.channel = channel;
}

@Override
public void completed(Integer result, ByteBuffer buffer) {
buffer.flip();
// 在channel里植入一个读操作CompletionHandler,一旦channel有数据写入,CompletionHandler 便会被唤醒
channel.write(buffer, buffer, new CompletionHandler<Integer, ByteBuffer>() {
@Override
public void completed(Integer result, ByteBuffer buffer) {
if (buffer.hasRemaining()) {
// 如果buffer里还有内容,则再次触发写入操作将buffer里的内容写入channel
channel.write(buffer, buffer, this);
} else {
buffer.compact();
// 如果channel里还有内容需要读入到buffer里,则再次触发写入操作将channel里的内容读入buffer
channel.read(buffer, buffer, EchoCompletionHandler.this);
}
}

@Override
public void failed(Throwable exc, ByteBuffer attachment) {
try {
channel.close();
} catch (IOException e) {
// ingnore on close
}
}
});
}

@Override
public void failed(Throwable exc, ByteBuffer attachment) {
try {
channel.close();
} catch (IOException e) {
// ingnore on close
}
}
}

对比

BIO是一个连接一个线程,NIO是一个请求一个线程,AIO是一个有效请求一个线程

属性\模型 阻塞BIO 非阻塞NIO 异步AIO
blocking 阻塞并同步 非阻塞但同步 非阻塞并异步
线程数(server:clent) 1:1 1:N 0:N
复杂度 简单 较复杂 复杂
吞吐量

总结:

  1. Java BIO :同步并阻塞,服务器实现模式为一个连接一个线程,即客户端有连接请求时服务器端就需要启动一个线程进行处理,如果这个连接不做任何事情会造成不必要的线程开销,当然可以通过线程池机制改善
  2. Java NIO :同步非阻塞,服务器实现模式为一个请求一个线程,即客户端发送的连接请求都会注册到多路复用器上,多路复用器轮询到连接有I/O请求时才启动一个线程进行处理
  3. Java AIO(NIO.2) :异步非阻塞,服务器实现模式为一个有效请求一个线程,客户端的I/O请求都是由OS先完成了再通知服务器应用去启动线程进行处理