如此狂妄,自称高性能队列的Disruptor有啥来头?
并发框架Disruptor
1. Disruptor概述
1.1 背景
Disruptor是英国外汇交易公司LMAX开发的一个高性能队列,研发的初衷是解决内存队列的延迟问题(在性能测试中发现竟然与I/O操作处于同样的数量级),基于Disruptor开发的系统单线程能支撑每秒600万订单,2010年在QCon演讲后,获得了业界关注,2011年,企业应用软件专家Martin Fowler专门撰写长文介绍。同年它还获得了Oracle官方的Duke大奖。
目前,包括Apache Storm、Camel、Log4j 2在内的很多知名项目都应用了Disruptor以获取高性能。
需要特别指出的是,这里所说的队列是系统内部的内存队列,而不是Kafka这样的分布式队列。
有界无锁 高并发队列
1.2 什么是Disruptor
Disruptor是用于一个JVM中多个线程之间的消息队列,作用与ArrayBlockingQueue有相似之处,但是Disruptor从功能、性能都远好于ArrayBlockingQueue,当多个线程之间传递大量数据或对性能要求较高时,可以考虑使用Disruptor作为ArrayBlockingQueue的替代者。
官方也对Disruptor和ArrayBlockingQueue的性能在不同的应用场景下做了对比,目测性能只有有5~10倍左右的提升。
1.3 为什么使用Disruptor
传统阻塞的队列使用锁保证线程安全,而锁通过操作系统内核上下文切换实现,会暂停线程去等待锁,直到锁释放。
执行这样的上下文切换,会丢失之前保存的数据和指令。由于消费者和生产者之间的速度差异,队列总是接近满或者空的状态,这种状态会导致高水平的写入争用。
1.3.1 传统队列问题
首先这里说的队列也仅限于Java内部的消息队列
队列 | 有界性 | 锁 | 结构 | 队列类型 |
---|---|---|---|---|
ArrayBlockingQueue | 有界 | 加锁 | 数组 | 阻塞 |
LinkedBlockingQueue | 可选 | 加锁 | 链表 | 阻塞 |
ConcurrentLinkedQueue | 无界 | 无锁 | 链表 | 非阻塞 |
LinkedTransferQueue | 无界 | 无锁 | 链表 | 阻塞 |
PriorityBlockingQueue | 无界 | 加锁 | 堆 | 阻塞 |
DelayQueue | 无界 | 加锁 | 堆 | 阻塞 |
1.3.2 Disruptor应用场景
参考使用到disruptor的一些框架.
1.3.2.1 log4j2
Log4j2异步日志使用到了disruptor, 日志一般是有缓冲区, 满了才写到文件, 增量追加文件结合NIO等应该也比较快, 所以无论是EventHandler还是WorkHandler处理应该延迟比较小的, 写的文件也不多, 所以场景是比较合适的。
1.3.2.2 Jstorm
在流处理中不同线程中数据交换,数据计算可能蛮多内存中计算, 流计算快进快出,disruptor应该不错的选择。
1.3.2.3 百度uid-generator
部分使用Ring buffer和去伪共享等思路缓存已生成的uid, 应该也部分参考了disruptor吧。
1.4 Disruptor 的核心概念
先从了解 Disruptor 的核心概念开始,来了解它是如何运作的。下面介绍的概念模型,既是领域对象,也是映射到代码实现上的核心对象。
1.4.1 Ring Buffer
Disruptor中的数据结构,用于存储生产者生产的数据
如其名,环形的缓冲区。曾经 RingBuffer 是 Disruptor 中的最主要的对象,但从3.0版本开始,其职责被简化为仅仅负责对通过 Disruptor 进行交换的数据(事件)进行存储和更新。在一些更高级的应用场景中,Ring Buffer 可以由用户的自定义实现来完全替代。
1.4.2 Sequence
序号,在Disruptor框架中,任何地方都有序号
生产者生产的数据放在RingBuffer中的哪个位置,消费者应该消费哪个位置的数据,RingBuffer中的某个位置的数据是什么,这些都是由这个序号来决定的。这个序号可以简单的理解为一个AtomicLong类型的变量。其使用了padding的方法去消除缓存的伪共享问题。
1.4.3 Sequencer
序号生成器,这个类主要是用来协调生产者的
在生产者生产数据的时候,Sequencer会产生一个可用的序号(Sequence),然后生产者就就知道数据放在环形队列的那个位置了。
Sequencer是Disruptor的真正核心,此接口有两个实现类 SingleProducerSequencer、MultiProducerSequencer ,它们定义在生产者和消费者之间快速、正确地传递数据的并发算法。
1.4.4 Sequence Barrier
序号屏障
我们都知道,消费者在消费数据的时候,需要知道消费哪个位置的数据。消费者总不能自己想取哪个数据消费,就取哪个数据消费吧。这个SequencerBarrier起到的就是这样一个“栅栏”般的阻隔作用。你消费者想消费数据,得,我告诉你一个序号(Sequence),你去消费那个位置上的数据。要是没有数据,就好好等着吧
1.4.5 Wait Strategy
Wait Strategy决定了一个消费者怎么等待生产者将事件(Event)放入Disruptor中。
设想一种这样的情景:生产者生产的非常慢,而消费者消费的非常快。那么必然会出现数据不够的情况,这个时候消费者怎么进行等待呢?WaitStrategy就是为了解决问题而诞生的。
1.4.6 Event
从生产者到消费者传递的数据叫做Event。它不是一个被 Disruptor 定义的特定类型,而是由 Disruptor 的使用者定义并指定。
1.4.7 EventHandler
Disruptor 定义的事件处理接口,由用户实现,用于处理事件,是 Consumer 的真正实现。
1.4.8 Producer
即生产者,只是泛指调用 Disruptor 发布事件的用户代码,Disruptor 没有定义特定接口或类型。
1.5 Disruptor特性
Disruptor其实就像一个队列一样,用于在不同的线程之间迁移数据,但是Disruptor也实现了一些其他队列没有的特性,如:
- 同一个“事件”可以有多个消费者,消费者之间既可以并行处理,也可以相互依赖形成处理的先后次序(形成一个依赖图);
- 预分配用于存储事件内容的内存空间;
- 针对极高的性能目标而实现的极度优化和无锁的设计;
2. Disruptor入门
我们使用一个简单的例子来体验一下Disruptor,生产者会传递一个long类型的值到消费者,消费者接受到这个值后会打印出这个值。
2.1 添加依赖
<dependency> <groupId>com.lmax</groupId> <artifactId>disruptor</artifactId> <version>3.4.2</version> </dependency>
2.2 Disruptor API
Disruptor 的 API 十分简单,主要有以下几个步骤
2.2.1 定义事件
首先创建一个
LongEvent
类,这个类将会被放入环形队列中作为消息内容。事件(Event)就是通过 Disruptor 进行交换的数据类型。
public class LongEvent { private long value; public void set(long value) { this.value = value; } public long getValue() { return value; } }
2.2.2 定义事件工厂
为了使用Disruptor的内存预分配event,我们需要定义一个EventFactory
事件工厂(Event Factory)定义了如何实例化前面第1步中定义的事件(Event),需要实现接口 com.lmax.disruptor.EventFactory<T>。
Disruptor 通过 EventFactory 在 RingBuffer 中预创建 Event 的实例。
一个 Event 实例实际上被用作一个“数据槽”,发布者发布前,先从 RingBuffer 获得一个 Event 的实例,然后往 Event 实例中填充数据,之后再发布到 RingBuffer 中,之后由 Consumer 获得该 Event 实例并从中读取数据。
public class LongEventFactory implements EventFactory<LongEvent> { public LongEvent newInstance() { return new LongEvent(); } }
2.2.3 定义事件处理的具体实现
为了让消费者处理这些事件,所以我们这里定义一个事件处理器,负责打印event
通过实现接口 com.lmax.disruptor.EventHandler<T> 定义事件处理的具体实现。
public class LongEventHandler implements EventHandler<LongEvent> { public void onEvent(LongEvent event, long sequence, boolean endOfBatch) { //CommonUtils.accumulation(); System.out.println("consumer:" + Thread.currentThread().getName() + " Event: value=" + event.getValue() + ",sequence=" + sequence); } }
2.2.4 指定等待策略
Disruptor 定义了 com.lmax.disruptor.WaitStrategy 接口用于抽象 Consumer 如何等待新事件,这是策略模式的应用
WaitStrategy YIELDING_WAIT = new YieldingWaitStrategy();
2.2.5 启动 Disruptor
注意ringBufferSize的大小必须是2的N次方
// 指定事件工厂 LongEventFactory factory = new LongEventFactory(); // 指定 ring buffer字节大小, 必须是2的N次方 int bufferSize = 1024; //单线程模式,获取额外的性能 Disruptor<LongEvent> disruptor = new Disruptor<LongEvent>(factory, bufferSize, Executors.defaultThreadFactory(), ProducerType.SINGLE, new YieldingWaitStrategy()); //设置事件业务处理器---消费者 disruptor.handleEventsWith(new LongEventHandler()); //启动disruptor线程 disruptor.start();
2.2.6 使用Translators发布事件
在Disruptor的3.0版本中,由于加入了丰富的Lambda风格的API,可以用来帮组开发人员简化流程。所以在3.0版本后首选使用Event Publisher/Event Translator来发布事件。
public class LongEventProducerWithTranslator { private final RingBuffer<LongEvent> ringBuffer; public LongEventProducerWithTranslator(RingBuffer<LongEvent> ringBuffer) { this.ringBuffer = ringBuffer; } private static final EventTranslatorOneArg<LongEvent, Long> TRANSLATOR = new EventTranslatorOneArg<LongEvent, Long>() { public void translateTo(LongEvent event, long sequence, Long data) { event.set(data); } }; public void onData(Long data) { ringBuffer.publishEvent(TRANSLATOR, data); } }
2.2.7 关闭 Disruptor
disruptor.shutdown();//关闭 disruptor,方法会堵塞,直至所有的事件都得到处理
2.3 代码整合
2.3.1 LongEventMain
消费者-生产者启动类,其依靠构造Disruptor对象,调用start()方法完成启动线程。Disruptor 需要ringbuffer环,消费者数据处理工厂,WaitStrategy等
ByteBuffer 类字节buffer,用于包装消息。
ProducerType.SINGLE为单线程 ,可以提高性能
public class LongEventMain { public static void main(String[] args) { // 指定事件工厂 LongEventFactory factory = new LongEventFactory(); // 指定 ring buffer字节大小, 必须是2的N次方 int bufferSize = 1024; //单线程模式,获取额外的性能 Disruptor<LongEvent> disruptor = new Disruptor<LongEvent>(factory, bufferSize, Executors.defaultThreadFactory(), ProducerType.SINGLE, new YieldingWaitStrategy()); //设置事件业务处理器---消费者 disruptor.handleEventsWith(new LongEventHandler()); //启动disruptor线程 disruptor.start(); // 获取 ring buffer环,用于接取生产者生产的事件 RingBuffer<LongEvent> ringBuffer = disruptor.getRingBuffer(); //为 ring buffer指定事件生产者 LongEventProducerWithTranslator producer = new LongEventProducerWithTranslator(ringBuffer); //循环遍历 for (int i = 0; i < 100; i++) { //获取一个随机数 long value = (long) ((Math.random() * 1000000) + 1); //发布数据 producer.onData(value); } //停止disruptor线程 disruptor.shutdown(); } }
2.3.2 运行测试
测试结果
consumer:pool-1-thread-1 Event: value=579797,sequence=0 consumer:pool-1-thread-1 Event: value=974942,sequence=1 consumer:pool-1-thread-1 Event: value=978977,sequence=2 consumer:pool-1-thread-1 Event: value=398080,sequence=3 consumer:pool-1-thread-1 Event: value=867251,sequence=4 consumer:pool-1-thread-1 Event: value=796707,sequence=5 consumer:pool-1-thread-1 Event: value=786555,sequence=6 consumer:pool-1-thread-1 Event: value=182193,sequence=7 .....
Event: value = 为消费者接收到的数据,sequence为数据在ringbuffer环的位置。 本文由
传智教育博学谷
教研团队发布。如果本文对您有帮助,欢迎
关注
和点赞
;如果您有任何建议也可留言评论
或私信
,您的支持是我坚持创作的动力。转载请注明出处!

低调大师中文资讯倾力打造互联网数据资讯、行业资源、电子商务、移动互联网、网络营销平台。
持续更新报道IT业界、互联网、市场资讯、驱动更新,是最及时权威的产业资讯及硬件资讯报道平台。
转载内容版权归作者及来源网站所有,本站原创内容转载请注明来源。
- 上一篇
通过自动化单元测试的形式守护系统架构
1 背景 随着需求开发迭代,代码库规模逐渐变大,新的团队成员引入等诸多因素,系统起初制定的架构规则不可避免遭到破坏。不仅仅是破坏团队的统一开发规范,更为重要的是随着代码库规模逐渐增长,大大降低系统的可维护性、扩展性,增加评审复杂度和重构成本,也最终导致团队生产力下降以及研发成本增长。 在敏捷开发环境下,系统通过迭代增量的交付价值,系统架构也是如此。团队不可能在项目之初就建立完美的系统架构,系统架构应该随着系统迭代不断演进。 架构演进和架构腐化是看待架构的不同视角:架构腐化着眼于现状,架构演进侧重于未来 架构腐化不可避免,随着时间流转腐化现象必然发生。而我们需要做的是:通过某种方式及早发现和修正 2 为什么选择Archunit 我们需要通过引入一种机制或技术,降低或及早发现架构腐化现象的发生,保持统一的系统架构约束。 支持架构规则自动化检查 轻量级,接入成本低 结果及时反馈 灵活扩展且扩展成本低 对于架构规则常见的验证方式:代码评审、代码质量分析工具或平台、Archunit 以下对常见的几种方式进行优劣势对比: 代码评审:通过强流程控制代码评审活动发生,增强代码评审的强度和质量 优势...
- 下一篇
强烈建议升级 | mica-mqtt 2.0.3 发布
一、简介 mica-mqtt 基于 t-io 实现的简单、低延迟、高性能 的 mqtt 物联网开源组件。mica-mqtt 更加易于集成到已有服务和二次开发,降低自研物联网平台开发成本。 二、功能 支持 MQTT v3.1、v3.1.1 以及 v5.0 协议。 支持 websocket mqtt 子协议(支持 mqtt.js)。 支持 http rest api,http api 文档详见。 支持 MQTT client 客户端。 支持 MQTT server 服务端。 支持 MQTT 遗嘱消息。 支持 MQTT 保留消息。 支持自定义消息(mq)处理转发实现集群。 MQTT 客户端 阿里云 mqtt 连接 demo。 支持 GraalVM 编译成本机可执行程序。 支持 jfinal 项目快速接入。 支持 Spring boot 项目快速接入。 mica-mqtt-spring-boot-starter 支持对接 Prometheus + Grafana。 基于 redis pub/sub 实现集群,详见 mica-mqtt-broker 模块。 三、使用场景 物联网(云端 mqtt ...
相关文章
文章评论
共有0条评论来说两句吧...
文章二维码
点击排行
推荐阅读
最新文章
- Docker使用Oracle官方镜像安装(12C,18C,19C)
- SpringBoot2全家桶,快速入门学习开发网站教程
- Springboot2将连接池hikari替换为druid,体验最强大的数据库连接池
- SpringBoot2整合Thymeleaf,官方推荐html解决方案
- 设置Eclipse缩进为4个空格,增强代码规范
- MySQL8.0.19开启GTID主从同步CentOS8
- SpringBoot2编写第一个Controller,响应你的http请求并返回结果
- CentOS6,CentOS7官方镜像安装Oracle11G
- SpringBoot2配置默认Tomcat设置,开启更多高级功能
- CentOS6,7,8上安装Nginx,支持https2.0的开启