Reactor模型详解
对于Java IO模型的变化,描述最为清楚的莫属于Doug Lea对Reactor模型的讲解《Scalable IO in Java》。本文则主要围绕该文档,对Java IO模型的演变过程进行讲解,并且会讲解各个模型所解决的问题以及其存在的问题。最后,本文也会以一个实际的例子来实现Reactor模型。
1. 传统IO模型
对于传统IO模型,其主要是一个Server对接N个客户端,在客户端连接之后,为每个客户端都分配一个执行线程。如下图是该模型的一个演示:
从图中可以看出,传统IO的特点在于:
- 每个客户端连接到达之后,服务端会分配一个线程给该客户端,该线程会处理包括读取数据,解码,业务计算,编码,以及发送数据整个过程;
- 同一时刻,服务端的吞吐量与服务器所提供的线程数量是呈线性关系的。
这种设计模式在客户端连接不多,并发量不大的情况下是可以运行得很好的,但是在海量并发的情况下,这种模式就显得力不从心了。这种模式主要存在的问题有如下几点:
- 服务器的并发量对服务端能够创建的线程数有很大的依赖关系,但是服务器线程却是不能无限增长的;
- 服务端每个线程不仅要进行IO读写操作,而且还需要进行业务计算;
- 服务端在获取客户端连接,读取数据,以及写入数据的过程都是阻塞类型的,在网络状况不好的情况下,这将极大的降低服务器每个线程的利用率,从而降低服务器吞吐量。
2. Reactor事件驱动模型
在传统IO模型中,由于线程在等待连接以及进行IO操作时都会阻塞当前线程,这部分损耗是非常大的。因而jdk 1.4中就提供了一套非阻塞IO的API。该API本质上是以事件驱动来处理网络事件的,而Reactor是基于该API提出的一套IO模型。如下是Reactor事件驱动模型的示意图:
从图中可以看出,在Reactor模型中,主要有四个角色:客户端连接,Reactor,Acceptor和Handler。这里Acceptor会不断地接收客户端的连接,然后将接收到的连接交由Reactor进行分发,最后有具体的Handler进行处理。改进后的Reactor模型相对于传统的IO模型主要有如下优点:
- 从模型上来讲,如果仅仅还是只使用一个线程池来处理客户端连接的网络读写,以及业务计算,那么Reactor模型与传统IO模型在效率上并没有什么提升。但是Reactor模型是以事件进行驱动的,其能够将接收客户端连接,网络读和网络写,以及业务计算进行拆分,从而极大的提升处理效率;
- Reactor模型是异步非阻塞模型,工作线程在没有网络事件时可以处理其他的任务,而不用像传统IO那样必须阻塞等待。
3. Reactor模型----业务处理与IO分离
在上面的Reactor模型中,由于网络读写和业务操作都在同一个线程中,在高并发情况下,这里的系统瓶颈主要在两方面:
- 高频率的网络读写事件处理;
- 大量的业务操作处理;
基于上述两个问题,这里在单线程Reactor模型的基础上提出了使用线程池的方式处理业务操作的模型。如下是该模型的示意图:
从图中可以看出,在多线程进行业务操作的模型下,该模式主要具有如下特点:
- 使用一个线程进行客户端连接的接收以及网络读写事件的处理;
- 在接收到客户端连接之后,将该连接交由线程池进行数据的编解码以及业务计算。
这种模式相较于前面的模式性能有了很大的提升,主要在于在进行网络读写的同时,也进行了业务计算,从而大大提升了系统的吞吐量。但是这种模式也有其不足,主要在于:
- 网络读写是一个比较消耗CPU的操作,在高并发的情况下,将会有大量的客户端数据需要进行网络读写,此时一个线程将不足以处理这么多请求。
4. Reactor模型----并发读写
对于使用线程池处理业务操作的模型,由于网络读写在高并发情况下会成为系统的一个瓶颈,因而针对该模型这里提出了一种改进后的模型,即使用线程池进行网络读写,而仅仅只使用一个线程专门接收客户端连接。如下是该模型的示意图:
可以看到,改进后的Reactor模型将Reactor拆分为了mainReactor和subReactor。这里mainReactor主要进行客户端连接的处理,处理完成之后将该连接交由subReactor以处理客户端的网络读写。这里的subReactor则是使用一个线程池来支撑的,其读写能力将会随着线程数的增多而大大增加。对于业务操作,这里也是使用一个线程池,而每个业务请求都只需要进行编解码和业务计算。通过这种方式,服务器的性能将会大大提升,在可见情况下,其基本上可以支持百万连接。
5. Reactor模型示例
对于上述Reactor模型,服务端主要有三个角色:Reactor,Acceptor和Handler。这里基于Doug Lea的文档对其进行了实现,如下是Reactor的实现代码:
public class Reactor implements Runnable {
private final Selector selector;
private final ServerSocketChannel serverSocket;
public Reactor(int port) throws IOException {
serverSocket = ServerSocketChannel.open(); // 创建服务端的ServerSocketChannel
serverSocket.configureBlocking(false); // 设置为非阻塞模式
selector = Selector.open(); // 创建一个Selector多路复用器
SelectionKey key = serverSocket.register(selector, SelectionKey.OP_ACCEPT);
serverSocket.bind(new InetSocketAddress(port)); // 绑定服务端端口
key.attach(new Acceptor(serverSocket)); // 为服务端Channel绑定一个Acceptor
}
@Override
public void run() {
try {
while (!Thread.interrupted()) {
selector.select(); // 服务端使用一个线程不断等待客户端的连接到达
Set<SelectionKey> keys = selector.selectedKeys();
Iterator<SelectionKey> iterator = keys.iterator();
while (iterator.hasNext()) {
dispatch(iterator.next()); // 监听到客户端连接事件后将其分发给Acceptor
iterator.remove();
}
selector.selectNow();
}
} catch (IOException e) {
e.printStackTrace();
}
}
private void dispatch(SelectionKey key) throws IOException {
// 这里的attachement也即前面为服务端Channel绑定的Acceptor,调用其run()方法进行
// 客户端连接的获取,并且进行分发
Runnable attachment = (Runnable) key.attachment();
attachment.run();
}
}
这里Reactor首先开启了一个ServerSocketChannel,然后将其绑定到指定的端口,并且注册到了一个多路复用器上。接着在一个线程中,其会在多路复用器上等待客户端连接。当有客户端连接到达后,Reactor就会将其派发给一个Acceptor,由该Acceptor专门进行客户端连接的获取。下面我们继续看一下Acceptor的代码:
public class Acceptor implements Runnable {
private final ExecutorService executor = Executors.newFixedThreadPool(20);
private final ServerSocketChannel serverSocket;
public Acceptor(ServerSocketChannel serverSocket) {
this.serverSocket = serverSocket;
}
@Override
public void run() {
try {
SocketChannel channel = serverSocket.accept(); // 获取客户端连接
if (null != channel) {
executor.execute(new Handler(channel)); // 将客户端连接交由线程池处理
}
} catch (IOException e) {
e.printStackTrace();
}
}
}
这里可以看到,在Acceptor获取到客户端连接之后,其就将其交由线程池进行网络读写了,而这里的主线程只是不断监听客户端连接事件。下面我们看看Handler的具体逻辑:
public class Handler implements Runnable {
private volatile static Selector selector;
private final SocketChannel channel;
private SelectionKey key;
private volatile ByteBuffer input = ByteBuffer.allocate(1024);
private volatile ByteBuffer output = ByteBuffer.allocate(1024);
public Handler(SocketChannel channel) throws IOException {
this.channel = channel;
channel.configureBlocking(false); // 设置客户端连接为非阻塞模式
selector = Selector.open(); // 为客户端创建一个新的多路复用器
key = channel.register(selector, SelectionKey.OP_READ); // 注册客户端Channel的读事件
}
@Override
public void run() {
try {
while (selector.isOpen() && channel.isOpen()) {
Set<SelectionKey> keys = select(); // 等待客户端事件发生
Iterator<SelectionKey> iterator = keys.iterator();
while (iterator.hasNext()) {
SelectionKey key = iterator.next();
iterator.remove();
// 如果当前是读事件,则读取数据
if (key.isReadable()) {
read(key);
} else if (key.isWritable()) {
// 如果当前是写事件,则写入数据
write(key);
}
}
}
} catch (Exception e) {
e.printStackTrace();
}
}
// 这里处理的主要目的是处理Jdk的一个bug,该bug会导致Selector被意外触发,但是实际上没有任何事件到达,
// 此时的处理方式是新建一个Selector,然后重新将当前Channel注册到该Selector上
private Set<SelectionKey> select() throws IOException {
selector.select();
Set<SelectionKey> keys = selector.selectedKeys();
if (keys.isEmpty()) {
int interestOps = key.interestOps();
selector = Selector.open();
key = channel.register(selector, interestOps);
return select();
}
return keys;
}
// 读取客户端发送的数据
private void read(SelectionKey key) throws IOException {
channel.read(input);
if (input.position() == 0) {
return;
}
input.flip();
process(); // 对读取的数据进行业务处理
input.clear();
key.interestOps(SelectionKey.OP_WRITE); // 读取完成后监听写入事件
}
private void write(SelectionKey key) throws IOException {
output.flip();
if (channel.isOpen()) {
channel.write(output); // 当有写入事件时,将业务处理的结果写入到客户端Channel中
key.channel();
channel.close();
output.clear();
}
}
// 进行业务处理,并且获取处理结果。本质上,基于Reactor模型,如果这里成为处理瓶颈,
// 则直接将其处理过程放入线程池即可,并且使用一个Future获取处理结果,最后写入客户端Channel
private void process() {
byte[] bytes = new byte[input.remaining()];
input.get(bytes);
String message = new String(bytes, CharsetUtil.UTF_8);
System.out.println("receive message from client: \n" + message);
output.put("hello client".getBytes());
}
}
在Handler中,主要进行的就是为每一个客户端Channel创建一个Selector,并且监听该Channel的网络读写事件。当有事件到达时,进行数据的读写,而业务操作这交由具体的业务线程池处理。
6. 小结
本文首先讲解了Java IO的几种网络模型,着重比较了每种不同的模型的优缺点,并且基于Reactor模型,使用一个实例对其实现过程进行了演示。

低调大师中文资讯倾力打造互联网数据资讯、行业资源、电子商务、移动互联网、网络营销平台。
持续更新报道IT业界、互联网、市场资讯、驱动更新,是最及时权威的产业资讯及硬件资讯报道平台。
转载内容版权归作者及来源网站所有,本站原创内容转载请注明来源。
-
上一篇
踏入职场后,差距来自哪里
混迹职场多年后,免不了要跟以前的同学或同事聚上一聚,聊一聊职场人生,感悟一下时光匆匆,顺便怀念一下当年。这时候一个尴尬或者拉仇恨的话题可能会摆上桌面「这几年混的怎么样?」,为什么尴尬,混的好的当然不存在,主要是对混的一般没什么成就的人来说,就略显尬尴。同样的年纪,同样的学历,同样的起点,几年后,为何个人能力、贫富差距,眼界格局差距如此之大 ?这些年到底发生了什么,何以带来如此的差距 ?今天就来说一说这个事情。 一、环境 还记得以前的自己在关于环境这个问题上一直认为,如果没打算在一线这种城市扎根,那就不要出去,回来后需要从头积累,而且一线生活成本较高,税后收入刨去生活成本跟在二线其实差不了太多,我相信很多人都听过类似的话。后来我就认为这就是正确的。后来的后来我发现,我错了,搞IT有机会还是得去一线,去大厂。 作为it行业从业人员,平台太重要了,信息也太重要了,有时候不是能力的问题,而是知不知道的问题,知不知道这是眼界和见识决定的,这种平台和机会一线城市显然要优于二三线城市,那二三线也不是完全没机会,只是相对小很多,我们当然应该去搏大概率事件。 拿个技术问题举例来说,「分布式事务」,微服...
-
下一篇
TiKV 源码解析系列文章(三)Prometheus(上)
作者:Breezewish 本文为 TiKV 源码解析系列的第三篇,继续为大家介绍 TiKV 依赖的周边库 rust-prometheus,本篇主要介绍基础知识以及最基本的几个指标的内部工作机制,下篇会介绍一些高级功能的实现原理。 rust-prometheus 是监控系统 Prometheus 的 Rust 客户端库,由 TiKV 团队实现。TiKV 使用 rust-prometheus 收集各种指标(metric)到 Prometheus 中,从而后续能再利用 Grafana 等可视化工具将其展示出来作为仪表盘监控面板。这些监控指标对于了解 TiKV 当前或历史的状态具有非常关键的作用。TiKV 提供了丰富的监控指标数据,并且代码中也到处穿插了监控指标的收集片段,因此了解 rust-prometheus 很有必要。 感兴趣的小伙伴还可以观看我司同学在 FOSDEM 2019 会议上关于 rust-prometheus 的技术分享。 基础知识 指标类别 Prometheus 支持四种指标:Counter、Gauge、Histogram、Summary。rust-prometheus ...
相关文章
文章评论
共有0条评论来说两句吧...
文章二维码
点击排行
推荐阅读
最新文章
- CentOS关闭SELinux安全模块
- 设置Eclipse缩进为4个空格,增强代码规范
- Eclipse初始化配置,告别卡顿、闪退、编译时间过长
- Jdk安装(Linux,MacOS,Windows),包含三大操作系统的最全安装
- CentOS8编译安装MySQL8.0.19
- Springboot2将连接池hikari替换为druid,体验最强大的数据库连接池
- SpringBoot2整合MyBatis,连接MySql数据库做增删改查操作
- SpringBoot2整合Thymeleaf,官方推荐html解决方案
- MySQL数据库在高并发下的优化方案
- SpringBoot2更换Tomcat为Jetty,小型站点的福音