Spring Cloud Stream如何处理消息重复消费?
最近收到好几个类似的问题:使用Spring Cloud Stream操作RabbitMQ或Kafka的时候,出现消息重复消费的问题。通过沟通与排查下来主要还是用户对消费组的认识不够。其实,在之前的博文以及《Spring Cloud微服务实战》一书中都有提到关于消费组的概念以及作用。
那么什么是消费组呢?为什么要用消费组?它解决什么问题呢?摘录一段之前博文的内容,来解答这些疑问:
通常在生产环境,我们的每个服务都不会以单节点的方式运行在生产环境,当同一个服务启动多个实例的时候,这些实例都会绑定到同一个消息通道的目标主题(Topic)上。默认情况下,当生产者发出一条消息到绑定通道上,这条消息会产生多个副本被每个消费者实例接收和处理(出现上述重复消费问题)。但是有些业务场景之下,我们希望生产者产生的消息只被其中一个实例消费,这个时候我们需要为这些消费者设置消费组来实现这样的功能。
详细也可查看原文:消息驱动的微服务(消费组)。
下面,通过一个例子来看看如何使用消费组:
问题重现
构建消息消费端
第一步:创建绑定接口,绑定example-topic
输入通道(默认情况下,会绑定到RabbitMQ的同名Exchange或Kafaka的同名Topic)。
interface ExampleBinder { String NAME = "example-topic"; @Input(NAME) SubscribableChannel input(); }
第二步:对上述输入通道创建监听与处理逻辑。
@EnableBinding(ExampleBinder.class) public class ExampleReceiver { private static Logger logger = LoggerFactory.getLogger(ExampleReceiver.class); @StreamListener(ExampleBinder.NAME) public void receive(String payload) { logger.info("Received: " + payload); } }
第三步;创建应用主类和配置文件
@SpringBootApplication public class ExampleApplication { public static void main(String[] args) { SpringApplication.run(ExampleApplication.class, args); } }
spring.application.name=stream-consumer-group server.port=0
这里设置server.port=0
,以方便在本地启动多实例来重现问题。
完成上述操作之后,启动两个该应用的实例,以备后续调用。
构建消息生产端
比较简单,需要注意的是,使用@Output
创建一个同名的输出绑定,这样发出的消息才能被上述启动的实例接收到。具体实现如下:
@RunWith(SpringRunner.class) @EnableBinding(value = {ExampleApplicationTests.ExampleBinder.class}) public class ExampleApplicationTests { @Autowired private ExampleBinder exampleBinder; @Test public void exampleBinderTester() { exampleBinder.output().send(MessageBuilder.withPayload("Produce a message from : http://blog.didispace.com").build()); } public interface ExampleBinder { String NAME = "example-topic"; @Output(NAME) MessageChannel output(); } }
启动上述测试用例之后,可以发现之前启动的两个实例都收到的消息,并在日志中打印了:Received: Produce a message from : http://blog.didispace.com
。消息重复消费的问题成功重现!
使用消费组解决问题
如何解决上述消息重复消费的问题呢?我们只需要在配置文件中增加如下配置即可:
spring.cloud.stream.bindings.example-topic.group=aaa
当我们指定了某个绑定所指向的消费组之后,往当前主题发送的消息在每个订阅消费组中,只会有一个订阅者接收和消费,从而实现了对消息的负载均衡。只所以之前会出现重复消费的问题,是由于默认情况下,任何订阅都会产生一个匿名消费组,所以每个订阅实例都会有自己的消费组,从而当有消息发送的时候,就形成了广播的模式。
另外,需要注意上述配置中example-topic
是在代码中@Output
和@Input
中传入的名字。
代码示例
本文示例读者可以通过查看下面仓库的中的stream-consumer-group
项目:
如果您对这些感兴趣,欢迎star、follow、收藏、转发给予支持!
以下专题教程也许您会有兴趣
低调大师中文资讯倾力打造互联网数据资讯、行业资源、电子商务、移动互联网、网络营销平台。
持续更新报道IT业界、互联网、市场资讯、驱动更新,是最及时权威的产业资讯及硬件资讯报道平台。
转载内容版权归作者及来源网站所有,本站原创内容转载请注明来源。
- 上一篇
原生js深入理解系列(四)--- 多个实例深入理解js的深拷贝和浅拷贝,多种方法实现对象的深拷贝
亲们为什么要研究深拷贝和浅拷贝呢,因为我们项目开发中有许多情况需要拷贝一个数组抑或是对象,但是单纯的靠=“赋值”并不会解决所有问题,如果遇到引用类型的对象改变新赋值的对象会造成原始对象也发生同样改变,而要去除影响就必须用到浅拷贝、深拷贝,深拷贝,对于引用对象需要进行深拷贝才会去除影响。如果是值类型直接“=”就好。 简而言之: 赋值:就是两个对象指向的内存地址一样,a=b赋值后的新对象也指向同一个存储地址所以b变化a跟随变化, 浅拷贝:拷贝对象的一级元素的地址,如果一级元素全部为值类型就会互不干扰,如果一级元素有引用类型,改变引用类型的里面的值,会改变原对象。 深拷贝:拷贝对象各级元素的存储地址。 1.引用类型引用类型通常叫做类(class),也就是说,遇到引用值,所处理的就是对象。new出来的对象都是引用对象 (1)值类型:String, 数值、布尔值、null、undefined。 对于值类型一个对象一个存储位置所以会互不干扰 (2)引用类型:对象、数组、函数。对于引用类型,a=b赋值后的新对象也指向同一个存储地址所以b变化a跟随变化, 下图如果a是{key:'1', value:'...
- 下一篇
Spring Cloud Config 基础示例
Spring Cloud Config 简介 什么是Srping Cloud Config? Spring Cloud Config 是一种分布式配置中心框架, 为分布式系统中的外部化配置提供服务器和客户端支持。(同类技术还有vault,zookeeper,Consul) 使用Config Server,可以在所有环境中管理应用程序的外部属性。客户端和服务器上的概念映射与Spring Environment和PropertySource抽象,因此它们非常适合Spring应用程序,但可以与任何语言运行的任何应用程序一起使用。当应用程序通过部署管道从开发到测试并进入生产时,您可以管理这些环境之间的配置,并确保应用程序具有迁移时需要运行的所有内容。服务器存储后端的默认实现使用git,因此它可以轻松支持配置环境的标记版本,以及可用于管理内容的各种工具。 Spring Cloud Config也主要由两部分组成: Config-Server: 用于配置外部的资源文件,支持对属性值进行加解密 Config-client:绑定到config server使用远程配置文件初始化spring,支持对属性...
相关文章
文章评论
共有0条评论来说两句吧...