理解Storm的内部消息缓冲机制
这篇文章是Apache Kafka的作者之一Michael G. Noll写的,他的博客地址在[这里]。
Storm工作进程(Worker processes)的内部消息机制
在以下各章节中,我会交替地使用消息(message)和元组(tuple)两个关键字。
本文中当我提到“内部消息”时,它指的是发生在Storm的单个工作进程内部的消息通信,这类通信只在Storm集群的单台主机(节点)上展开。Storm使用由LMAX Disruptor提供的很多消息队列来完成通信,LMAX Disruptor是一个高性能的线程间消息通信库。
- Storm工作进程内部的通信(同一个Storm工作节点(主机)的线程之间):LMAX Disruptor
- 工作进程之间的通信(通过网络的工作节点(主机)之间):ZeroMQ或者Netty
- 计算拓扑之间的通信:Storm并不原生支持,需要自己实现和维护,可以使用消息系统,比如:Kafka,RabbitMQ和数据库等等。
图片说明
在下一节讨论具体细节前,我们先来看看如下这幅图(参考原图):
详细描述
现在我们对Storm的工作进程内部的消息机制有了一定了解,接下来可以深入讨论细节了。
工作进程(Worker processes)
- “topology.receiver.buffer.size”参数是工作进程中的接收线程批量向执行线程的输入队列发送数据时的缓冲区内消息数的最大值(接收线程从网络读取消息),此参数如果设置过大可能会造成一些问题(心跳线程挂掉然后吞吐率直线下降),默认值是8条消息,设置的值必须是2的幂(N次方),这是为了兼容LMAX Disruptor组件。
// 示例: 通过Java API配置
Config conf = new Config();
conf.put(Config.TOPOLOGY_RECEIVER_BUFFER_SIZE, 16); // 默认值为8
topology.receiver.buffer.size参数不是配置LMAX Disruptor队列的大小,它是配置的一个ArrayList的长度,这个List用来作为输入消息的缓冲,它不需要被多个线程访问,它仅仅是工作进程所专有的,但是因为这个List的元素最终被用来填充基于Disruptor的队列(执行器输入对列),所以这个参数必须是2的幂,参考backtype.storm.messaging.loader.clj的launch-receive-thread!详细信息。
- 使用“topology.transfer.buffer.size”参数配置的transfer缓冲区中的每一个元素实际上是一个tuple的列表,多个Executor中的消息发送线程将批量地把消息从outgoing发送到多个Executor共享的transfer缓冲区(Executor中包含用户逻辑线程和消息发送线程),transfer缓冲区的大小默认是1024个元素。
// 示例: 通过JavaAPI配置
conf.put(Config.TOPOLOGY_TRANSFER_BUFFER_SIZE, 32); // 默认值为1024
执行器(Executors)
- topology.executor.receive.buffer.size,该参数是执行器的输入消息队列的配置参数,队列的每个元素是一个tuple列表,tuple被批量地加入到队列元素中,此参数的默认配置是1024个元素,修改配置时的值必须是2的幂(适配于LMAX Disruptor)。
// 示例: 通过Java API配置
conf.put(Config.TOPOLOGY_EXECUTOR_RECEIVE_BUFFER_SIZE, 16384); // 批量加入tuple; 默认值是1024
- topology.executor.send.buffer.size,该参数是执行器输入消息队列的配置参数,这个队列的每个元素是一个单独的tuple,默认值为1024,修改配置时的值必须是2的幂(适配于LMAX Disruptor)。
// 示例: 通过Java API配置
conf.put(Config.TOPOLOGY_EXECUTOR_SEND_BUFFER_SIZE, 16384); // 单独的tuple; 默认值是1024
更进一步
如何配置Storm的内部消息缓冲
如何配置Storm的parallelism(并行)参数(实际上就是配置集群中工作进程数,各个Spout和Bolt的实例数和线程数)
搞明白Storm的拓扑内部在做什么
Sending Metrics From Storm to Graphite
Installing and Running Graphite via RPM and Supervisord
ooyala放在Github上的metrics_storm项目也是值得参考的(但我还没有用过这个工具)。
性能优化的建议
1 | conf.put(Config.TOPOLOGY_RECEIVER_BUFFER_SIZE, 8); |
2 | conf.put(Config.TOPOLOGY_TRANSFER_BUFFER_SIZE, 32); |
3 | conf.put(Config.TOPOLOGY_EXECUTOR_RECEIVE_BUFFER_SIZE, 16384); |
4 | conf.put(Config.TOPOLOGY_EXECUTOR_SEND_BUFFER_SIZE, 16384); |
低调大师中文资讯倾力打造互联网数据资讯、行业资源、电子商务、移动互联网、网络营销平台。
持续更新报道IT业界、互联网、市场资讯、驱动更新,是最及时权威的产业资讯及硬件资讯报道平台。
转载内容版权归作者及来源网站所有,本站原创内容转载请注明来源。
- 上一篇
如何在 CentOS 7 上安装 Elastic Stack
Elasticsearch是基于 Lucene 由 Java 开发的开源搜索引擎。它提供了一个分布式、多租户的全文搜索引擎(LCTT 译注:多租户是指多租户技术,是一种软件架构技术,用来探讨与实现如何在多用户的环境下共用相同的系统或程序组件,并且仍可确保各用户间数据的隔离性。),并带有 HTTP 仪表盘的 Web 界面(Kibana)。数据会被 Elasticsearch 查询、检索,并且使用 JSON 文档方案存储。Elasticsearch 是一个可扩展的搜索引擎,可用于搜索所有类型的文本文档,包括日志文件。Elasticsearch 是 Elastic Stack 的核心,Elastic Stack 也被称为 ELK Stack。 Logstash是用于管理事件和日志的开源工具。它为数据收集提供实时传递途径。 Logstash 将收集您的日志数据,将数据转换为 JSON 文档,并将其存储在 Elasticsearch 中。 Kibana是 Elasticsearch 的开源数据可视化工具。Kibana 提供了一个漂亮的仪表盘 Web 界面。 你可以用它来管理和可视化来自 Elas...
- 下一篇
《Spark 官方文档》机器学习库(MLlib)指南
机器学习库(MLlib)指南 MLlib是Spark的机器学习(ML)库。旨在简化机器学习的工程实践工作,并方便扩展到更大规模。MLlib由一些通用的学习算法和工具组成,包括分类、回归、聚类、协同过滤、降维等,同时还包括底层的优化原语和高层的管道API。 MLllib目前分为两个代码包: spark.mllib包含基于RDD的原始算法API。 spark.ml则提供了基于DataFrames高层次的API,可以用来构建机器学习管道。 我们推荐您使用spark.ml,因为基于DataFrames的API更加的通用而且灵活。不过我们也会继续支持spark.mllib包。用户可以放心使用,spark.mllib还会持续地增加新的功能。不过开发者需要注意,如果新的算法能够适用于机器学习管道的概念,就应该将其放到spark.ml包中,如:特征提取器和转换器。 下面的列表列出了两个包的主要功能。 spark.mllib: 数据类型,算法以及工具 Data types(数据类型) Basic statistics(基础统计) summary statistics(摘要统计) correlations...
相关文章
文章评论
共有0条评论来说两句吧...
文章二维码
点击排行
推荐阅读
最新文章
- Eclipse初始化配置,告别卡顿、闪退、编译时间过长
- Docker使用Oracle官方镜像安装(12C,18C,19C)
- CentOS7设置SWAP分区,小内存服务器的救世主
- SpringBoot2更换Tomcat为Jetty,小型站点的福音
- SpringBoot2整合Thymeleaf,官方推荐html解决方案
- CentOS7编译安装Cmake3.16.3,解决mysql等软件编译问题
- MySQL8.0.19开启GTID主从同步CentOS8
- SpringBoot2整合Redis,开启缓存,提高访问速度
- Linux系统CentOS6、CentOS7手动修改IP地址
- CentOS7编译安装Gcc9.2.0,解决mysql等软件编译问题