如何让消息队列达到最大吞吐量?
你在使用消息队列的时候关注过吞吐量吗?
思考过吞吐量的影响因素吗?
考虑过怎么提高吗?
总结过最佳实践吗?
本文带你一起探讨下消息队列消费端高吞吐的 Go
框架实现。Let’s go!
关于吞吐量的一些思考
-
写入消息队列吞吐量取决于以下两个方面
- 网络带宽
- 消息队列(比如Kafka)写入速度
最佳吞吐量是让其中之一打满,而一般情况下内网带宽都会非常高,不太可能被打满,所以自然就是讲消息队列的写入速度打满,这就就有两个点需要平衡
- 批量写入的消息量大小或者字节数多少
- 延迟多久写入
go-zero 的
PeriodicalExecutor
和ChunkExecutor
就是为了这种情况设计的 -
从消息队列里消费消息的吞吐量取决于以下两个方面
- 消息队列的读取速度,一般情况下消息队列本身的读取速度相比于处理消息的速度都是足够快的
- 处理速度,这个依赖于业务
这里有个核心问题是不能不考虑业务处理速度,而读取过多的消息到内存里,否则可能会引起两个问题:
- 内存占用过高,甚至出现OOM,
pod
也是有memory limit
的 - 停止
pod
时堆积的消息来不及处理而导致消息丢失
解决方案和实现
借用一下 Rob Pike
的一张图,这个跟队列消费异曲同工。左边4个 gopher
从队列里取,右边4个 gopher
接过去处理。比较理想的结果是左边和右边速率基本一致,没有谁浪费,没有谁等待,中间交换处也没有堆积。
我们来看看 go-zero
是怎么实现的:
Producer
端
for { select { case <-q.quit: logx.Info("Quitting producer") return default: if v, ok := q.produceOne(producer); ok { q.channel <- v } } }
没有退出事件就会通过 produceOne
去读取一个消息,成功后写入 channel
。利用 chan
就可以很好的解决读取和消费的衔接问题。
Consumer
端
for { select { case message, ok := <-q.channel: if ok { q.consumeOne(consumer, message) } else { logx.Info("Task channel was closed, quitting consumer...") return } case event := <-eventChan: consumer.OnEvent(event) } }
这里如果拿到消息就去处理,当 ok
为 false
的时候表示 channel
已被关闭,可以退出整个处理循环了。同时我们还在 redis queue
上支持了 pause/resume
,我们原来在社交场景里大量使用这样的队列,可以通知 consumer
暂停和继续。
- 启动
queue
,有了这些我们就可以通过控制producer/consumer
的数量来达到吞吐量的调优了
func (q *Queue) Start() { q.startProducers(q.producerCount) q.startConsumers(q.consumerCount) q.producerRoutineGroup.Wait() close(q.channel) q.consumerRoutineGroup.Wait() }
这里需要注意的是,先要停掉 producer
,再去等 consumer
处理完。
到这里核心控制代码基本就讲完了,其实看起来还是挺简单的,也可以到 https://github.com/tal-tech/go-zero/tree/master/core/queue 去看完整实现。
如何使用
基本的使用流程:
- 创建
producer
或consumer
- 启动
queue
- 生产消息 / 消费消息
对应到 queue
中,大致如下:
创建 queue
// 生产者创建工厂 producer := newMockedProducer() // 消费者创建工厂 consumer := newMockedConsumer() // 将生产者以及消费者的创建工厂函数传递给 NewQueue() q := queue.NewQueue(func() (Producer, error) { return producer, nil }, func() (Consumer, error) { return consumer, nil })
我们看看 NewQueue
需要什么参数:
producer
工厂方法consumer
工厂方法
将 producer & consumer
的工厂函数传递 queue
,由它去负责创建。框架提供了 Producer
和 Consumer
的接口以及工厂方法定义,然后整个流程的控制 queue
实现会自动完成。
生产 message
我们通过自定义一个 mockedProducer
来模拟:
type mockedProducer struct { total int32 count int32 // 使用waitgroup来模拟任务的完成 wait sync.WaitGroup } // 实现 Producer interface 的方法:Produce() func (p *mockedProducer) Produce() (string, bool) { if atomic.AddInt32(&p.count, 1) <= p.total { p.wait.Done() return "item", true } time.Sleep(time.Second) return "", false }
queue
中的生产者编写都必须实现:
Produce()
:由开发者编写生产消息的逻辑AddListener()
:添加事件listener
消费 message
我们通过自定义一个 mockedConsumer
来模拟:
type mockedConsumer struct { count int32 } func (c *mockedConsumer) Consume(string) error { atomic.AddInt32(&c.count, 1) return nil }
启动 queue
启动,然后验证我们上述的生产者和消费者之间的数据是否传输成功:
func main() { // 创建 queue q := NewQueue(func() (Producer, error) { return newMockedProducer(), nil }, func() (Consumer, error) { return newMockedConsumer(), nil }) // 启动panic了也可以确保stop被执行以清理资源 defer q.Stop() // 启动 q.Start() }
以上就是 queue
最简易的实现示例。我们通过这个 core/queue
框架实现了基于 redis
和 kafka
等的消息队列服务,在不同业务场景中经过了充分的实践检验。你也可以根据自己的业务实际情况,实现自己的消息队列服务。
整体设计
整体流程如上图:
- 全体的通信都由
channel
进行 Producer
和Consumer
的数量可以设定以匹配不同业务需求Produce
和Consume
具体实现由开发者定义,queue
负责整体流程
总结
本篇文章讲解了如何通过 channel
来平衡从队列中读取和处理消息的速度,以及如何实现一个通用的消息队列处理框架,并通过 mock
示例简单展示了如何基于 core/queue
实现一个消息队列处理服务。你可以通过类似的方式实现一个基于 rocketmq
等的消息队列处理服务。
关于 go-zero
更多的设计和实现文章,可以关注『微服务实践』公众号。
项目地址
https://github.com/tal-tech/go-zero
欢迎使用 go-zero 并 star 支持我们!
微信交流群
关注『微服务实践』公众号并点击 进群 获取社区群二维码。

低调大师中文资讯倾力打造互联网数据资讯、行业资源、电子商务、移动互联网、网络营销平台。
持续更新报道IT业界、互联网、市场资讯、驱动更新,是最及时权威的产业资讯及硬件资讯报道平台。
转载内容版权归作者及来源网站所有,本站原创内容转载请注明来源。
- 上一篇
部署 完全分布式高可用 Hadoop hdfs HA + yarn HA
部署 完全分布式高可用 Hadoop hdfs HA + yarn HA 标签(空格分隔): 大数据运维专栏 一:hadoop HDFS HA 与 yarn HA 的 概述 二:部署环境概述 三:部署zookeeper 四:部署HDFS HA 与 yarn HA 五:关于 HA 的测试 一:hadoop HDFS HA 与 yarn HA 的 概述 1.1 HA 的概述 HA概述 1)所谓HA(High Available),即高可用(7*24小时不中断服务)。 2)实现高可用最关键的策略是消除单点故障。HA严格来说应该分成各个组件的HA机制:HDFS的HA和YARN的HA。 3)Hadoop2.0之前,在HDFS集群中NameNode存在单点故障(SPOF)。 4)NameNode主要在以下两个方面影响HDFS集群: NameNode机器发生意外,如宕机,集群将无法使用,直到管理员重启。 NameNode机器需要升级,包括软件、硬件升级,此时集群也将无法使用。 HDFS HA功能通过配置Active/Standby两个NameNodes实现在集群中对NameNode的热备来解决上述问...
- 下一篇
R软件SIR模型网络结构扩散过程模拟
与普通的扩散研究不同,网络扩散开始考虑网络结构对于扩散过程的影响。 这里介绍一个使用R模拟网络扩散的例子。 基本的算法非常简单: 生成一个网络:g(V, E)。 随机选择一个或几个节点作为种子(seeds)。 每个感染者以概率p(可视作该节点的传染能力,通常表示为ββ)影响与其相连的节点。 其实这是一个最简单的SI模型在网络中的实现。S表示可感染(susceptible), I表示被感染(infected)。易感态-感染态-恢复态(SIR)模型用以描述水痘和麻疹这类患者能完全康复并获得终身免疫力的流行病。对于SIR流行病传播模型,任意时刻节点只能处于易感态(S)或感染态(I)或恢复态(R)。易感态节点表示未被流行病感染的个体,且可能被感染;感染态节点表示已经被流行病感染且具有传播能力;恢复态节点则表示曾感染流行病且完全康复。与SIS模型类似,每一时间步内,每个感染态节点以概率λλ尝试感染它的邻居易感态节点,并以概率γγ变为恢复态。SIR模型可以表达为: S = S(t)是易感个体的数量, I = I(t)是被感染的个体的数目, R = R(t)是恢复的个体的数目。 第二组因变量代表在三...
相关文章
文章评论
共有0条评论来说两句吧...
文章二维码
点击排行
推荐阅读
最新文章
- CentOS8安装Docker,最新的服务器搭配容器使用
- MySQL8.0.19开启GTID主从同步CentOS8
- CentOS7安装Docker,走上虚拟化容器引擎之路
- SpringBoot2全家桶,快速入门学习开发网站教程
- CentOS8编译安装MySQL8.0.19
- CentOS7,CentOS8安装Elasticsearch6.8.6
- SpringBoot2配置默认Tomcat设置,开启更多高级功能
- SpringBoot2整合Thymeleaf,官方推荐html解决方案
- Windows10,CentOS7,CentOS8安装Nodejs环境
- Springboot2将连接池hikari替换为druid,体验最强大的数据库连接池