SpringBoot2.0高级案例(02) :整合 RocketMQ ,实现请求异步处理
本文源码:GitHub·点这里 || GitEE·点这里
一、RocketMQ
1、架构图片
2、角色分类
(1)、Broker
RocketMQ 的核心,接收 Producer 发过来的消息、处理 Consumer 的消费消息请求、消息的持 久化存储、服务端过滤功能等 。
(2)、NameServer
消息队列中的状态服务器,集群的各个组件通过它来了解全局的信息 。类似微服务中注册中心的服务注册,发现,下线,上线的概念。
热备份: NamServer可以部署多个,相互之间独立,其他角色同时向多个NameServer 机器上报状态信息。
心跳机制: NameServer 中的 Broker、 Topic等状态信息不会持久存储,都是由各个角色定时上报并存储到内存中,超时不上报的话, NameServer会认为某个机器出故障不可用。
(3)、Producer
消息的生成者,最常用的producer类就是DefaultMQProducer。
(4)、Consumer
消息的消费者,常用Consumer类 DefaultMQPushConsumer 收到消息后自动调用传入的处理方法来处理,实时性高 DefaultMQPullConsumer 用户自主控制 ,灵活性更高。
3、通信机制
(1)、Broker启动后需要完成一次将自己注册至NameServer的操作;随后每隔30s时间定时向NameServer更新Topic路由信息。
(2)、Producer发送消息时候,需要根据消息的Topic从本地缓存的获取路由信息。如果没有则更新路由信息会从NameServer重新拉取,同时Producer会默认每隔30s向NameServer拉取一次路由信息。
(3)、Consumer消费消息时候,从NameServer获取的路由信息,并再完成客户端的负载均衡后,监听指定消息队列获取消息并进行消费。
二、代码实现案例
1、项目结构图
版本描述
<spring-boot.version>2.1.3.RELEASE</spring-boot.version> <rocketmq.version>4.3.0</rocketmq.version>
2、配置文件
rocketmq: # 生产者配置 producer: isOnOff: on # 发送同一类消息的设置为同一个group,保证唯一 groupName: CicadaGroup # 服务地址 namesrvAddr: 127.0.0.1:9876 # 消息最大长度 默认1024*4(4M) maxMessageSize: 4096 # 发送消息超时时间,默认3000 sendMsgTimeout: 3000 # 发送消息失败重试次数,默认2 retryTimesWhenSendFailed: 2 # 消费者配置 consumer: isOnOff: on # 官方建议:确保同一组中的每个消费者订阅相同的主题。 groupName: CicadaGroup # 服务地址 namesrvAddr: 127.0.0.1:9876 # 接收该 Topic 下所有 Tag topics: CicadaTopic~*; consumeThreadMin: 20 consumeThreadMax: 64 # 设置一次消费消息的条数,默认为1条 consumeMessageBatchMaxSize: 1 # 配置 Group Topic Tag rocket: group: rocketGroup topic: rocketTopic tag: rocketTag
3、生产者配置
/** * RocketMQ 生产者配置 */ @Configuration public class ProducerConfig { private static final Logger LOG = LoggerFactory.getLogger(ProducerConfig.class) ; @Value("${rocketmq.producer.groupName}") private String groupName; @Value("${rocketmq.producer.namesrvAddr}") private String namesrvAddr; @Value("${rocketmq.producer.maxMessageSize}") private Integer maxMessageSize ; @Value("${rocketmq.producer.sendMsgTimeout}") private Integer sendMsgTimeout; @Value("${rocketmq.producer.retryTimesWhenSendFailed}") private Integer retryTimesWhenSendFailed; @Bean public DefaultMQProducer getRocketMQProducer() { DefaultMQProducer producer; producer = new DefaultMQProducer(this.groupName); producer.setNamesrvAddr(this.namesrvAddr); //如果需要同一个jvm中不同的producer往不同的mq集群发送消息,需要设置不同的instanceName if(this.maxMessageSize!=null){ producer.setMaxMessageSize(this.maxMessageSize); } if(this.sendMsgTimeout!=null){ producer.setSendMsgTimeout(this.sendMsgTimeout); } //如果发送消息失败,设置重试次数,默认为2次 if(this.retryTimesWhenSendFailed!=null){ producer.setRetryTimesWhenSendFailed(this.retryTimesWhenSendFailed); } try { producer.start(); } catch (MQClientException e) { e.printStackTrace(); } return producer; } }
4、消费者配置
/** * RocketMQ 消费者配置 */ @Configuration public class ConsumerConfig { private static final Logger LOG = LoggerFactory.getLogger(ConsumerConfig.class) ; @Value("${rocketmq.consumer.namesrvAddr}") private String namesrvAddr; @Value("${rocketmq.consumer.groupName}") private String groupName; @Value("${rocketmq.consumer.consumeThreadMin}") private int consumeThreadMin; @Value("${rocketmq.consumer.consumeThreadMax}") private int consumeThreadMax; @Value("${rocketmq.consumer.topics}") private String topics; @Value("${rocketmq.consumer.consumeMessageBatchMaxSize}") private int consumeMessageBatchMaxSize; @Resource private RocketMsgListener msgListener; @Bean public DefaultMQPushConsumer getRocketMQConsumer(){ DefaultMQPushConsumer consumer = new DefaultMQPushConsumer(groupName); consumer.setNamesrvAddr(namesrvAddr); consumer.setConsumeThreadMin(consumeThreadMin); consumer.setConsumeThreadMax(consumeThreadMax); consumer.registerMessageListener(msgListener); consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_LAST_OFFSET); consumer.setConsumeMessageBatchMaxSize(consumeMessageBatchMaxSize); try { String[] topicTagsArr = topics.split(";"); for (String topicTags : topicTagsArr) { String[] topicTag = topicTags.split("~"); consumer.subscribe(topicTag[0],topicTag[1]); } consumer.start(); }catch (MQClientException e){ e.printStackTrace(); } return consumer; } }
5、消息监听配置
/** * 消息消费监听 */ @Component public class RocketMsgListener implements MessageListenerConcurrently { private static final Logger LOG = LoggerFactory.getLogger(RocketMsgListener.class) ; @Resource private ParamConfigService paramConfigService ; @Override public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> list, ConsumeConcurrentlyContext context) { if (CollectionUtils.isEmpty(list)){ return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; } MessageExt messageExt = list.get(0); LOG.info("接受到的消息为:"+new String(messageExt.getBody())); int reConsume = messageExt.getReconsumeTimes(); // 消息已经重试了3次,如果不需要再次消费,则返回成功 if(reConsume ==3){ return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; } if(messageExt.getTopic().equals(paramConfigService.rocketTopic)){ String tags = messageExt.getTags() ; switch (tags){ case "rocketTag": LOG.info("开户 tag == >>"+tags); break ; default: LOG.info("未匹配到Tag == >>"+tags); break; } } // 消息消费成功 return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; } }
6、配置参数绑定
@Service public class ParamConfigService { @Value("${rocket.group}") public String rocketGroup ; @Value("${rocket.topic}") public String rocketTopic ; @Value("${rocket.tag}") public String rocketTag ; }
7、消息发送测试
@Service public class RocketMqServiceImpl implements RocketMqService { @Resource private DefaultMQProducer defaultMQProducer; @Resource private ParamConfigService paramConfigService ; @Override public SendResult openAccountMsg(String msgInfo) { // 可以不使用Config中的Group defaultMQProducer.setProducerGroup(paramConfigService.rocketGroup); SendResult sendResult = null; try { Message sendMsg = new Message(paramConfigService.rocketTopic, paramConfigService.rocketTag, "open_account_key", msgInfo.getBytes()); sendResult = defaultMQProducer.send(sendMsg); } catch (Exception e) { e.printStackTrace(); } return sendResult ; } }
三、项目源码
GitHub·地址 https://github.com/cicadasmile/middle-ware-parent GitEE·地址 https://gitee.com/cicadasmile/middle-ware-parent
低调大师中文资讯倾力打造互联网数据资讯、行业资源、电子商务、移动互联网、网络营销平台。
持续更新报道IT业界、互联网、市场资讯、驱动更新,是最及时权威的产业资讯及硬件资讯报道平台。
转载内容版权归作者及来源网站所有,本站原创内容转载请注明来源。
- 上一篇
记一次构建SaaS平台项目失败后的反思
记一次构建SaaS平台项目失败后的反思 前言: 笔者从2017年起开始着手将公司现有的软件系统改造成多租户模式,以降低整个系统的运营成本。但最后这个项目以失败告终。今天,我将对这个SaaS项目是如何走向失败,做一个分析和反思。 此前,我们花费了两年的时间研发了一套教学系统,考虑到用户的数量与营运成本,后期决定将这套单体的应用程序改造为基于SaaS架构的多租户应用程序。经过短暂的需求分析后,便开始了重构工作。经过一年的艰苦奋斗,SaaS化的产品不仅用户不能接受,就连我们自己也无法成功运营。其功能的完成度差强人意,运营的成本也没有比单体应用少,反而运营难度上升了。通过一段时间的整理与思考,总结了这一次SaaS化平台失败的原因。 一、不务实的需求 一个成功的SaaS产品,需要有足够多的用户需求样本数据以及对这些样本数据深入的挖掘、分析和抽象,以得到一份通用的,覆盖率大的应用场景数据。只有如此,才能够开发出一款具有普世价值的应用软件,才能贴合SaaS用户的实际需求。而在此次的SaaS化产品的过程中,我们犯了“闭门造车”的严重错误,导致这一错误产生的原因是我们拿到的原始需求不够,仅仅凭借一两个客...
- 下一篇
如何选型一个合适的框架-分布式任务调度框架选型
1.背景 定时任务是大家再开发中一个不可避免的业务,比如在一些电商系统中可能会定时给用户发送生日券,一些对账系统中可能会定时去对账。大概再很久以前每个服务可能就一台机器,再这台机器上直接搞个Timerschedule基本上就能满足我们的业务需求,但是随着时代的变迁,单台机器已经远远不能满足我们的需要,这个时候我们可能需要10台,20台甚至更多机器来运行我们的业务,接受我们的流量,这就是我们所说的横向扩展。但是这里就有个问题,这么多台机器如果还用我们的Timerschedule去做会发生什么呢?再上面的电商系统中有可能会给某个用户发很多张生日券,对公司造成很多损失,所以我们需要一些其他方法,让定时任务在多台机器上只执行一次。 这里想问下大家在没有了解过或使用过分布式任务调度框架之前大家是如何做定时任务的呢?在Spring项目中大家肯定都知道Spring-Scheduler,只需要在Spring中的bean的对应方法上加上@Scheduler注解即可完成我们的定时任务,但是光是用这个注解还远远不能保证定时任务执行多次,我们需要一些其他手段的保证,一般来说方法可能不外乎下面几种(都是基于Sp...
相关文章
文章评论
共有0条评论来说两句吧...
文章二维码
点击排行
推荐阅读
最新文章
- 设置Eclipse缩进为4个空格,增强代码规范
- CentOS6,7,8上安装Nginx,支持https2.0的开启
- Linux系统CentOS6、CentOS7手动修改IP地址
- CentOS7安装Docker,走上虚拟化容器引擎之路
- CentOS8编译安装MySQL8.0.19
- Springboot2将连接池hikari替换为druid,体验最强大的数据库连接池
- Windows10,CentOS7,CentOS8安装MongoDB4.0.16
- CentOS7,CentOS8安装Elasticsearch6.8.6
- SpringBoot2整合Redis,开启缓存,提高访问速度
- CentOS8安装MyCat,轻松搞定数据库的读写分离、垂直分库、水平分库