bboyjing's blog

Java NIO之【通道】

通道(Channel)是java.nio的第二个主要创新。它们既不是一个扩展也不是一个增强,而是全新、极好的Java I/O示例,提供与I/O服务的直接连接。Channel用于在字节缓冲区和位于通道另一侧的实例之间有效地传输数据。通道是一种途径,借助该途径,可以用最小的总开销来访问操作系统本身的I/O服务。缓冲区则是通道内部用来发送和接受数据的端点,也可以说是数据的载体。

通道基础

通道的顶层接口是Channel类,该接口很简单,我们贴出来看看:

1
2
3
4
public interface Channel extends Closeable {
public boolean isOpen();
public void close() throws IOException;
}

从接口可以看出,对所有的通道来说只有两种共同的操作:检查一个通道是否打开和关闭一个通道。有一点要注意的是:通道只能在字节缓冲区上操作。

打开通道

通道是访问I/O服务的导管。I/O可以分为广义的两大类别:File I/O和Stream I/O。所以相应地会有文件(file)通道和套接字(socket)通道。通道可以有多种方式创建,Socket通道有可以直接创建新socket通道的工厂方法,但是一个FileChannel对象却只能通过一个打开的RandomAccessFile、FileInputStream或FileOutputStream的对象上调用getChannel()方法获取,不可以直接创建。下面列举一些打开通道的API:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
// 连接到TCP网络套接字的通道
SocketChannel sc = SocketChannel.open();
sc.connect(new InetSocketAddress("localhost", 8888));
// 可以监听新进来的TCP连接的通道
ServerSocketChannel ssc = ServerSocketChannel.open();
ssc.socket().bind(new InetSocketAddress(8888));
// 能收发UDP包的通道
DatagramChannel dc = DatagramChannel.open();
// 连接到文件的通道
RandomAccessFile raf = new RandomAccessFile("somefile", "r");
FileChannel fc = raf.getChannel();

使用通道

我们先来看一个简化的UML类图:
nio_4
通道可以是双向的或者是单向的,取决于实现上图的两个接口。如果只实现了ReadableByteChannel或者WritableByteChannel其中之一,则是单向的,只能在一个方向上传输数据。如果同时实现这两个接口,则是双向的,可以双向传输数据。另外还有一个ByteChannel接口,该接口本身没有定义任何内容,只是实现了ReadableByteChannel和WritableByteChannel接口,作为双向通道方便使用。
通道会连接一个特定的I/O服务,且通道实例的性能受它所连接的I/O服务的特征限制,这一点很重要。比如,一个连接只读文件的Channel实例不能进行写操作,即使该实例所属的类可能有write()方法。
通道可以以阻塞或非阻塞模式运行。非阻塞模式的通道永远不会让调用的线程休眠,请求的操作要么立即完成,要么返回一个结果表明未进行任何操作。只有面向流的通道,如socket和pipes才能使用非阻塞模式。
socket通道类从AbstractSelectableChannel继承而来,继承了AbstractSelectableChannel的类可以和支持有条件选择的选择器一起使用。将非阻塞I/O和选择器组合起来可以实现I/O多路复用,具体后面再看。

关闭通道

与缓冲区不同,通道不能被重复使用。一个打开的通道即代表与一个特定的I/O服务的特定连接并封装该连接的状态。但通道关闭时,连接会丢失,然后通道将不再连接任何东西。调用通道的close()方法时,通道会关闭底层I/O服务,在此过程中可能会导致线程暂时阻塞,哪怕是该通道处于非阻塞模式下。
通道引入了一些与关闭和中断有关的新行为。如果一个通道实现InterruptibleChannel接口,其行为可描述为:如果一个线程在一个通道上被阻塞,并且同时被中断,那么该通道将被关闭,该被阻塞线程也会产生一个ClosedByInterruptException异常。(如果对这个行为有点不理解的话,可能需要强化下线程相关知识,尤其是线程中断机制。)此外,假如一个线程的interrupt status被设置并且该线程视图访问一个通道,那么这个通道将立即被关闭,同时将抛出ClosedByInterruptException异常。仅仅因为休眠在其上的线程被中断就关闭通道,这看起来似乎过于苛刻了,但这是NIO设计者们的经验和智慧的结晶。
可中断的通道也是可以异步关闭的。实现InterruptibleChannel接口的通道可以在任何时候被关闭,即使有另一个被阻塞的线程在等待该通道上的一个I/O操作完成。当一个通道被关闭时,休眠在该通道上的所有线程都将被唤醒,并接收到一个AsynchronousCloseException异常,接着通道就被关闭将不再可用。

Scatter/Gather

通道提供了一种被称为Scatter/Gather的重要新功能(有时也被成为矢量I/O)。Scatter/Gather是一个简单却强大的概念,它是指在多个缓冲区上实现一个简单的I/O操作。对于一个write操作而言,数据是从几个缓冲区按顺序抽取(gather),并沿着通道发送。对于read操作而言,从通道读取的数据会按顺序被散步(scatter)到多个缓冲区,将每个缓冲区填满,直至通道中的数据或者缓冲区的最大空间被消耗完。
大多数现代操作系统都支持本地矢量I/O,当在一个通道上请求Scatter/Gather操作时,该请求会被翻译为适当的本地调用,来直接填充或抽取缓冲区。这是一个很大的进步,因为减少或避免了缓冲区拷贝和系统调用。为了从本地I/O获取最大性能优势,Scatter/Gather应该使用直接缓冲区。我们先来看一段伪代码,知道如何使用该功能,下面我们假定channel连接到一个有48字节数据等待读取的socket上:

1
2
3
4
ByteBuffer header = ByteBuffer.allocate(10);
ByteBuffer body = ByteBuffer.allocate(80);
ByteBuffer[] buffers = {header, body};
int bytesRead = channel.read(buffers);

一但read()方法返回,bytesRead就被赋予值48,hear缓冲区将包含前10个从通道读取的字节,而body缓冲区则包含接下来38个字节。通道会自动地将数据scatter到这两个缓冲区中。

文件通道

文件通道总是阻塞式的,因此不能被置于非阻塞模式。对于文件I/O,最强大之处在于异步I/O,它允许一个进程可以从操作系统请求一个或多个I/O操作,而不必等待这些操作完成。发起请求的进程之后会收到它请求的I/O操作已经完成的通知。
FileChannel对象的创建之前已经说过了,这里就不赘述了。FileChannel对象是线程安全的,可以看下FileChannel的子类FileChannelImpl的源码,方法中基本都使用synchronized包裹。

访问文件

每个FileChannel对象都和一个文件描述符有一对一的关系。FileChannel中的API方法和POSIX兼容的操作系统上的常用文件I/O系统调用紧密对应也就可以理解了。
同底层的文件描述符一样,每个FileChannel都有一个叫做”fiel position”的概念。这个position值决定文件中哪一处的数据接下来将被读或写,从这个方法看,FileChannel类同缓冲区很类似。FileChannel位置是从底层的文件描述符获得的,该position同时被作为通道引用获取来源的文件对象共享。这就意味着一个对象对该position的更新可以被另一个对象看到:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
public class FileChannelPosition {
public static void main(String[] args) throws IOException {
RandomAccessFile randomAccessFile = new RandomAccessFile("README.md", "r");
// 设置position
randomAccessFile.seek(1000);
// 从文件中创建通道
FileChannel fileChannel = randomAccessFile.getChannel();
System.out.println("file pos: " + fileChannel.position());
// 通过randomAccessFile修改position,并打印fileChannel指向的position
randomAccessFile.seek(500);
System.out.println("file pos: " + fileChannel.position());
// 通过fileChannel修改position,并打印randomAccessFile指向的position
fileChannel.position(200);
System.out.println ("file pos: " + randomAccessFile.getFilePointer());
}
}

类似于缓冲区的get()和put()方法,当字节被read()或write()方法传输时,文件position会自动更新。如果position值达到了文件大小的值,read()方法会返回-1。可是,不同于缓冲区的是,如果执行write()方法时,position会前进到超过文件大小的值,该文件会扩展以容纳新写入的字节。
当需要减少一个文件的size时,truncate()方法会砍掉您指定的新size值之外的所有数据,同时文件的position会被设置为所提供的新size的值。
force()方法告诉通道强制将全部待定的修改都落盘。因为所有的现代文件系统都会缓存数据和延迟磁盘文件更新以提高性能,调用force()方法可以要求文件的所有待定修改立即同步到磁盘。如果文件位于一个本地文件系统,一旦force()方法返回,即可保证落盘成功。但如果文件位于一个远程的文件系统上,那么不能提供可靠性保证。

文件锁定

文件锁定分为两种:共享所和独占锁,从名字上基本可以看出这两种锁的区别。有关FileChannel实现的文件锁定模型的一个重要注意项是:锁的对象是文件而不是通道或线程,也就是说文件锁是进程级别的锁。文件锁旨在在进程级别上判优文件访问,如果需要多个Java线程并发访问,需要自己去实现锁定方案。这种情况下,内存映射文件可能是一个合适的选择。关于内存映射文件,后面再说。
关于文件锁定的API就两类:lock()和tryLock()。要获得一个共享锁,必须先以只读权限打开文件,而请求独占所时则需要写权限。tryLock()方法是lock()方法的非阻塞辩题,和lock()方法起相同作用,区别是,如果请求的锁不能立刻获取到,则会返回一个null。
一个FileLock对象创建之后即有效,直到它的release()方法被调用,或它所关联的通道被关闭,或Java虚拟机关闭时才会失效。

内存映射文件

本书翻译的确实有点晦涩,所以找了一个比较容易理解的解释,来了解什么是内存映射文件。

内存映射文件是一种允许Java程序直接从内存访问的特殊文件。通过将整个文件或者文件的一部分映射到内存中、操作系统负责获取页面请求和写入文件,应用程序就只需要处理内存数据,这样可以实现非常快速的IO操作。用于内存映射文件的内存在Java的堆空间以外。Java中的java.nio包支持内存映射文件,可以使用MappedByteBuffer来读写内存。而MappedByteBuffe实例是通过FileChannel的map()方法生成的。

总结一下,当系统对I/O性能要求高,并且要操作的文件比较大时,可以考虑使用内存映射文件。当然,真的要使用时,还需要针对性地做些研究。

Socket通道

新的socket通道类可以运行非阻塞模式,并且是可选择的。因此再也没有必要为每个socket连接使用一个线程的必要了,也避免了管理大量线程所需的上下文交换的开销了。借助新的NIO类,一个或多个线程就可以管理成百上千的活动socket连接了,并且只有很少甚至没有性能损失。下面先看下简化的UML类图:
nio_5
其中DatagramChannel和SocketChannel实现定义读和写功能的接口,而ServerSocketChannel没有实现。ServerSocketChannel负责监听传入的连接和创建新的SocketChannel对象,它本身不传输数据。这三个socket通道类在被实例化时都会创建一个对等的socket对象,这些对象是java.net包中的类(Socket、ServerSocket和DatagramSocket),它们已经被更新以识别通道。对等的socket对象可以通过调用socket()方法从一个通道上获取。

非阻塞模式

要把一个socket通道置于非阻塞模式,我们要依靠所有socket通道类的公用抽象类AbstractSelectableChannel。通过调用configureBlocking(booelan block)去设置socket通道的模式。参数为true时,为阻塞模式,否则为非阻塞模式。

ServerSocketChannel

ServerSocketChannel是一个基于通道的socket监听器,它和java.net.ServerSocket执行相同的基本任务,不过增加了通道语义,因此能够在阻塞模式下运行。
用静态的open()工厂方法创建一个新的ServerSocketChannel实例,将会返回和一个java.net.ServerSocket关联的通道。该对等的ServerSocket可以通过在返回的ServerSocketChannel实例上调用socket()方法获取。由于ServerSocketChannel没有bind()方法,所以需要取出对等的socket,并使用它来绑定到一个端口以开始监听连接。但是JDK1.7开始,ServerSocketChannel已经有了bind()方法。实例代码如下:

1
2
3
4
5
6
7
// 创建ServerSocketChannel实例
ServerSocketChannel ssc = ServerSocketChannel.open();
// 取出对等的ServerSocket实例
ServerSocket serverSocket = ssc.socket();
// 通过ServerSocket绑定端口
serverSocket.bind(new InetSocketAddress(1234));
// ssc.bind(new InetSocketAddress(1234));

和ServerSocket一样,ServerSocketChannel也有accept()方法。一旦创建了一个ServerSocketChannel,并且对等的ServerSocket绑定了端口,然后就可以在其中一个上调用accept()方法了。如果选择在ServerSocket上调用,则总是阻塞并返回一个java.net.Socket对象,如果选择在ServerSocketChannel上调用,则会返回SocketChannel类型的对象,返回的对象能够在非阻塞模式下运行。如果以非阻塞模式被调用,当没有传入连接在等待时,erverSocketChannel.accept()会立即返回null,可选择性也因此得到了实现。我们下面来看一个小例子,功能是使用一个选择器实例来注册一个ServerSocketChannel对象,以实现新连接到达时自动通知:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
public class ChannelAccept {
public static final String GREETING = "Hello I must be going.\r\n";
public static void main (String [] args) throws IOException, InterruptedException {
// 定义缓冲区
ByteBuffer buffer = ByteBuffer.wrap(GREETING.getBytes( ));
// 初始化ServerSocketChannel,以非阻塞模式运行
ServerSocketChannel ssc = ServerSocketChannel.open();
ssc.socket().bind(new InetSocketAddress(1234));
ssc.configureBlocking(false);
while (true) {
System.out.println ("Waiting for connections");
// 对每一个新进来的连接都会创建一个SocketChannel
SocketChannel sc = ssc.accept( );
if (sc == null) {
// 没有连接到达时
Thread.sleep (2000);
} else {
System.out.println ("Incoming connection from: "
+ sc.socket().getRemoteSocketAddress( ));
// rewind()方法与flip()相似,但是不会影响limit字段
buffer.rewind( );
sc.write (buffer);
sc.close( );
}
}
}
}

运行上述程序,由于一直没有socket连接过来,所以会每隔2秒输出Waiting for connections。

SocketChannel

Socket和SocketChannel类封装点对点、有序的网络连接,类似TCP/IP网络连接。SocketChannel扮演客户端发起同一个监听服务的器的连接。直到连接成功,它才能收到数据。每个SocketChannel对象创建时都是同一个对等的java.net.Socket对象串联的。通过静态的open()方法,可以创建一个新的SocketChannel对象,然后在该对象上调用socket()方法可以返回对等的Socket对象。新创建的 SocketChannel 虽已打开却是未连接的。同样有两种方式连接到ServerSocketChannel:

  1. 通过在对等的Socket对象上调用connect()方法,那么线程在连接建立好,或者超时之前都将保持阻塞。
  2. 通过在SocketChannel直接调用connect()方法,如果通道处于阻塞模式(默认模式),那么连接过程与上面一样。但如果通道处于非阻塞模式下,它发起对请求地址的连接,并且立即返回值。如果返回值是true,说明连接立即建立了;如果连接不能立即建立,会返回false,并且并发地继续连接建立过程。

下面给一段管理异步连接的代码示例:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
public class ConnectAsync {
public static void main (String [] argv) throws Exception {
// 初始化SocketChannel,并将默认的阻塞模式改为非阻塞
SocketChannel sc = SocketChannel.open();
sc.configureBlocking(false);
System.out.println("initiating connection");
// 连接服务端
sc.connect(new InetSocketAddress ("localhost", 1234));
// 判断是否完成连接
while (!sc.finishConnect()) {
System.out.println ("doing something...");
}
System.out.println("connection established");
}
}

上例中,我们在while循环中判断连接是否完成,后面会了解到如何使用选择器来避免进行轮询并在异步连接建立之后收到通知。
另外,Socket通道是线程安全的,浏览下SocketChannelImpl源码,其中关键方法接都被synchronized包裹。

DatagramChannel

和上面两个socket通道一样,每一个DatagramChannel对象也有一个关联的DatagramSocket对象。SocketChannel是面向TCP/IP协议的,而DatagramChannel则是面向UDP/IP协议的。创建DatagramChannel的模式和创建其他socket通道一样。新的DatagramChannel会有一个可以通过调用socket()方法获取对等的DatagramSocket对象。DatagramChannel既可以充当服务器(监听者),也可以充当客户端(发送者)。
DatagramChannel是无连接的,可以发送单独的数据包给不同的目的地址,同样也可以接收来自任意地址的数据包。每个到达的数据包都含有关于它来自何处的信息。
数据包的实际发送或接收是通过send()和receive()方法来实现的。

  • receive()方法将下次将要传入的数据复制到预备好的ByteBuffer中,并返回一个SocektAddress对象,以指出数据来源。如果通道处于阻塞模式,receive()可能无限期地休眠,直到有包到达。如果是非阻塞模式,当没有可接收的包时,会返回null。如果包内的数据超出缓冲区能承受的范围,多出的数据都会被悄悄地丢弃。
  • send()方法会发送给定的ByteBuffer对象的内容到给定的SocketAddress对象所描述的目的地地址和宽口,内容范围为从当前的position开始到末尾处结束。如果DatagramChannel处于阻塞模式,调用线程可能会休眠至数据被加入传输队列。如果通道是非阻塞的,返回值要么是字节缓冲区的字节数,要么是’0’。发送数据包是一个全有或全无的行为,如果传输队列没有足够的空间来承载整个数据包,那么什么内容都不会发送。

要注意的是,UDP/IP协议不可靠性是固有的,它不对数据传输做保证。send()方法返回的非零值并不表示数据包达到了目的地,仅代表数据包被成功加到本地网络层的传输队列。
上面说过DatagramChannel是无连接的,但是它也有一个connect()方法,其连接语义不同于SocketChannel。将DatagramChannel置于已连接状态,可以使除了它所“连接”到的地址外的任何其他源地址数据包都被忽略。已连接通道会发挥作用的一个场景之一是一个客户端/服务端模式、使用UDP通讯协议的实时游戏。每个客户端都只和同一台服务器进行会话而希望忽视任何其他来源地数据包。当DatagramChannel处于连接状态时,可以调用read()或write()方法。另外,不同于流socket,数据包socket的无状态性质不需要和远程系统进行对话来建立连接状态。所以并没有实际的连接,只有用来指定允许的远程地址的本地状态信息。
下面列举一些使用DatagramChannel而非SocketChannel的理由:

  • 程序可以承受数据丢失或无序的数据
  • 只负责发包,而不用管包是否被接收
  • 数据吞吐量比可靠性更重要
  • 需要同时发送数据给多个接受者

管道

java.nio.channels包含一个名为Pipe的类。广义上讲,管道就是一个用来在两个实体之间单向传输数据的导管。在Unix系统中,管道被用来连接一个进程的输出和另一个进程的输入。Pipe类实现一个管道范例,不过它所创建的管道是进程内的,而非进程间使用的。
Pipe实例是通过调用不带参数的Pipe.open()工厂方法创建的。Pipe类定义了两个嵌套的通道类来实习那管路。这两个类是Pipe.SourceChannel(管道负责读的一端)和Pipe.SinkChannel(管道负责写的一端)。这两个通道实例是在Pipe对象创建的时候同时被创建的,可以通过在Pipe对象上分别调用source()和sink()方法取得。

结束

看完这本书的选择器一章,可能是本人理解能力不行,基本上晕晕乎乎的。于是不再以此书为媒介,准备换一条途径,在这里推荐下并发网的NIO教程。由于并发网的教程本身就是博客了,下面就不转载了,想继续学下去话就直接移步到他们家吧。至于这里整理的缓冲区和通道这两章,个人觉得还是很有必要的。