两个例子带你入门 Disruptor
Disruptor 是英国外汇交易公司 LMAX 开发的一个高性能队列。很多知名开源项目里,比如 canal 、log4j2、 storm 都是用了 Disruptor 以提升系统性能 。
这篇文章,我们通过两个例子一步一个脚印帮助同学们入门 Disruptor 。
1 环形缓冲区
下图展示了 Disruptor 的流程图 。
和线程池机制非常类似, Disruptor 也是非常典型的生产者/消费者模式。线程池存储提交任务的容器是阻塞队列,而 Disruptor 使用的是环形缓冲区 RingBuffer。
环形缓冲区的设计相比阻塞队列有如下优点:
- 环形数组结构
为了避免垃圾回收,采用数组而非链表。同时,数组对处理器的缓存机制更加友好。
- 元素位置定位
数组长度2^n,通过位运算,加快定位的速度。下标采取递增的形式,不用担心 index 溢出的问题。index 是 long 类型,即使100万QPS的处理速度,也需要30万年才能用完。
- 无锁设计
每个生产者或者消费者线程,会先申请可以操作的元素在数组中的位置,申请到之后,直接在该位置写入或者读取数据。
2 写一个Hello world
我们写一个非常简单的例子:生产者传递一个单一的长整型值给消费者,而消费者将简单地打印出这个值。
2.1 添加依赖
<dependency> <groupId>com.lmax</groupId> <artifactId>disruptor</artifactId> <version>3.3.6</version> </dependency>
2.2 定义事件
首先,我们将定义一个事件(Event
),它将携带数据,并且在接下来的所有示例中都是通用的。
public class LongEvent { private long value; public void set(long value) { this.value = value; } @Override public String toString() { return "LongEvent{" + "value=" + value + '}'; } }
为了让 Disruptor 为我们预分配这些事件,我们需要一个 EventFactory
来执行构造。这可以是一个方法引用,比如 LongEvent::new
,或者是 EventFactory 接口的显式实现:
public class LongEventFactory implements EventFactory<LongEvent> { @Override public LongEvent newInstance() { return new LongEvent(); } }
2.3 定义消费者
定义了事件,我们需要创建一个消费者来处理这些事件。我们会创建一个事件处理器(EventHandler
),它会将把值打印到控制台上。
public class LongEventHandler implements EventHandler<LongEvent> { @Override public void onEvent(LongEvent longEvent, long sequence, boolean endOfBatch) throws Exception { System.out.println("currentThread:" + Thread.currentThread().getName() + " Event: " + longEvent); } }
2.4 发布
public class LongEventMain { public static void main(String[] args) throws Exception { int bufferSize = 2; Disruptor<LongEvent> disruptor = new Disruptor<>( new LongEventFactory(), bufferSize, DaemonThreadFactory.INSTANCE, ProducerType.SINGLE, new BlockingWaitStrategy()); disruptor.handleEventsWith(new LongEventHandler()); disruptor.start(); RingBuffer<LongEvent> ringBuffer = disruptor.getRingBuffer(); ByteBuffer bb = ByteBuffer.allocate(8); for (long l = 0; true; l++) { bb.putLong(0, l); ringBuffer.publishEvent((event, sequence, buffer) -> event.set(buffer.getLong(0)), bb); Thread.sleep(1000); } } }
整个发布流程分为四个部分:
-
指定环形缓冲区的大小,必须是2的幂次方,例子中设置的值是 1024 ;
-
构建 Disruptor ,参数分别是
事件工厂EventFactory
、环形缓冲区的大小ringBufferSize
、处理器线程池
、生产者类型
(单生产者/多生产者)、消费者阻塞策略
; -
定义
事件处理器eventHandler
,我们这里的逻辑是打印数据打印在控制台; -
启动 Disruptor,从 Disruptor 中获取
环形缓冲区ringBuffer
,在 for 循环里 ,调用环形队列的publishEvent
方法。这里使用了 ByteBuffer 做为数据的存储容器 , 方便作为参数传递。
我们来看下执行结果 :
3 日志处理
3.1 应用场景
上面的例子比较简单,但假如要应用到生产环境,就显得非常粗糙。
我们模拟一个日志处理的场景,用户进入视频播放页面,浏览器定时的发送浏览日志到服务端,服务端将日志存储起来。
3.2 核心类设计
我们定义一个 DisruptorManager 管理器 , 管理器包含三个核心参数:消费者监听器 DataEventListener
、消费者数量
、环形队列长度
。
public class DisruptorManager<T> { private static final Integer DEFAULT_CONSUMER_SIZE = 4; public static final Integer DEFAULT_SIZE = 4096 << 1 << 1; private DataEventListener<T> dataEventListener; private DisruptorProducer<T> producer; private int ringBufferSize; private int consumerSize; public DisruptorManager(DataEventListener<T> dataEventListener) { this(dataEventListener, DEFAULT_CONSUMER_SIZE, DEFAULT_SIZE); } public DisruptorManager(DataEventListener<T> dataEventListener, final int consumerSize, final int ringBufferSize) { this.dataEventListener = dataEventListener; this.ringBufferSize = ringBufferSize; this.consumerSize = consumerSize; } public void start() { EventFactory<DataEvent<T>> eventFactory = new DisruptorEventFactory<>(); Disruptor<DataEvent<T>> disruptor = new Disruptor<>( eventFactory, ringBufferSize, DisruptorThreadFactory.create("consumer-thread", false), ProducerType.MULTI, new BlockingWaitStrategy() ); DisruptorConsumer<T>[] consumers = new DisruptorConsumer[consumerSize]; for (int i = 0; i < consumerSize; i++) { consumers[i] = new DisruptorConsumer<>(dataEventListener); } disruptor.handleEventsWithWorkerPool(consumers); disruptor.start(); RingBuffer<DataEvent<T>> ringBuffer = disruptor.getRingBuffer(); this.producer = new DisruptorProducer<>(ringBuffer, disruptor); } public DisruptorProducer getProducer() { return this.producer; } }
首先和 Hello world 代码中的不同的点,Disruptor 的构造函数中我们自定义了消费者的处理器线程。
DisruptorThreadFactory.create("consumer-thread", false),
然后我们定义消费者的业务逻辑 :
DisruptorConsumer<T>[] consumers = new DisruptorConsumer[consumerSize]; for (int i = 0; i < consumerSize; i++) { consumers[i] = new DisruptorConsumer<>(dataEventListener); } disruptor.handleEventsWithWorkerPool(consumers);
消费者本质上是workHandler
的实现类,只不过初始化时将 DataEventListener 作为构造函数的参数。
public class DisruptorConsumer<T> implements WorkHandler<DataEvent<T>> { private DataEventListener<T> dataEventListener; public DisruptorConsumer(DataEventListener dataEventListener) { this.dataEventListener = dataEventListener; } @Override public void onEvent(DataEvent<T> dataEvent) throws Exception { if (dataEvent != null) { dataEventListener.processDataEvent(dataEvent); } } }
因为我们是希望线程池并行的处理这些消息数据,使用的是disruptor.handleEventsWithWorkerPool
可以保证每个事件只会由一个工作处理器处理。
在 springboot 项目中,我们需要初始化相关 bean。
@Configuration @AutoConfigureBefore(RedisConfig.class) public class DisruptorConfig { private final static Logger logger = LoggerFactory.getLogger(DisruptorConfig.class); private final static String LIST_KEY = "disruptor:list"; @Autowired private RedissonClient redissonClient; @Bean public DataEventListener<String> createConsumerListener() { DataEventListener<String> dataEventListener = new DataEventListener<String>() { @Override public void processDataEvent(DataEvent<String> dataEvent) throws InterruptedException { logger.info("processDateEvent data:" + dataEvent.getData()); redissonClient.getList(LIST_KEY).add(dataEvent.getData()); } }; return dataEventListener; } @Bean public DisruptorProducer<String> createProducer(DataEventListener dataEventListener) { DisruptorManager disruptorManage = new DisruptorManager(dataEventListener, 8, 1024 * 1024); disruptorManage.start(); return disruptorManage.getProducer(); }
首先,我们定义好消费者的事件监听器,然后定义 DisruptorProducer
, 该类用来将数据提交到环形队列。
public class DisruptorProducer<T> { private final Logger logger = LoggerFactory.getLogger(DisruptorProducer.class); private final RingBuffer<DataEvent<T>> ringBuffer; private final Disruptor<DataEvent<T>> disruptor; private final EventTranslatorOneArg<DataEvent<T>, T> translatorOneArg = (event, sequence, t) -> event.setData(t); public DisruptorProducer(final RingBuffer<DataEvent<T>> ringBuffer, final Disruptor<DataEvent<T>> disruptor) { this.ringBuffer = ringBuffer; this.disruptor = disruptor; } /** * Send a data. * * @param data the data */ public void onData(final T data) { try { ringBuffer.publishEvent(translatorOneArg, data); } catch (Exception ex) { logger.error("publish event error:", ex); } } public void shutdown() { if (null != disruptor) { disruptor.shutdown(); } } }
最后,在控制器中,接收前端请求:
@Autowired private DisruptorProducer<String> producer; @GetMapping("/pushlog") public ResponseEntity pushlog(String log) { producer.onData(log); return ResponseEntity.successResult(null); }
从下图中,我们可以看到从控制器接收到请求后,消费处理器线程不断地将数据打印出来,并且发送到队列中。
4 写到最后
日志处理的例子里,我们试图封装 Disruptor 相关 API ,以便在 springboot 项目中更方便的使用。
笔者在测试过程时,发现若消费逻辑慢的时候,生产者发送数据事件时,可能会阻塞。
为什么生产者会阻塞,Disruptor 的核心原理是什么 ,如何使用 Disruptor 的高级特性顺序依赖执行 ?
正因为有这些疑问,笔者觉得深入理解 Disruptor 原理特别有必要,笔者也会在接下来的文章里一一为大家答疑解惑。
参考资料:

低调大师中文资讯倾力打造互联网数据资讯、行业资源、电子商务、移动互联网、网络营销平台。
持续更新报道IT业界、互联网、市场资讯、驱动更新,是最及时权威的产业资讯及硬件资讯报道平台。
转载内容版权归作者及来源网站所有,本站原创内容转载请注明来源。
- 上一篇
马斯克考虑向所有推特用户收费
据外媒报道,当地时间周一,在与以色列总理本雅明·内塔尼亚胡(Benjamin Netanyahu)就人工智能展开的广泛对话中,马斯克提出了向X/Twitter的所有用户收费的想法,以解决社交媒体机器人的问题。 马斯克表示,将开始每月向X/Twitter用户收取少量费用。他提到X有5.5亿月活跃用户,每天在该社交网络上发布1亿至2亿条帖子。 马斯克没有提到向X/Twitter用户收费的时间,也没有说费用是多少。他声称,这是消除机器人问题的唯一方法。 马斯克去年10月27日以440亿美元的价格收购Twitter。收购推特后,一些广告客户因对马斯克的内容审核计划感到担忧纷纷撤出,导致推特的广告收入大幅下降。 自2022年10月收购Twitter以来,马斯克一直在对该平台进行改革,比如,将员工人数缩减约80%,推出三层API收费制度以及每月8美元的Twitter Blue付费认证服务。 此外,马斯克还为该公司任命了新的首席执行官琳达·亚卡里诺(Linda Yaccarino),试图赢回广告商。 今年7月1日,马斯克宣布将对用户每天可以浏览的推文数量进行临时限制。他声称,这是为了解决极端的数据抓...
- 下一篇
PAI BladeLLM推理引擎: 超长上下文、更高性能
BladeLLM是阿里云PAI平台提供的大模型推理引擎,致力于让用户轻松部署高性能、低成本的大语言模型服务。BladeLLM对LLM推理和服务的全链路进行了深度的性能优化和工程优化,确保不同模型在不同设备上都达到最优性价比。 除了在常规上下文长度下的极致性能优化之外,BladeLLM还突破了现有LLM推理系统上下文长度的极限,能够支持更长的输入长度以及文本生成长度等,使得LLM能够解锁更多的应用场景,并且BladeLLM在超长上下文下依然保持极致的性能,相比于其他LLM推理服务系统有显著的性能优势。 本文主要介绍BladeLLM在超长上下文方面具有的优势,包括支持的最大上下文长度以及超长上下文的推理性能。 背景 超长上下文是LLM发展的必然趋势 超长上下文推理能力是LLM涌现的重要能力之一,该能力促生了一系列具有巨大潜在价值的应用场景,包括个性化的聊天机器人(Character.AI)、文学创作工具(Jasper)、文章摘要工具(ChatPaper)等。个性化的聊天机器人会和用户进行持续性的交互,给予用户工作、情感、学习等多方面的帮助。LLM会在交流过程中记忆完整的聊天内容,模型输入长...
相关文章
文章评论
共有0条评论来说两句吧...
文章二维码
点击排行
推荐阅读
最新文章
- Docker使用Oracle官方镜像安装(12C,18C,19C)
- Docker安装Oracle12C,快速搭建Oracle学习环境
- CentOS7安装Docker,走上虚拟化容器引擎之路
- Linux系统CentOS6、CentOS7手动修改IP地址
- Docker快速安装Oracle11G,搭建oracle11g学习环境
- CentOS7编译安装Cmake3.16.3,解决mysql等软件编译问题
- CentOS关闭SELinux安全模块
- CentOS7编译安装Gcc9.2.0,解决mysql等软件编译问题
- Hadoop3单机部署,实现最简伪集群
- CentOS6,7,8上安装Nginx,支持https2.0的开启