一文详解RocketMQ-Spring的源码解析与实战
摘要:这篇文章主要介绍 Spring Boot 项目使用 rocketmq-spring SDK 实现消息收发的操作流程,同时笔者会从开发者的角度解读 SDK 的设计逻辑。
本文分享自华为云社区《RocketMQ-Spring : 实战与源码解析一网打尽》,作者:勇哥java实战分享。
RocketMQ 是大家耳熟能详的消息队列,开源项目 rocketmq-spring 可以帮助开发者在 Spring Boot 项目中快速整合 RocketMQ。
这篇文章会介绍 Spring Boot 项目使用 rocketmq-spring SDK 实现消息收发的操作流程,同时笔者会从开发者的角度解读 SDK 的设计逻辑。
一 SDK 简介
项目地址:https://github.com/apache/rocketmq-spring
rocketmq-spring 的本质是一个 Spring Boot starter 。
Spring Boot 基于“约定大于配置”(Convention over configuration)这一理念来快速地开发、测试、运行和部署 Spring 应用,并能通过简单地与各种启动器(如 spring-boot-web-starter)结合,让应用直接以命令行的方式运行,不需再部署到独立容器中。
Spring Boot starter 构造的启动器使用起来非常方便,开发者只需要在 pom.xml 引入 starter 的依赖定义,在配置文件中编写约定的配置即可。
下面我们看下 rocketmq-spring-boot-starter 的配置:
1、引入依赖
<dependency> <groupId>org.apache.rocketmq</groupId> <artifactId>rocketmq-spring-boot-starter</artifactId> <version>2.2.3</version> </dependency>
2、约定配置
接下来,我们分别按照生产者和消费者的顺序,详细的讲解消息收发的操作过程。
二 生产者
首先我们添加依赖后,进行如下三个步骤:
1、配置文件中配置如下
rocketmq: name-server: 127.0.0.1:9876 producer: group: platform-sms-server-group # access-key: myaccesskey # secret-key: mysecretkey topic: sms-common-topic
生产者配置非常简单,主要配置名字服务地址和生产者组。
2、需要发送消息的类中注入 RcoketMQTemplate
@Autowired private RocketMQTemplate rocketMQTemplate; @Value("${rocketmq.topic}") private String smsTopic;
3、发送消息,消息体可以是自定义对象,也可以是 Message 对象
rocketMQTemplate 类包含多钟发送消息的方法:
- 同步发送 syncSend
- 异步发送 asyncSend
- 顺序发送 syncSendOrderly
- oneway发送 sendOneWay
下面的代码展示如何同步发送消息。
String destination = StringUtils.isBlank(tags) ? topic : topic + ":" + tags; SendResult sendResult = rocketMQTemplate.syncSend( destination, MessageBuilder.withPayload(messageContent). setHeader(MessageConst.PROPERTY_KEYS, uniqueId). build() ); if (sendResult != null) { if (sendResult.getSendStatus() == SendStatus.SEND_OK) { // send message success ,do something } }
syncSend 方法的第一个参数是发送的目标,格式是:topic + ":" + tags ,
第二个参数是:spring-message 规范的 message 对象 ,而 MessageBuilder 是一个工具类,方法链式调用创建消息对象。
三 消费者
1、配置文件中配置如下
rocketmq: name-server: 127.0.0.1:9876 consumer1: group: platform-sms-worker-common-group topic: sms-common-topic
2、实现消息监听器
@Component @RocketMQMessageListener( consumerGroup = "${rocketmq.consumer1.group}", //消费组 topic = "${rocketmq.consumer1.topic}" //主题 ) public class SmsMessageCommonConsumer implements RocketMQListener<String> { public void onMessage(String message) { System.out.println("普通短信:" + message); } }
消费者实现类也可以实现 RocketMQListener<MessageExt>, 在 onMessage 方法里通过 RocketMQ 原生消息对象 MessageExt 获取更详细的消息数据 。
public void onMessage(MessageExt message) { try { String body = new String(message.getBody(), "UTF-8"); logger.info("普通短信:" + message); } catch (Exception e) { logger.error("common onMessage error:", e); } }
四 源码概览
最新源码中,我们可以看到源码中包含四个模块:
1、rocketmq-spring-boot-parent
该模块是父模块,定义项目所有依赖的 jar 包。
2、rocketmq-spring-boot
核心模块,实现了 starter 的核心逻辑。
3、rocketmq-spring-boot-starter
SDK 模块,简单封装,外部项目引用。
4、rocketmq-spring-boot-samples
示例代码模块。这个模块非常重要,当用户使用 SDK 时,可以参考示例快速开发。
五 starter 实现
我们重点分析下 rocketmq-spring-boot 模块的核心源码:
spring-boot-starter 实现需要包含如下三个部分:
1、定义 Spring 自身的依赖包和 RocketMQ 的依赖包 ;
2、定义spring.factories 文件
在 resources 包下创建 META-INF 目录后,新建 spring.factories 文件,并在文件中定义自动加载类,文件内容是:
org.springframework.boot.autoconfigure.EnableAutoConfiguration=\ org.apache.rocketmq.spring.autoconfigure.RocketMQAutoConfiguration
spring boot 会根据文件中配置的自动化配置类来自动初始化相关的 Bean、Component 或 Service。
3、实现自动加载类
在 RocketMQAutoConfiguration 类的具体实现中,我们重点分析下生产者和消费者是如何分别启动的。
▍生产者发送模板类:RocketMQTemplate
RocketMQAutoConfiguration 类定义了两个默认的 Bean :
首先SpringBoot项目中配置文件中的配置值会根据属性条件绑定到 RocketMQProperties 对象 中,然后使用 RocketMQ 的原生 API 分别创建生产者 Bean 和拉取消费者 Bean , 分别将两个 bean 设置到 RocketMQTemplate 对象中。
两个重点需要强调:
- 发送消息时,将 spring-message 规范下的消息对象封装成 RocketMQ 消息对象
- 默认拉取消费者 litePullConsumer 。拉取消费者一般用于大数据批量处理场景 。
RocketMQTemplate 类封装了拉取消费者的receive方法,以方便开发者使用。
▍自定义消费者类
下图是并发消费者的例子:
那么 rocketmq-spring 是如何自动启动消费者呢 ?
spring 容器首先注册了消息监听器后置处理器,然后调用 ListenerContainerConfiguration 类的 registerContainer 方法 。
对比并发消费者的例子,我们可以看到: DefaultRocketMQListenerContainer 是对 DefaultMQPushConsumer 消费逻辑的封装。
封装消费消息的逻辑,同时满足 RocketMQListener 泛化接口支持不同参数,比如 String 、MessageExt 、自定义对象 。
首先DefaultRocketMQListenerContainer初始化之后, 获取 onMessage 方法的参数类型 。
然后消费者调用 consumeMessage 处理消息时,封装了一个 handleMessage 方法 ,将原生 RocketMQ 消息对象 MessageExt 转换成 onMessage 方法定义的参数对象,然后调用 rocketMQListener 的 onMessage 方法。
上图右侧标红的代码也就是该方法的精髓:
rocketMQListener.onMessage(doConvertMessage(messageExt));
六 写到最后
开源项目 rocketmq-spring 有很多值得学习的地方 ,我们可以从如下四个层面逐层进阶:
1、学会如何使用 :参考 rocketmq-spring-boot-samples 模块的示例代码,学会如何发送和接收消息,快速编码;
2、模块设计:学习项目的模块分层 (父模块、SDK 模块、核心实现模块、示例代码模块);
3、starter 设计思路 :定义自动配置文件 spring.factories 、设计配置属性类 、在 RocketMQ client 的基础上实现优雅的封装、深入理解 RocketMQ 源码等;
4、举一反三:当我们理解了 rocketmq-spring 的源码,我们可以尝试模仿该项目写一个简单的 spring boot starter。

低调大师中文资讯倾力打造互联网数据资讯、行业资源、电子商务、移动互联网、网络营销平台。
持续更新报道IT业界、互联网、市场资讯、驱动更新,是最及时权威的产业资讯及硬件资讯报道平台。
转载内容版权归作者及来源网站所有,本站原创内容转载请注明来源。
- 上一篇
JSF源码分析(一)
作者:京东零售 李孟冬 架构设计 1.7.4-HOTFIX-T4版本包布局及简要含义 看过了全包的简要,那么其核心的功能模块,就从常用的项目xml配置出发,便于我们的理解。如下: jsf-provider.xml配置 以我们地址服务的jsf-provider.xml文件为例,即: 可以看到,在JSF的配置文件中,我们并没有看到任何关于注册中心的内容。说到底,作为(集团自主研发的高效)RPC调用框架,其高可用的注册中心重中之重,所以带着这份疑惑,继续往下探究,没有注册中心地址,这些标签是怎么完成服务的注册,订阅的。 配置解析 在Spring的体系中,Spring提供了可扩展Schema的支持,即自定义的标签解析。 1、首先我们发现配置文件中自定义的xsd文件,在标签名称上找到NamespaceUri链接http://jsf.jd.com/schema/jsf/jsf.xsd 2、然后根据SPI加载,在META-INF中找到定义好Spring.handlers文件和Spring.schemas文件,一个是具体的解析器的配置,一个是jsf.xsd的具体路径 Spring.handlers文件...
- 下一篇
Split to Be Slim: 论文复现
摘要:在本论文中揭示了这样一种现象:一层内的许多特征图共享相似但不相同的模式。 本文分享自华为云社区《Split to Be Slim: 论文复现》,作者: 李长安 。 Split to Be Slim: An Overlooked Redundancy in Vanilla Convolution 论文复现 1、问题切入 已经提出了许多有效的解决方案来减少推理加速模型的冗余。然而,常见的方法主要集中在消除不太重要的过滤器或构建有效的操作,同时忽略特征图中的模式冗余。 在本论文中揭示了这样一种现象:一层内的许多特征图共享相似但不相同的模式。但是,很难确定具有相似模式的特征是否是冗余的或包含基本细节。因此,论文作者不是直接去除不确定的冗余特征,而是提出了一种基于分割的卷积操作,即 SPConv,以容忍具有相似模式但需要较少计算的特征。 具体来说,论文将输入特征图分为Representative部分和不Uncertain冗余部分,其中通过相对繁重的计算从代表性部分中提取内在信息,而对不确定冗余部分中的微小隐藏细节进行一些轻量级处理手术。为了重新校准和融合这两组处理过的特征,我们提出了一个无...
相关文章
文章评论
共有0条评论来说两句吧...
文章二维码
点击排行
推荐阅读
最新文章
- Windows10,CentOS7,CentOS8安装MongoDB4.0.16
- Linux系统CentOS6、CentOS7手动修改IP地址
- CentOS7编译安装Gcc9.2.0,解决mysql等软件编译问题
- Jdk安装(Linux,MacOS,Windows),包含三大操作系统的最全安装
- SpringBoot2整合Redis,开启缓存,提高访问速度
- SpringBoot2整合MyBatis,连接MySql数据库做增删改查操作
- CentOS8,CentOS7,CentOS6编译安装Redis5.0.7
- MySQL8.0.19开启GTID主从同步CentOS8
- Mario游戏-低调大师作品
- CentOS7安装Docker,走上虚拟化容器引擎之路