bboyjing's blog

Java NIO之【Scalable IO in Java】

看完了并发网的NIO教程,是否有种意犹未尽的感觉。正常情况下,答案应该是肯定的。那我们下面来看下Doug Lea大神写的Scalable IO in Java,直接可以下载英文版pdf。这边就当边学习边翻译了。

网络服务

大部分网路服务有着相同的体系:

  • 读取请求(Read request)
  • 对请求进行解码(Decode request)
  • 处理业务逻辑(Process service)
  • 对返回值进行编码(Encode reply)
  • 发送返回值(Send reply)

下面我们来看下传统的设计模型:
nio_7
其中,每一个handler有可能都要新起一个线程去执行。用伪代码模拟如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
public class Servce {
public static void main(String[] args) throws IOException {
ServerSocket ss = new ServerSocket(1234);
while (!Thread.interrupted()) {
new Thread(new Handler(ss.accept())).start();
}
}
static class Handler implements Runnable {
final Socket socket;
Handler(Socket socket) {
this.socket = socket;
}
@Override
public void run() {
try {
byte[] input = new byte[1024];
socket.getInputStream().read(input);
byte[] output = process(input);
socket.getOutputStream().write(output);
} catch (IOException e) {
e.printStackTrace();
}
}
private byte[] process(byte[] cmd) {
return null;
}
}
}

从伪代码可以看出传统I/O模型的雏形,需要为每一个接收到的socket连接新建一个线程去执行具体的业务逻辑。

可扩展性的目标

首先,肯定是不满意上面传统的I/O设计模型,才有接下来的讨论。无休止地新建线程去执行具体业务逻辑,最终无疑会拖垮整个系统。当然,也很容易想到,可以用线程池,但是这样虽然可以限制线程数量,但是并发数也因此被限制了,所以并不是解决之道。那我们就来看下可扩展性I/O的目标是什么:

  • 高负载情况下的优雅降级
  • 硬件的升级能持续地给系统带来性能提升
  • 当然也包含可用性和性能的目标:低延迟、高负载等

分治法(Divide and Conquer)

分治法一般是解决可扩展性的最好的途径。将处理流程分成一些小的任务,每一个任务都包含一个非阻塞操作。当任务准备好的时候去执行它。这里,一个I/O事件通常被作为触发器。比如下面:
nio_8
说实话,上面这一话配上这张图,不是很能理解。被分成的小任务是整个handler,还是比如说read这样一个操作。感觉是把handler拆成一个个小任务,再往下学吧,应该会越来越清晰。
java.nio提供如下基本的机制:

  • 非阻塞的读和写
  • 与感兴趣的I/O事件相关联的任务分配机制

事件驱动设计

一系列事件驱动设计使得无限可能。这种方式通常比其他方案更有效,原因如下:

  • 占用资源少:不需要为每个客户端开启一个新线程
  • 开销少:减少上下文切换的开销,减少锁的使用

但是,通常也更难编码,原因如下:

  • 必须拆分成许多小的非阻塞单元,但是无法消除所有的阻塞动作,比如说GC、页错误等
  • 必须持续追踪服务的逻辑状态

Reactor模式

Reactor模式有如下几个特征:

  • Reactor通过调度相应的处理程序来相应I/O事件
  • 处理程序执行非阻塞操作
  • 通过绑定处理程序来管理事件。

我们先来看下单个线程版本的模型图:
nio_9

java.nio中的Channel、Buffer、Selector、SelectionKey类可以支持该模型。上图如果第一眼不能很好地理解的话,先来看下代码,涉及到两个类。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
public class Reactor implements Runnable{
final Selector selector;
final ServerSocketChannel serverSocketChannel;
Reactor(int port) throws IOException {
// 初始化ServerSocketChannel,以非阻塞模式运行
serverSocketChannel = ServerSocketChannel.open();
serverSocketChannel.bind(new InetSocketAddress(port));
serverSocketChannel.configureBlocking(false);
// 初始化Selector
selector = Selector.open();
// 将ServerSocketChannel注册到Selector上
SelectionKey selectionKey = serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT);
// 在SelectionKey上绑定一个附属对象Acceptor
selectionKey.attach(new Acceptor());
}
@Override
public void run() {
try {
while (!Thread.interrupted()) {
// 阻塞直至事件就绪
selector.select();
Set selected = selector.selectedKeys();
Iterator it = selected.iterator();
while (it.hasNext()) {
// 分发
dispatch((SelectionKey)(it.next()));
}
// 需要自己清除selectedKeys
selected.clear();
}
}catch (IOException ex) {
}
}
void dispatch(SelectionKey k) {
/**
* 获取SelectionKey中的attachment,并执行该attachment的run()方法
* 拿第一个到达的socket连接来看,该attachment为一个Acceptor实例
*/
Runnable r = (Runnable)(k.attachment());
if (r != null) {
r.run();
}
}
class Acceptor implements Runnable{
public void run() {
try {
// 获取新连接的SocketChannel
SocketChannel socketChannel = serverSocketChannel.accept();
if (socketChannel != null) {
// 具体的处理逻辑
new Handler(selector, socketChannel);
}
} catch (IOException ex) {
}
}
}
}

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
public class Handler implements Runnable{
final SocketChannel socket;
final SelectionKey sk;
ByteBuffer input = ByteBuffer.allocate(1024);
ByteBuffer output = ByteBuffer.allocate(1024);
static final int READING = 0, SENDING = 1;
int state = READING;
Handler(Selector sel, SocketChannel c) throws IOException {
socket = c;
socket.configureBlocking(false);
// 继续在Selector上注册读事件,此时attachment为当前Handler实例
sk = socket.register(sel, SelectionKey.OP_READ, this);
// 使选择器上的第一个还没有返回的选择操作立即返回
sel.wakeup();
}
boolean inputIsComplete() {
return true;
}
boolean outputIsComplete() {
return true;
}
void process() {
System.out.println("Handle processing...");
}
@Override
public void run() {
try {
if (state == READING) {
read();
} else if (state == SENDING) {
send();
}
} catch (IOException ex) { /* ... */ }
}
void read() throws IOException {
System.out.println("Handle reading...");
socket.read(input);
if (inputIsComplete()) {
process();
state = SENDING;
// 在SelectionKey上注册写事件
sk.interestOps(SelectionKey.OP_WRITE);
}
}
void send() throws IOException {
System.out.println("Handle writing...");
socket.write(output);
if (outputIsComplete()) {
//write完就结束了, 关闭select key
sk.cancel();
}
}
}

结合模型图和代码,直观的感受是单个线程可以同时处理多个客户端请求了。下面列举下Reactor模式的一些概念:

  • Reactor:负责响应I/O事件,当检测到一个新的事件,将其发送给相应的Handler去处理
  • Handler:负责处理非阻塞的行为,同时将handler与事件绑定

Reactor为单个线程,需要处理accept连接,同时发送请求到处理器中。由于只有单个线程,所以handler中的业务需要能够快速处理完。当然,还能再改进,可以将具体的业务逻辑放到单独的线程池中去跑,这儿就不实现了。同时,NIO暂时也就看到这里,主要是了解下相关知识,为下面学习Netty做个准备。