消息队列的消费语义和投递语义
引言
所谓的消费语义,指的就是如下三种情况
- 如何保证消息最多消费一次
- 如何保证消息至少消费一次
- 如何保证消息恰好消费一次
其实类似还有一个投递语义
- 如何保证消息最多投递一次
- 如何保证消息至少投递一次
- 如何保证消息恰好投递一次
说句实在话,其实还是老问题,只是换了一种问法!
OK,开始我们的正文
正文
我们先做如下约定
Producer
代表生产者Consumer
代表消费者Message Queue
代表消息队列
投递语义
我们先从投递语义开始讲起,因为要先把这个概念讲明白了,才能讲消费语义。恰巧,kafka
实现了这三种语义,我们以kafka
来说明。
如何保证消息最多投递一次?
简单,就是我已经投出去了,收没收到不管了,会存在消息丢失。
我们在初始化Producer
时可以通过配置request.required.acks
不同的值,来实现不同的发送模式。
这里将request.required.acks
设为0,意思就是Producer
不等待Leader确认,只管发出即可;最可能丢失消息。如果丢了消息,就是投递0次。如果没丢,就是投递1次。符合最多投递一次的含义。
如何保证消息至少投递一次?
这里将request.required.acks
设为-1。Producer
往kafka
的Leader(主)
节点发送消息后,会等follower(从)
节点同步完数据以后,再给Producer
返回ACK确认消息。
但是这里是有几率出现重复消费的问题的。
例如,kafka
保存消息后,发送ACK前宕机,Producer
认为消息未发送成功并重试,造成数据重复!
那么,在这种情况下,就会出现大于1次的投递情况,符合至少投递一次的含义。
如何保证消息恰好投递一次?
kafka
在0.11.0.0版本之后支持恰好投递一次的语义。
我们将enable.idempotence
设置为ture,此时就会默认把request.required.acks
设为-1,可以达到恰好投递一次的语义。
如何做到的?
为了实现Producer
的幂等语义,Kafka引入了Producer ID(即PID)和Sequence Number。
kafka
为每个Producer
分配一个pid,作为该Producer
的唯一标识。
Producer
会为每一个<topic,partition>维护一个单调递增的seq。
类似的,Message Queue
也会为每个<pid,topic,partition>记录下最新的seq。
当req_seq == message_seq+1时,Message Queue
才会接受该消息。因为:
- (1)消息的seq比
Message Queue
的seq大一以上,说明中间有数据还没写入,即乱序了。 - (2)消息的seq比
Message Queue
的seq小,那么说明该消息已被保存。
消费语义
这里我们还是做一个定义如下所示
consumer.poll()
表示消费者获取消息内容processMsg(message)
表示下游系统进行消费消息consumer.commit()
表示消费者往消息队列提交确认信息,消息队列接到确认消息,删除该消息。
注意了,我是以processMsg
函数,即处理消息的过程,定义为消费消息。
如何保证消息最多消费一次?
Producer
:满足最多投递一次的语义即可,即只管发消息,不需要等待消息队列返回确认消息。
Message Queue
:接到消息后往内存中一放就行,不用持久化存储。
Consumer
:拉取到消息以后,直接给消息队列返回确认消息即可。至于后续消费消息成功与否,无所谓的。即按照以下顺序执行
consumer.poll(); consumer.commit(); processMsg(message);
如何保证消息至少消费一次?
Producer
:满足至少投递一次语义即可,即发送消息后,需要等待消息队列返回确认消息。如果超时没收到确认消息,则重发。
Message Queue
:接到消息后,进行持久化存储,而后返回生产者确认消息。
Consumer
:拉取到消息后,进行消费,消费成功后,再返回确认消息。即按照如下顺序执行
consumer.poll(); processMsg(message); consumer.commit();
由于这里Producer
满足的是至少投递一次语义,因此消息队列中是有重复消息的。所以我们的Consumer
会出现重复消费的情形!
如何保证消息恰好消费一次?
在保证至少消费一次的基础上,processMsg
满足幂等性操作即可。
如何保证幂等性操作?
老问题了,比如有状态的消息啊。比如唯一表啊。大家搜一搜,一大堆答案,不想重复说了。
总结
本文讲的是消息队列的消费语义和投递语义的含义,希望大家有所收获。
出处:https://www.cnblogs.com/rjzheng/p/11050673.html
低调大师中文资讯倾力打造互联网数据资讯、行业资源、电子商务、移动互联网、网络营销平台。
持续更新报道IT业界、互联网、市场资讯、驱动更新,是最及时权威的产业资讯及硬件资讯报道平台。
转载内容版权归作者及来源网站所有,本站原创内容转载请注明来源。
- 上一篇
重磅开源|AOP for Flutter开发利器——AspectD
作者:闲鱼技术-正物 https://github.com/alibaba-flutter/aspectd 问题背景 随着Flutter这一框架的快速发展,有越来越多的业务开始使用Flutter来重构或新建其产品。但在我们的实践过程中发现,一方面Flutter开发效率高,性能优异,跨平台表现好,另一方面Flutter也面临着插件,基础能力,底层框架缺失或者不完善等问题。 举个栗子,我们在实现一个自动化录制回放的过程中发现,需要去修改Flutter框架(Dart层面)的代码才能够满足要求,这就会有了对框架的侵入性。要解决这种侵入性的问题,更好地减少迭代过程中的维护成本,我们考虑的首要方案即面向切面编程。 那么如何解决AOP for Flutter这个问题呢?本文将重点介绍一个闲鱼技术团队开发的针对Dart的AOP编程框架AspectD。 AspectD:面向Dart的AOP框架 AOP能力究竟是运行时还是编译时支持依赖于语言本身的特点。举例来说在iOS中,Objective C本身提供了强大的运行时和动态性使得运行期AOP简单易用。在Android下,Java语言的特点不仅可以实现类似A...
- 下一篇
Linux三剑客之awk详解
第一篇 awk简介与表达式实例 一种名字怪异的语言 模式扫描和处理,处理数据和生成报告。 awk不仅仅是linux系统中的一个命令,而且是一种编程语言;它可以用来处理数据和生成报告(excel);处理的数据可以是一个或多个文件;可以是直接来自标准输入,也可以通过管道获取标准输入;awk可以在命令行上直接编辑命令进行操作,也可以编写成awk程序来进行更为复杂的运用。 sed处理stream editor文本流,水流。 一、awk环境简介 本文涉及的awk为gawk,即GNU版本的awk。 [root@creditease awk]# cat /etc/redhat-release CentOS Linux release 7.5.1804 (Core) [root@creditease awk]# uname -r 3.10.0-862.el7.x86_64 [root@creditease awk]# ll `which awk` lrwxrwxrwx. 1 root root 4 Nov 7 14:47 /usr/bin/awk -> gawk [root@creditease...
相关文章
文章评论
共有0条评论来说两句吧...
文章二维码
点击排行
推荐阅读
最新文章
- Windows10,CentOS7,CentOS8安装MongoDB4.0.16
- SpringBoot2全家桶,快速入门学习开发网站教程
- CentOS8安装Docker,最新的服务器搭配容器使用
- SpringBoot2配置默认Tomcat设置,开启更多高级功能
- Jdk安装(Linux,MacOS,Windows),包含三大操作系统的最全安装
- CentOS7安装Docker,走上虚拟化容器引擎之路
- CentOS7设置SWAP分区,小内存服务器的救世主
- Hadoop3单机部署,实现最简伪集群
- CentOS8,CentOS7,CentOS6编译安装Redis5.0.7
- 设置Eclipse缩进为4个空格,增强代码规范