您现在的位置是:首页 > 文章详情

探秘Netty2:Netty与Java NIO

日期:2018-06-19点击:405

Netty与Java NIO的渊源

netty是什么

Netty是一个基于Java NIO的client-server网络服务框架,人们可以利用netty快速地开发网络应用。同时netty相对于其他网络框架更加简单并且扩展性更强,这主要得益于其提供的简单易用的api将业务逻辑和网络处理代码解耦开来。能够使你更加专注于业务的实现而不需要太多关心网络底层实现。

异步设计

netty所有的api都是异步的。异步处理已经不是什么新鲜事了,众所周知,IO已经变为一个应用的瓶颈,而异步处理正是为了解决这个问题出现的。

CallBacks机制

CallBacks机制经常应用于异步处理,人们可以指定方法执行完后的回调函数,在JavaScript中,回调机制是其语言的核心。下面代码展示了如何利用回调机制处理接受数据。

 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
 
public  interface Fetcher {
void fetchData(FetchCallback callback);
}
public  interface FetchCallback {
void onData(Data data);
void onError(Throwable cause);
}
public  class Worker {
public void doWork() {
Fetcher fetcher = ...
fetcher.fetchData( new FetchCallback() {
@Override
public void onData(Data data) { #1
System.out.println( "Data received: " + data);
}
@Override
public void onError(Throwable cause) { #2
System.err.println( "An error accour: " + cause.getMessage());
}
});
... }
}

#1 没有出现错误,调用onData

#2 出现错误信息,调用onError

你可以将回调函数从当前线程移植到其他线程,但是并不能保证回调函数被执行。当你将多个异步回调函数串起来的时候会形成spaghetti code(管式代码),有些人认为这样的代码很难读,但JavaScript以及Node.js都是这种风格。

Futures机制

异步处理使用的第二个机制是Future机制。一个Future对象只有在特定情况下才会有值,Future对象要么是调用者的返回结果,要么是一个异常。Java在java.util.concurrent包中提供了供其线程池机制使用的Future接口,例如当你使用ExecutorService.submit()提交一个Runable任务时,就可以返回一个Future对象,利用Future对象可以判断该任务是否完成。如下所示:

 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
 
ExecutorService executor = Executors.newCachedThreadPool();
Runnable task1 =  new Runnable() {
@Override
public void run() {
doSomeHeavyWork();
}
... }
Callable<Interger> task2 =  new Callable() {
@Override
public Integer call() {
return doSomeHeavyWorkWithResul();
}
... }
Future<?> future1 = executor.submit(task1);
Future<Integer> future2 = executor.submit(task2);
while (!future1.isDone() || !future2.isDone()) {
...
// do something else
...
}

CallBacks和Future是异步处理中最常用的两种机制,实际上无法判断两种机制的优劣,而Netty则会两种都提供,你可以自由选择使用哪种机制。

JVM中的阻塞与非阻塞比较

随着web应用的持续增长,如何提升网络应用的效率变得尤为重要。幸运的是从1.4版本开始,java提供了NIO API来供我们编写更有效率的网络应用。Java 7中又引入的NIO.2不仅仅是之前api的升级,同时也允许我们更加高效方便地编写异步代码。

New or non-blocking?

The N in NIO is typically thought to mean non-blocking rather than new.NIO has beenaround for so long now that nobody calls it new IO anymore. Most people refer to it as non-blocking IO

阻塞IO

阻塞IO

上图所示为典型的阻塞IO模式,一个线程处理一个网络连接,因此应用能够处理连接的个数是由JVM上允许建立的线程个数决定的。

非阻塞IO

非阻塞IO


再来看下非阻塞IO模式,上图运用selector机制来处理多个连接。下面通过一个回显服务器示例来讲解非阻塞及阻塞IO的区别。

阻塞IO

 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
 
public  class PlainEchoServer {
public void serve(int port) throws IOException {
final ServerSocket socket = new ServerSocket(port); #1
try {
while ( true) {
final Socket clientSocket = socket.accept(); #2
System.out.println( "Accepted connection from " +
clientSocket);
new Thread(new Runnable() { #3
@Override
public void run() {
try {
BufferedReader reader =  new BufferedReader(
new
InputStreamReader(clientSocket.getInputStream()));
PrintWriter writer =  new PrintWriter(clientSocket
.getOutputStream(),  true);
while (true) { #4
writer.println(reader.readLine());
writer.flush();
}
catch (IOException e) {
e.printStackTrace();
try {
clientSocket.close();
catch (IOException ex) {
// ignore on close
}
}
}
}).start(); #5
}
catch (IOException e) {
e.printStackTrace();
}
}
}

# 1 绑定监听端口

# 2 阻塞至有新连接进来

# 3 新建线程用来处理客户端连接

# 4 从客户端读取数据并回写

# 5 启动线程

上述服务器代码要求每次连接进来一个请求就需要创建一个新的线程,即使使用线程池也仅能解决一时问题,不能再根本上解决问题:客户端的连接数取决于后台处理线程的个数。当连接数多时则会带来大问题。

非阻塞IO

在介绍NIO之前,我们先了解一些NIO的基本知识

BYTEBUFFER

ByteBuffer在Netty中即为重要,其主要是用来缓存数据的。ByteBuffer既可以分配到堆内存中也可以分配到堆外内存。一般来说,堆外内存能够更加快速地传递给channel,但分配和释放会更耗时。新旧的NIO API对ByteBuffer提供了统一的管理。ByteBuffer能够实现无拷贝地在各个实例之间共享,同时允许对可见数据进行切片和其他操作处理。

Slicing

Slicing a ByteBuffer allows to create a new ByteBuffer that share the same data as the intialByteBuffer but only expose a sub-region of it. This is useful to minimize memory copies whilestill only allow access to a part of the data

ByteBuffer有以下几个重要的操作

  • 将数据写进ByteBuffer
  • 调用ByteBuffer.flip()切换到读模式
  • 从ByteBuffer中读取数据
  • 调用ByteBuffer.clear()或者ByteBuffer.compact()来整理ByteBuffer内存

当往ByteBuffer中写数据时,ByteBuffer会通过更新buffer中write index的位置来跟踪buffer中的数据(也可以手动更新)。当需要从ByteBuffer中读取数据时,需要调用flip()来切换到读模式,flip()会将buffer的读起始位置设置为0,这样就可以读取buffer中所有数据了。

为了能够再次向ByteBuffer中写数据,可以将buffer模式切换到写模式并调用任意下列两个方法。

  • ByteBuffer.clear():清除ByteBuffer
  • ByteBuffer.compact():通过内存拷贝清除已经读过的数据

ByteBuffer.compact()会将所有未读的数据拷贝到buffer的起始位置。如下所示为ByteBuffer的使用示例

 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
 
Channel inChannel = ....;
ByteBuffer buf = ByteBuffer.allocate( 48);
int bytesRead = - 1;
do {
bytesRead = inChannel.read(buf); #1
if (bytesRead != - 1) {
buf.flip(); #2
while(buf.hasRemaining()){
System.out.print((char) buf.get()); #3
}
buf.clear(); #4
}
while (bytesRead != - 1);
inChannel.close();

#1 从channel中读取数据到ByteBuffer

#2 切换模式至读模式

#3 读取buffer中的数据,每次调用一个get()会将buffer当前位置更新+1

#4 切换buffer至写模式,使其可以重新写

使用Selector模式

Selector可以监听多个IO是否可以读/写,这样一个Selector就可以用来处理多个连接,相比于阻塞IO每个连接占用一个线程,Selector模式更加高效。

通过以下几个操作就可以轻松运用Selector

  1. 在channels上创建一个或多个Selector
  2. 在channel上注册需要监听的事件,目前支持四种事件
    • OP_ACCEPT:socket-accept事件
    • OP_CONNECT:socket-connect事件
    • OP_READ:可读事件
    • OP_WRITE:可写事件
  3. channel注册后,调用Selector.select()方法阻塞直到上述注册的一个事件发生
  4. 当Selector.select()返回时,可以通过SelectionKey实例获取所有可操作的事件

下面EchoServer是基于非阻塞Selector的服务器代码,运用这个版本的Server可以运用一个线程处理上千个连接。

 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
 
public  class PlainNioEchoServer {
public void serve(int port) throws IOException {
System.out.println( "Listening for connections on port " + port);
ServerSocketChannel serverChannel = ServerSocketChannel.open();
ServerSocket ss = serverChannel.socket();
InetSocketAddress address =  new InetSocketAddress(port);
ss.bind(address); #1
serverChannel.configureBlocking( false);
Selector selector = Selector.open();
serverChannel.register(selector, SelectionKey.OP_ACCEPT); #2
while ( true) {
try {
selector.select(); #3
catch (IOException ex) {
ex.printStackTrace();
// handle in a proper way
break;
}
Set readyKeys = selector.selectedKeys(); #4
Iterator iterator = readyKeys.iterator();
while (iterator.hasNext()) {
SelectionKey key = (SelectionKey) iterator.next();
iterator.remove(); #5
try {
if (key.isAcceptable()) {
ServerSocketChannel server = (ServerSocketChannel)key.channel();
SocketChannel client = server.accept(); #6
System.out.println( "Accepted connection from" + client);
client.configureBlocking( false);
client.register(selector, SelectionKey.OP_WRITE | SelectionKey.OP_READ, ByteBuffer.allocate(100)); #7
}
if (key.isReadable()) { #8
SocketChannel client = (SocketChannel) key.channel();
ByteBuffer output = (ByteBuffer) key.attachment();
client.read(output); #9
}
if (key.isWritable()) { #10
SocketChannel client = (SocketChannel) key.channel();
ByteBuffer output = (ByteBuffer) key.attachment();
output.flip();
client.write(output); #11
output.compact();
}
catch (IOException ex) {
key.cancel();
try {
key.channel().close();
catch (IOException cex) {
}
}
}
}
}
}

#1 绑定Server的port

#2 注册channel的OP_ACCEPT Selector事件,监听新连接

#3 阻塞直到有新的连接事件到来

#4 获取所有可操作的SelectedKey实例

#5 遍历SelectedKey实例,将遍历过的去除

#6 获取新的连接

#7 将新的连接注册到Selector中,并监听读/写事件

#8 检查SelectKey是否可读

#9 读数据

#10 检测是否可写

#11 写数据

上述代码实现起来比较繁琐,新的NIO API去掉了大部分繁琐的过程,使实现起来更加简单明了

基于NIO.2的EchoServer

 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
 
public  class PlainNio2EchoServer {
public void serve(int port) throws IOException {
System.out.println( "Listening for connections on port " + port);
final AsynchronousServerSocketChannel serverChannel =
AsynchronousServerSocketChannel.open();
InetSocketAddress address =  new InetSocketAddress(port);
serverChannel.bind(address); #1
final CountDownLatch latch =  new CountDownLatch( 1);
serverChannel.accept( nullnew
CompletionHandler<AsynchronousSocketChannel, Object>() { #2
@Override
public void completed(final AsynchronousSocketChannel channel, Object attachment) {
serverChannel.accept(null, this); #3
ByteBuffer buffer = ByteBuffer.allocate( 100);
channel.read(buffer, buffer,
new EchoCompletionHandler(channel)); #4
@Override
public void failed (Throwable throwable, Object attachment){
try {
serverChannel.close(); #5
catch (IOException e) {
// ingnore on close
finally {
latch.countDown();
}
}
});  try
{
latch.await();
catch(
InterruptedException e)
{
Thread.currentThread().interrupt();
}
}
private  final  class EchoCompletionHandler implements
CompletionHandler<IntegerByteBuffer{
private  final AsynchronousSocketChannel channel;
EchoCompletionHandler(AsynchronousSocketChannel channel) {
this.channel = channel;
}
@Override
public void completed(Integer result, ByteBuffer buffer) {
buffer.flip();
channel.write(buffer, buffer, new CompletionHandler<Integer, #6
ByteBuffer>() {
@Override
public void completed(Integer result, ByteBuffer buffer) {
if (buffer.hasRemaining()) {
channel.write(buffer, buffer, this); #7
else {
buffer.compact();
channel.read(buffer, buffer,
EchoCompletionHandler.this); #8
}
}
@Override
public void failed(Throwable exc, ByteBuffer attachment) {
try {
channel.close();
catch (IOException e) {
// ingnore on close
}
}
});
}
@Override
public void failed(Throwable exc, ByteBuffer attachment) {
try {
channel.close();
catch (IOException e) {
// ingnore on close
}
}
}
}

#1 绑定Server的port

#2 监听新连接到来,一旦有新的连接接入则会调用CompletionHandler

#3 重新监听连接接入事件

#4 在channel上触发读操作,一单有数据可读EchoCompletionHandler将会被触发

#5 出现错误时关闭channel

#6 注册写回调事件,通#4

#7 当buffer中还有数据时再次注册写事件

#8 同#4,注册CompletionHandler回调读事件

上述代码看起来要比之前的更加复杂,按NIO2.0自己实现了loop事件,我们在使用的时候只需要简单地注册自己感兴趣的事件即可。



JavaNIO1.0到2.0的发展

旧瓶装新酒,Java NIO

在上一篇文章中对于I/O模型已经讲的比较清楚了,在I/O密集型应用中使用Reactor模式可以明显提高系统的性能(我们这里谈到的性能很大程度上指的是吞吐量),但是在具体的开发过程中模式还是要落地成真实的代码,使用传统的I/O库肯定是不行的,在Java中需要使用java.nio包下的库。

虽然是讲NIO的实现,但本文将不会把所有Java NIO中的主要API全部过一遍,而是通过例子理清NIO到底可以做什么事情。

本文中提到的JDK源代码都可以在%JAVA_HOME%/jre/lib/rt.jar中看到。

Java NIO最初在Java4中被引入,但是到今天还是有很大部分的开发者从来没使用过NIO的API,因为基础I/O已经能满足了我们日常的开发需求。但如果要开发I/O密集型应用的场景下,NIO可以明显的提升程序的性能,另外NIO与基础I/O有本质思想上的区别。
本文主要讲Java中的NIO,内容包含:

  1. Oracle官方对NIO的说法
  2. Java中NIO的历史进程
  3. NIO和NIO.2的区别在哪里
  4. NIO中的主要类的介绍
  5. 使用NIO的API构建一个Socket服务器

Oracle官方对NIO的说法

首先看看Oracle的官方文档中是怎么说的:

Java中对于I/O的支持主要包括java.io 和java.nio两个包的内容,它们共同提供了如下特性:
1. 通过数据流和序列化从文件系统中读取和写数据。
1. 提供Charsets,解码器和编码器,用于在字节和Unicode字符之间的翻译。
1. 访问文件、文件的属性、文件系统。
1. 提供异步的或者非阻塞多路复用I/O的API,用于构建可扩展的服务器程序。

这里并没有提到网络I/O的东西,在Java1.4以前,网络I/O的API都是被放在java.net包下,在NIO中才被一起放入了java.nio包下。

Java中NIO的历史进程

  1. 最开始Java中使用I/O来访问文件系统只有通过java.io.File类来做,其中包含了一些对文件和目录基本的操作。对于开发中常碰到的I/O需求一般都能覆盖到,所以这也是日常开发工作中最常使用的I/O API。官方文档中称之为基础I/O(Basic I/O)。
    基础I/O是基于各种流的概念的,其基本模型就是上一篇中讲到的阻塞I/O。
  2. 为了进一步丰富I/O操作的API,也是为了提升在I/O密集型应用中的性能,基于Reactor模式,在Java1.4中引入了java.nio包,其中重点包含几个类:
    • java.nio.Buffer,用来存储各种缓冲数据的容器。
    • java.nio.channels.Channel,用于连接程序和I/O设备的数据通道。
    • java.nio.channels.Selector,多路复用选择器,在上一篇中讲到过。
    • java.nio.charset.Charset,用来编解码。
  3. 在Java7中引入了NIO.2,引入了一系列新的API(主要在新加入的包Java.nio.file),对于访问文件系统提供了更多的API实现,更加丰富的文件属性类,增加了一些异步I/O的API。同时,还添加了很多实用方法。

    例如:以前简单的拷贝一个文件就必须要写一大堆的代码,现在实用java.nio.file.Files.copy(Path, Path, CopyOption...)就可以很轻松的做到了

NIO和NIO.2的区别在哪里

在上一节中已经简单介绍了这两个概念的不同,这里再简单罗列一下。NIO中引入的一个重要概念就是Reactor模式,而NIO.2对NIO本身不是一次升级,而是一次扩充,NIO.2中新增了很多实用方法(utilities),以支持更多的功能需求,并不是说能够提升多少的性能。主要增加了如下两点:
1. 新的访问文件的API。
访问文件从简单到复杂的方法
Java.nio.file包和其子包中新增了大量的与访问文件相关的类,其中比较重要的有以下几个,更完整的更新可以在Oracle的官网文档中查看。
– java.nio.file.Path,它可以用来取代早期的java.io.File用来访问文件。
– java.nio.file.Files,其中包含了大量的对文件操作的API。

  1. 异步I/O的API
    在NIO原来的API的基础上,增加了对Proactor模式的支持,可以在包java.nio.channels中看到新加入的java.nio.channels.AsynchronousChanneljava.nio.channels.CompletionHandler<V, A>。使用这些类可以实现异步编程,如代码1中所示:
    //代码1 //定义一个处理文件内容的函数式接口 @FunctionalInterface static interface ProcessBuffer{ void process(int result, ByteBuffer bb); } //递归地读取文件的全部内容 static void readFileThrough(AsynchronousFileChannel ch, ProcessBuffer runn, int position) { ByteBuffer bb = ByteBuffer.allocate(512); ch.read(bb, position, null, new CompletionHandler<Integer, Object>() { @Override public void completed(Integer result, Object attachment) { System.out.println("成功了"); bb.flip(); runn.process(result, bb); bb.clear(); if (result == bb.capacity()) readFileThrough(ch, runn, position + result); } @Override public void failed(Throwable exc, Object attachment) { System.err.println("失败了!!!"); } }); } //读取文件内容,并打印 static void testAIOReadFile() throws IOException { Path p = Paths.get(fileDir, fileName); AsynchronousFileChannel channel = AsynchronousFileChannel.open(p, StandardOpenOption.READ); Thread daemon = new Thread(() -> { try { System.out.println("守护"); Thread.sleep(10000); } catch (Exception e) { } }); readFileThrough(channel, (result, bb) -> { if (result < bb.capacity()) { System.out.println(new String(Arrays.copyOf(bb.array(), result))); System.out.println("已读完。。。"); daemon.interrupt(); }else { System.out.print(new String(bb.array())); } }, 0); daemon.start(); } 

NIO中的主要类的介绍

NIO的基本思想是要构建一个Reactor模式的实现,具体落实到API,在Java中主要有以下几个类:

1. java.nio.Buffer

这是一个容器类,用来存储「基础数据类型」,所有从Channel中读取出来的数据都要使用Buffer的子类来作为存储单元,可以把它想象成一个带着很多属性的数组(和ArrayList很类似,其实它的实现机制也差不多就是这样)。

第一次看到介绍Buffer是在一本书上,书上画了好多方框和指向这些方框的属性值,看着就头晕。其实很简单,Buffer就是一个数组。

在读写交换时,必不可少的要批量地去读取并写入到目标对象,这个道理是不变的。在基础I/O中如果我们要把一个输入流写入一个输出流,可能会这么做:

//代码2 public static void copy(File src, File dest) throws IOException { FileInputStream in = new FileInputStream(src); FileOutputStream out = new FileOutputStream(dest); byte[] buffer = new byte[1024]; int bytes = 0; while ((bytes = in.read(buffer)) > -1){ out.write(buffer, 0, bytes); } out.close(); in.close(); } 

以上代码中使用了一个真实的数组用来做读写切换,从而达到批量(缓冲)读写的目标。
而在NIO中(如代码1),读写切换也同样是使用了一个数组进行暂存(缓冲),只不过在这个数组之上,封装了一些属性(java.nio.Buffer源码中的一些属性如代码3所示)和操作。

//代码3 - Buffer类中定义的一些属性 // Invariants: mark <= position <= limit <= capacity private int mark = -1; private int position = 0; private int limit; private int capacity; 

关于Buffer类详细的继承关系和其主要方法,可以参考下图:
Buffer的继承关系

2. java.nio.channels.Channel

Channel可以看做是代码2中InputStream和OutStream的合体,在实际使用中,我们往往针对同一个I/O设备同时存在读和写的操作,在基础I/O中我们就需要针对同一个目标对象生成一个输入流和输出流的对象,可是在NIO中就可以只建立一个Channel对象了。

Channel抽象的概念是对于某个I/O设备的「连接」,可以使用这个连接进行一些I/O操作,java.nio.channels.Channel本身是一个接口,只有两个方法,但是在Java的的环境中,往往最简单的接口最烦人,因为它的实现类总是会异常的多。

//代码4 - 去除了所有注释的Channel类 package java.nio.channels; import java.io.IOException; import java.io.Closeable; public interface Channel extends Closeable { public boolean isOpen(); public void close() throws IOException; } 

当然,这是享受多态带来的好处的同时必须承受的。详细的Channel继承和实现关系如下:

Channel的继承和实现关系

3. java.nio.channels.Selector

如果你是使用NIO来做网络I/O,Selector是JavaNIO中最重要的类,正如它的注释里第一句说的,Selector是SelectableChannel的「多路复用器」。

SelectableChannel的实现类

多路复用,这是在上一篇介绍过的概念,在不同的操作系统也有不同的底层实现。用户也可以自己实现自己的Selector(通过类java.nio.channels.spi.SelectorProvider

//代码5 - provider构造方法 public static SelectorProvider provider() { synchronized (lock) { if (provider != null) return provider; return AccessController.doPrivileged( new PrivilegedAction<SelectorProvider>() { public SelectorProvider run() { if (loadProviderFromProperty()) //如果设置了属性java.nio.channels.spi.SelectorProvider,则会载入响应的类 return provider; if (loadProviderAsService()) return provider; provider = sun.nio.ch.DefaultSelectorProvider.create(); return provider; } }); } } 

如果你不实现自己的SelectorProvidor,在代码5中可以看到JDK会使用类sun.nio.ch.DefaultSelectorProvider来创建,这里会根据你的操作系统的类别不同而选择不同的实现类。openJDK中也有相应的实现,有兴趣的可以去GrepCode查看一下,Mac OS下是使用KQueueSelectorProvider

Selector的使用比较简单,同时要配合SelectionKey使用,它们的继承结构图也比较简单,如下:

Selector继承关系

4. 其他

其他一些类如Charset个人感觉属于实用性很强的类,但是在NIO与基础I/O的比较中就显得不那么重要了。

使用NIO的API构建一个Socket服务器

Java1.4引入的NIO中已经可以实现Reactor模式,在NIO.2中又引入了AIO的API,所以本节将分别使用两种模式来实现一个Socket服务器,这里重点介绍Java中NIO API的使用,至于NIO和基础I/O的性能对比,网上有很多,这里就不再做比较了。

首先定义一些基础类,将从Socket中获取的数据解析成TestRequest对象,然后再找到响应的Handler。看代码:

我这里为了偷懒,将很多基础类和方法定义在了一个类中,这种方法其实十分不可取。

//代码6 /** * 执行计算工作的线程池 */ private static ExecutorService workers = Executors.newFixedThreadPool(10); /** * 解析出来的请求对象 * @author lk * */ public static class TestRequest{ /** * 根据解析到的method来获取响应的Handler */ String method; String args; public static TestRequest parseFromString(String req) { System.out.println("收到请求:" + req); TestRequest request = new TestRequest(); request.method = req.substring(0, 512); request.args = req.substring(512, req.length()); return request; } } /** * 具体的逻辑需要实现此接口 * @author lk * */ public static interface SockerServerHandler { ByteBuffer handle(TestRequest req); } 

主要的逻辑其实就是使用ServerSocketChannel的实例监听本地端口,并且设置其为非阻塞(默认为阻塞模式)。代码7中的parse()函数是一个典型的「使用Buffer读取Channel中数据」的方法,这里为了简(tou)单(lan),默认只读取1024个字节,所以并没有实际去循环读取。

//代码7 private static void useNIO() { Selector dispatcher = null; ServerSocketChannel serverChannel = null; try { dispatcher = Selector.open(); serverChannel = ServerSocketChannel.open(); serverChannel.configureBlocking(false); serverChannel.socket().setReuseAddress(true); serverChannel.socket().bind(LOCAL_8080); //ServerSocketChannel只支持这一种key,因为server端的socket只能去accept serverChannel.register(dispatcher, SelectionKey.OP_ACCEPT); while (dispatcher.select() > 0) { operate(dispatcher); } } catch (Exception e) { e.printStackTrace(); } } /** * 在分发器上循环获取连接事件 * @param dispatcher * @throws IOException */ private static void operate(Selector dispatcher) throws IOException { //Set<SelectionKey> keys = dispatcher.keys(); Set<SelectionKey> keys = dispatcher.selectedKeys(); Iterator<SelectionKey> ki = keys.iterator(); while(ki.hasNext()) { SelectionKey key = ki.next(); ki.remove(); if (key.isAcceptable()) { ServerSocketChannel channel = (ServerSocketChannel) key.channel(); //针对此socket的IO就是BIO了 final SocketChannel socket = channel.accept(); workers.submit(() -> { try { TestRequest request = TestRequest.parseFromString(parse(socket)); SockerServerHandler handler = (SockerServerHandler) Class.forName(getClassNameForMethod(request.method)).newInstance(); socket.write(handler.handle(request)); } catch (Exception e) { // TODO Auto-generated catch block e.printStackTrace(); } }); } } } private static String parse(SocketChannel socket) throws IOException { String req = null; try { ByteBuffer buffer = ByteBuffer.allocateDirect(1024); byte[] bytes; int count = 0; if ((count = socket.read(buffer)) >= 0) { buffer.flip(); bytes = new byte[count]; buffer.get(bytes); req = new String(bytes, Charset.forName("utf-8")); buffer.clear(); } } finally { socket.socket().shutdownInput(); } return req; } 

Java的程序有个通病,写出来的程序又臭又长,同样是使用JavaNIO的API实现一个非阻塞的Socket服务器,使用NIO.2中AIO(异步I/O)的API就很简单了,但是却陷入了回调地狱(当然可以通过别的方式避免回调,但是其本质还是一样的)。和上边介绍的Reactor模式相比,简直就是拿核武器比步枪,有点降维攻击的意味了。Reactor中那么复杂的概念和逻辑所实现的功能,使用AIO的API很轻松就搞定了,而且概念比较少,逻辑更清晰。

//代码8 private static void useAIO() { AsynchronousServerSocketChannel server; try { server = AsynchronousServerSocketChannel.open(); server.bind(LOCAL_8080); while (true) { Future<AsynchronousSocketChannel> socketF = server.accept(); try { final AsynchronousSocketChannel socket = socketF.get(); workers.submit(() -> { try { ByteBuffer buffer = ByteBuffer.allocateDirect(1024); socket.read(buffer, null, new CompletionHandler<Integer, Object>() { @Override public void completed(Integer count, Object attachment) { byte[] bytes; if (count >= 0) { buffer.flip(); bytes = new byte[count]; buffer.get(bytes); String req = new String(bytes, Charset.forName("utf-8")); TestRequest request = TestRequest.parseFromString(req); try { SockerServerHandler handler = (SockerServerHandler) Class.forName(getClassNameForMethod(request.method)).newInstance(); ByteBuffer bb = handler.handle(request); socket.write(bb, null, null); } catch (InstantiationException | IllegalAccessException | ClassNotFoundException e) { // TODO Auto-generated catch block e.printStackTrace(); } buffer.clear(); } } @Override public void failed(Throwable exc, Object attachment) { // TODO Auto-generated method stub } }); } catch (Exception e) { // TODO Auto-generated catch block e.printStackTrace(); } finally { } }); } catch (InterruptedException | ExecutionException e) { // TODO Auto-generated catch block e.printStackTrace(); break; } } } catch (IOException e) { // TODO Auto-generated catch block e.printStackTrace(); } } 

最后是测试用的客户端程序,NIO在客户端同样也可以发挥很重要的作用,这里就先略过了,代码9中客户端测试使用的是基础I/O:

//代码9 private volatile static int succ = 0; public static void main(String[] args) throws UnknownHostException, IOException { CountDownLatch latch = new CountDownLatch(100); for (int i = 0; i < 100; i++) { new Thread( () -> { Socket soc; try { soc = new Socket("localhost", 8080); if (soc.isConnected()) { OutputStream out = soc.getOutputStream(); byte[] req = "hello".getBytes("utf-8"); out.write(Arrays.copyOf(req, 1024)); InputStream in = soc.getInputStream(); byte[] resp = new byte[1024]; in.read(resp, 0, 1024); String result = new String(resp, "utf-8"); if (result.equals("haha")) { succ++; } System.out.println(Thread.currentThread().getName() + "收到回复:" + result); out.flush(); out.close(); in.close(); soc.close(); } try { System.out.println(Thread.currentThread().getName() + "去睡觉等待。。。"); latch.await(); } catch (InterruptedException e) { e.printStackTrace(); } } catch (Exception e) { e.printStackTrace(); } }).start(); latch.countDown(); } Runnable hook = () -> { System.out.println("成功个数:" + succ); }; Runtime.getRuntime().addShutdownHook(new Thread(hook)); } 

总结

原本只是想写一篇Netty在RPC框架中的使用,写着写着就写多了。本文从Java中引入NIO的历史讲起,梳理了Java对NIO支持的具体的API,最后通过一个典型的Socket服务器的例子具体的展示了Java中NIO相关API的使用,将Reactor模式和Proactor模式从理论落地到实际的代码。

由于作者比较懒,贴图全部都是在网上找的(代码大部分是自己写的),如侵删。下一篇将讲到比较火的一个NIO框架Netty的实现与使用。


JDK Epoll空轮询bug


本文主要在应用服务器中对JDK的epoll空转bug的处理基础上做了修补。

bug表现

epoll bug
  • 正常情况下,selector.select()操作是阻塞的,只有被监听的fd有读写操作时,才被唤醒
  • 但是,在这个bug中,没有任何fd有读写请求,但是select()操作依旧被唤醒
  • 很显然,这种情况下,selectedKeys()返回的是个空数组
  • 然后按照逻辑执行到while(true)处,循环执行,导致死循环。

bug原因

JDK bug列表中有两个相关的bug报告:

  1. JDK-6670302 : (se) NIO selector wakes up with 0 selected keys infinitely
  2. JDK-6403933 : (se) Selector doesn't block on Selector.select(timeout) (lnx)

JDK-6403933的bug说出了实质的原因:

This is an issue with poll (and epoll) on Linux. If a file descriptor for a connected socket is polled with a request event mask of 0, and if the connection is abruptly terminated (RST) then the poll wakes up with the POLLHUP (and maybe POLLERR) bit set in the returned event set. The implication of this behaviour is that Selector will wakeup and as the interest set for the SocketChannel is 0 it means there aren't any selected events and the select method returns 0.

具体解释为:在部分Linux的2.6的kernel中,poll和epoll对于突然中断的连接socket会对返回的eventSet事件集合置为POLLHUP,也可能是POLLERR,eventSet事件集合发生了变化,这就可能导致Selector会被唤醒。

这是与操作系统机制有关系的,JDK虽然仅仅是一个兼容各个操作系统平台的软件,但很遗憾在JDK5和JDK6最初的版本中(严格意义上来将,JDK部分版本都是),这个问题并没有解决,而将这个帽子抛给了操作系统方,这也就是这个bug最终一直到2013年才最终修复的原因,最终影响力太广。

解决办法

不完善的解决办法

grizzly的commiteer们最先进行修改的,并且通过众多的测试说明这种修改方式大大降低了JDK NIO的问题。

if (SelectionKey != null) { // the key you registered on the temporary selector SelectionKey.cancel(); // cancel the SelectionKey that was registered with the temporary selector // flush the cancelled key temporarySelector.selectNow(); } 

但是,这种修改仍然不是可靠的,一共有两点:

  1. 多个线程中的SelectionKey的key的cancel,很可能和下面的Selector.selectNow同时并发,如果是导致key的cancel后运行很可能没有效果
  2. 与其说第一点使得NIO空转出现的几率大大降低,经过Jetty服务器的测试报告发现,这种重复利用Selector并清空SelectionKey的改法很可能没有任何的效果,

完善的解决办法

最终的终极办法是创建一个新的Selector:

Trash wasted Selector, creates a new one.

各应用具体解决方法

Jetty

Jetty首先定义两了-D参数:

  • JVMBUG_THRESHHOLD

org.mortbay.io.nio.JVMBUG_THRESHHOLD, defaults to 512 and is the number of zero select returns that must be exceeded in a period.

  • threshhold

org.mortbay.io.nio.MONITOR_PERIOD defaults to 1000 and is the period over which the threshhold applies.

第一个参数是select返回值为0的计数,第二个是多长时间,整体意思就是控制在多长时间内,如果Selector.select不断返回0,说明进入了JVM的bug的模式。

做法是:

  • 记录select()返回为0的次数(记做jvmBug次数)
  • 在MONITOR_PERIOD时间范围内,如果jvmBug次数超过JVMBUG_THRESHHOLD,则新创建一个selector
Jetty解决空轮询bug

Netty

思路和Jetty的处理方式几乎是一样的,就是netty讲重建Selector的过程抽取成了一个方法。

long currentTimeNanos = System.nanoTime(); for (;;) { // 1.定时任务截止事时间快到了,中断本次轮询 ... // 2.轮询过程中发现有任务加入,中断本次轮询 ... // 3.阻塞式select操作 selector.select(timeoutMillis); // 4.解决jdk的nio bug long time = System.nanoTime(); if (time - TimeUnit.MILLISECONDS.toNanos(timeoutMillis) >= currentTimeNanos) { selectCnt = 1; } else if (SELECTOR_AUTO_REBUILD_THRESHOLD > 0 && selectCnt >= SELECTOR_AUTO_REBUILD_THRESHOLD) { rebuildSelector(); selector = this.selector; selector.selectNow(); selectCnt = 1; break; } currentTimeNanos = time; ... } 

netty 会在每次进行 selector.select(timeoutMillis) 之前记录一下开始时间currentTimeNanos,在select之后记录一下结束时间,判断select操作是否至少持续了timeoutMillis秒(这里将time - TimeUnit.MILLISECONDS.toNanos(timeoutMillis) >= currentTimeNanos改成time - currentTimeNanos >= TimeUnit.MILLISECONDS.toNanos(timeoutMillis)或许更好理解一些),
如果持续的时间大于等于timeoutMillis,说明就是一次有效的轮询,重置selectCnt标志,否则,表明该阻塞方法并没有阻塞这么长时间,可能触发了jdk的空轮询bug,当空轮询的次数超过一个阀值的时候,默认是512,就开始重建selector



作者:齐晋
链接:https://www.jianshu.com/p/3ec120ca46b2
來源:简书
简书著作权归作者所有,任何形式的转载都请联系作者获得授权并注明出处。

微信公众号【Java技术江湖】一位阿里 Java 工程师的技术小站。(关注公众号后回复”Java“即可领取 Java基础、进阶、项目和架构师等免费学习资料,更有数据库、分布式、微服务等热门技术学习视频,内容丰富,兼顾原理和实践,另外也将赠送作者原创的Java学习指南、Java程序员面试指南等干货资源)


wAAACH5BAEKAAAALAAAAAABAAEAAAICRAEAOw==


原文链接:https://yq.aliyun.com/articles/639900
关注公众号

低调大师中文资讯倾力打造互联网数据资讯、行业资源、电子商务、移动互联网、网络营销平台。

持续更新报道IT业界、互联网、市场资讯、驱动更新,是最及时权威的产业资讯及硬件资讯报道平台。

转载内容版权归作者及来源网站所有,本站原创内容转载请注明来源。

文章评论

共有0条评论来说两句吧...

文章二维码

扫描即可查看该文章

点击排行

推荐阅读

最新文章