Apache pulsar 技术系列-- 消息重推的几种方式
导语
Apache Pulsar 是一个多租户、高性能的服务间消息传输解决方案,支持多租户、低延时、读写分离、跨地域复制(GEO replication)、快速扩容、灵活容错等特性。在很多场景下,用户需要通过 MQ 实现消息的重新推送能力,比如超时重推、处理异常时重推等,本文介绍 Apache Pulsar 提供的几种消息重推方案。
在 MQ 实际的使用中,用户消费数据时,可能会遇到消息处理异常或者需要推迟处理的场景,这里就涉及到消息的重推逻辑,Pulsar 自己提供了消息重推的能力。本文主要介绍 Pulsar 的消息重推机制。
消息获取(拉取/推送)机制
Pulsar 的消费采用了推、拉结合的消息获取机制,Consumer 获取消息之前会首先通知 Broker(FLOW 请求),Broker 会根据配置的 ReceiveQueue 大小以及 Consumer 当前可以接收的消息数量来推送消息给 Consumer。
详细的交互流程如下图所示:
-
Consumer 在创建之后,会以 MaxReceiveQueue 的大小作为 Permit 值,这个值就是 Consumer 可以缓存的的最大消息条数。
-
然后,Consumer 向 Broker 发起 FLOW 请求,携带 Permit 信息(Consumer Permit 减少到 0),Broker 接收之后会记录这个 Permit 作为 Consumer 的 AvailablePermit,AvailablePermit 决定 Broker 可以向 Consumer 发送数据的数量(实际是在读取数据时判断)。
-
如果 AvailablePermit > 0, Broker 开始读取数据(假设有 N 条),然后推送给 Consumer,推送之后,AvailablePermit 自减 N。
-
Consumer 接收到消息之后,并不会直接返回给用户,而是放在 ReceiveQueue 中,当用户调用 Receive() 方法来获取消息时,Consumer 将 Permit + 1。
-
当 Permit > MaxReceiveQueueSize / 2,Consumer 会再次发起 Flow 请求,并且携带当前的 Permit 值。
上述流程,就是 Consumer 和 Broker 的消息传递过程。
在默认的情况下,数据推送给 Consumer 之后,就完全交给用户处理,数据不会重复推送。这种方式满足不了需要重推的场景,下面介绍目前 Pulsar 的几种重推机制。
SDK 统一的重推
一个比较直观的做法是超过一定时间,如果消息没有 Ack 就重新推送。
目前 Pulsar 提供了通过超时时间来控制数据重推的能力,Consumer 可以配置 AckTimeout(默认关闭),在设置了 AckTimeout 之后,Client 会构建一个 UnAckedMessageTracker ,用户 Receive() 的所有的消息都会被 UnAckedMessageTracker 跟踪。用户 Ack 消息时,会从 UnAckedMessageTracker 删除,对于没有 Ack 的消息,UnAckedMessageTracker 会有定时任务来检查,如果已经超过了 AckTimeout 时间,则会触发重推。
重推是通过 RedeliverUnackMessage 来实现的,UnAckedMessageTracker 会主动发起 Redeliver 的请求,Broker 会根据请求的 MessageId 信息重新推送。
AckTimeout 在 Consumer 初始化时设置:
Consumer<String> consumer = pulsarClient.newConsumer(Schema.STRING) .ackTimeout(10, TimeUnit.SECOND)
用户决策的重推 -- NegativeAck
通过 AckTimeout 实现的重推,是 SDK 内部统一实现的,用户不能控制重推的行为,如果用户希望根据自己的使用场景,决定哪些消息需要重推,Pulsar 提供了 NegativeAck 的能力。
NegativeAck 和 AckTimeout 方式类似,有一个 NegativeAcksTracker 来管理消息的重推,NegativeAcksTracker 只会跟踪用户主动调用 NegativeAcknowledge() 方法的 MessageID,重推的逻辑也是通过 RedeliverUnackMessage 实现。
NegativeAck 可以设置 Redelivery 的 Delay 时间。
Consumer<String> consumer = pulsarClient.newConsumer(Schema.STRING) .negativeAckRedeliveryDelay(1001, TimeUnit.MILLISECONDS)
使用的时候,需要明确调用。
// call the API to send negative acknowledgmentconsumer.negativeAcknowledge(message);
用户决策的重推 -- RLQ
除了 NegativeAck 的方式,用户还可以通过重试队列( RLQ )来实现主动的消息重推,RLQ 一般会使用在用户暂时不能处理某些消息,并且希望之后再处理的场景。
Pulsar 提供了 ReconsumeLater() 方法来实现重试队列,和 Negative 不同的是,RLQ 会创建一个新的 Topic,Topic 的格式是 TopicName-SubscriptionName_RLQ , 每次 ReconsumeLater() 时,都会产生一个新的消息写入到 RLQ Topic 中,并且会对之前的消息 Ack。
设置了 RLQ 的 Consumer,SDK 内部默认会启动 RLQ 的订阅,所以 RLQ 的消息也会被 Consumer 消费到。
RLQ 是通过 DeadLetterPolicy 来配置的(DLQ 下文会解释)。
Consumer<byte[]> consumer = pulsarClient.newConsumer(Schema.BYTES) .topic("my-topic") .subscriptionName("my-subscription") .subscriptionType(SubscriptionType.Shared) .enableRetry(true) .deadLetterPolicy(DeadLetterPolicy.builder() .maxRedeliverCount(maxRedeliveryCount) .build()) .subscribe();
RLQ Topic 中的消息属性中会添加一下信息:
Special property | Description |
---|---|
REAL_TOPIC | 原始 Topic 名称 |
ORIGIN_MESSAGE_ID | 原始 MessageId |
RECONSUMETIMES | 重复消费的次数 |
DELAY_TIME | 投递的延迟时间 |
RLQ 也需要主动调用: consumer.reconsumeLater(msg, 3, TimeUnit.SECONDS)。
为重推次数加上限制--DLQ
对于数据持续处理失败,一直重试并不是一个很好的策略,此时死信队列(DLQ)就是一个比较好的选择,DLQ 允许用户将持续处理失败的数据写入到一个独立的 Dead Letter Topic 中,DLQ 的数据需要单独的订阅来消费。
DLQ Topic 的格式为 TopicName-SubscriptionName_DLQ。DLQ 需要为重试设置一个上限,当重试次数超过上限之后,就会被写入到 DLQ Topic 中。
Consumer<byte[]> consumer = pulsarClient.newConsumer(Schema.BYTES) .topic("my-topic") .subscriptionName("my-subscription") .subscriptionType(SubscriptionType.Shared) .deadLetterPolicy(DeadLetterPolicy.builder() .maxRedeliverCount(maxRedeliveryCount) .build()) .subscribe();
几种重推和 DLQ 的关系
如果配置了 DLQ,那么使用 AckTimeout、NegativeAck 或者 ReconsumeLater 引起的数据重推都会触发 DLQ,也就是说重试的次数达到上限之后,都会被写入到 DLQ topic 里。
重试次数的统计有所区别:
AckTimeout 和 NegativeAck 都是通过 Redelivery 机制来计数的,SDK 发起 Redelivery 请求之后,Broker 侧的 RedeliveryTracker 会记录重推的次数,并且在推送给 Consumer 的 Message 中会包含 RedeliveryCount 的字段。
对于 RLQ,则是从 RECONSUMETIMES 属性中获取重复消费的次数,这个属性在 Client 生成,并且也是在 Client 计数。
总的来说,Apache Pulsar 提供了多种消息重推的方式,用户可以结合自己的场景,灵活使用,满足自己的业务需求。

低调大师中文资讯倾力打造互联网数据资讯、行业资源、电子商务、移动互联网、网络营销平台。
持续更新报道IT业界、互联网、市场资讯、驱动更新,是最及时权威的产业资讯及硬件资讯报道平台。
转载内容版权归作者及来源网站所有,本站原创内容转载请注明来源。
- 上一篇
MySQL5.7 与 MariaDB10.1 审计插件兼容性验证
这是一篇关于发现 MariaDB 审计插件导致 MySQL 发生 crash 后,展开适配验证并进行故障处理的文章。 作者:官永强 爱可生DBA 团队成员,擅长 MySQL 运维方面的技能。热爱学习新知识,亦是个爱打游戏的宅男。 本文来源:原创投稿 爱可生开源社区出品,原创内容未经授权不得随意使用,转载请联系小编并注明来源。 背景 在使用 CentOS Linux release 7.5.1804 (Core) 虚机为 MySQL5.7.34 安装 MariaDB 审计插件时发现:当使用通过解压 mariadb-10.1.48-linux-glibc_ 214-x86_64.tar.gz 获得的 server_audit.so 时,MySQL 会出现 Crash 的情况,通过手动重启 MySQL 也会马上发生 Crash。由此不禁思考: 其他版本的审计插件对该版本MySQL是否也有兼容性问题? 其他版本的MySQL是否也无法使用该版本的审计插件? 对于这样的情况是否有合适的解决方法? 通过查阅官网信息获得 MySQL 5.7 与 MariaDB 10.1 版本审计插件是适配的,于是这里...
- 下一篇
得物词分发平台技术架构建设与演进
前言 在文章开始前先介绍下导购,导购通常是指帮助消费者在购物过程中做出最佳决策的人或系统。在电商网站中,导购可以引导用户关注热卖商品或促销活动等,帮助用户更好地进行购物。导购的目的是为了提高用户的购物体验,促进销售额的增长。 接着开始正文,词分发也属于导购的一部分,他主要提供词推荐相关的功能,比如下拉词、底纹词、榜单、锦囊词、风向标等。而词分发平台则致力于构建一个通用的词推荐平台,避免重复开发和维护成本,提高运营效率和业务灵活性,从而支持公司各个域的业务发展和用户需求。 这个平台的主要作用是集成各种算法和工具,并提供一些通用的服务和接口,让各个域可以快速开 展业务,并实现自己的词推荐功能。这样一来,就不用每个域都进行单独的开发和维护,节省了很多时间和成本。同时,这个平台的灵活性也非常高,因为它可以根据不同的业务需求和场景特点快速地调整和修改,保证业务的顺利推进。 初期 背景 在当今数字时代,搜索引擎已经成为人们生活和工作中不可或缺的工具,通过搜索指定关键词,获取有用的信息和资料已经成为人们的共同需求。因此,在搜索引擎优化过程中,关键词的选择和排名就显得尤为关键。 在常规的搜索引擎中,搜...
相关文章
文章评论
共有0条评论来说两句吧...
文章二维码
点击排行
推荐阅读
最新文章
- SpringBoot2全家桶,快速入门学习开发网站教程
- CentOS7设置SWAP分区,小内存服务器的救世主
- CentOS7,CentOS8安装Elasticsearch6.8.6
- Hadoop3单机部署,实现最简伪集群
- SpringBoot2更换Tomcat为Jetty,小型站点的福音
- Docker使用Oracle官方镜像安装(12C,18C,19C)
- SpringBoot2配置默认Tomcat设置,开启更多高级功能
- Red5直播服务器,属于Java语言的直播服务器
- CentOS7安装Docker,走上虚拟化容器引擎之路
- SpringBoot2编写第一个Controller,响应你的http请求并返回结果