RocketMQ消息队列还没入门就想放弃
题外话
什么情况下的异步操作需要使用消息队列而不是多线程?
- 消息队列和多线程两者并不冲突,多线程可以作为队列的生产者和消费者。
使用外部的消息队列时,第一是可以提高应用的稳定性,当程序fail后,已经写入外部消息队列的数据依旧是保存的,如果使用两步commit的队列的话,可以更加提高这个项目。 -
用线程的话,会占用主服务器资源, 消息队列的话, 可以放到其他机器上运行, 让主服务器尽量多的服务其他请求。我个人认为, 如果用户不急着知道结果的操作, 用消息队列, 否则再考虑用不用线程。
-
解耦更充分,架构更合理
- 多线程是在编程语言层面解决问题
- 消息队列是在架构层面解决问题
我认为架构层面解决问题是“觉悟比较高的方式“,理想情况下应该限制语言层面滥用多线程,能不用就不用
-
- 不关心执行结果的都可以放到消息队列,不需要及时到达,放到消息队列中慢慢消化
- 批量发送邮件时,数据量庞大,如果使用多线程对系统不安全
消息队列能解决什么问题
- 异步处理
- 应用解耦
- 流量削锋
- 日志处理
- 消息通讯
环境介绍
注意尽量将rocketmq的1.应用版本,2.jar包依赖,3.recketmq-console-ng的jar包依赖版本保持一致,不然可能会出现非常诡异的问题
此项目所使用版本: rocketmq:4.3.0
,OS: win10
- jar包依赖
compile group: 'org.apache.rocketmq', name: 'rocketmq-client', version: '4.3.0'
- 下载 rocketmq应用
http://rocketmq.apache.org/release_notes/release-notes-4.3.0/ -
windows下rocketmq环境配置与启动
参考 https://www.jianshu.com/p/4a275e779afa- 在rocketmq的bin目录下启动NAMESERVER(相当于服务注册中心)
start mqnamesrv.cmd
- 启动 broker(真正工作的服务器,存储消息的服务器)
start mqbroker.cmd -n 127.0.0.1:9876 autoCreateTopicEnable=true
- 在rocketmq的bin目录下启动NAMESERVER(相当于服务注册中心)
-
可视化rocketmq管理项目下载
https://github.com/apache/rocketmq-externals.git- 将这个项目里面
rocketmq-console-ng
里的rocketmq依赖修改成与你项目依赖的版本一致,次项目是4.3.0
- 将这个项目里面
- 第三步已经把rocketmq的nameServer与broker启动起来
- 启动rocket-console-ng可视化管理项目,该项目是基于springboot的
- 访问rocket-console-ng的服务地址
到此环境搭建完成!!!
回到自己的程序↓↓↓
配置信息
###producer #该应用是否启用生产者 rocketmq: producer: isOnOff: on #发送同一类消息的设置为同一个group,保证唯一,默认不需要设置,rocketmq会使用ip@pid(pid代表jvm名字)作为唯一标示 groupName: ${spring.application.name} #mq的nameserver地址 namesrvAddr: 127.0.0.1:9876 #消息最大长度 默认1024*4(4M) maxMessageSize: 4096 #发送消息超时时间,默认3000 sendMsgTimeout: 3000 #发送消息失败重试次数,默认2 retryTimesWhenSendFailed: 2 ###consumer ##该应用是否启用消费者 consumer: isOnOff: on groupName: ${spring.application.name} #mq的nameserver地址 namesrvAddr: 127.0.0.1:9876 #该消费者订阅的主题和tags("*"号表示订阅该主题下所有的tags),格式:topic~tag1||tag2||tag3;topic2~*; topics: futaotopic~*; consumeThreadMin: 20 consumeThreadMax: 64 #设置一次消费消息的条数,默认为1条 consumeMessageBatchMaxSize: 1 reConsumerTimes: 3
生产者Bean
package com.futao.springmvcdemo.service.impl import com.futao.springmvcdemo.foundation.LogicException import com.futao.springmvcdemo.model.entity.constvar.ErrorMessage import com.futao.springmvcdemo.model.system.SystemConfig import com.futao.springmvcdemo.service.RocketMqService import org.apache.commons.lang3.StringUtils import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently import org.apache.rocketmq.client.producer.DefaultMQProducer import org.apache.rocketmq.common.consumer.ConsumeFromWhere import org.apache.rocketmq.common.protocol.heartbeat.MessageModel import org.slf4j.LoggerFactory import org.springframework.beans.factory.annotation.Value import org.springframework.context.annotation.Bean import org.springframework.stereotype.Service import java.nio.charset.Charset /** * @author futao * Created on 2018/10/18. */ @Service open class RocketMqServiceImpl : RocketMqService { private val logger = LoggerFactory.getLogger(RocketMqServiceImpl::class.java) /** * 发送同一类消息的设置为同一个group,保证唯一,默认不需要设置,rocketmq会使用ip@pid(pid代表jvm名字)作为唯一标示 */ @Value("\${rocketmq.producer.groupName}") private lateinit var producerGroupName: String @Value("\${rocketmq.producer.namesrvAddr}") private lateinit var producerNamesrvAddr: String /** * 消息最大大小,默认4M */ @Value("\${rocketmq.producer.maxMessageSize}") private var maxMessageSize: Int = 0 /** * 消息发送超时时间,默认3秒 */ @Value("\${rocketmq.producer.sendMsgTimeout}") private var sendMsgTimeout: Int = 0 /** * 消息发送失败重试次数,默认2次 */ @Value("\${rocketmq.producer.retryTimesWhenSendFailed}") private var retryTimesWhenSendFailed: Int = 0 /** * 生产者Bean */ @Bean override fun producer(): DefaultMQProducer { if (this.producerGroupName.isEmpty()) { throw LogicException.le(ErrorMessage.ROCKET_MQ_PRODUCER_GROUP_NAME_EMPTY) } if (this.producerNamesrvAddr.isEmpty()) { throw LogicException.le(ErrorMessage.ROCKET_MQ_PRODUCER_NAME_SERVER_EMPTY) } val defaultMQProducer = DefaultMQProducer(producerGroupName) defaultMQProducer.namesrvAddr = producerNamesrvAddr defaultMQProducer.maxMessageSize = maxMessageSize defaultMQProducer.sendMsgTimeout = sendMsgTimeout defaultMQProducer.isVipChannelEnabled = false //消息发送到mq服务器失败重试次数 defaultMQProducer.retryTimesWhenSendFailed = retryTimesWhenSendFailed try { defaultMQProducer.start() logger.info("rocketMq Producer start success; nameServer:{},producerGroupName:{}", producerNamesrvAddr, producerGroupName) } catch (e: Exception) { logger.error("rocketMq Producer start fail;{}", e.message, e) } return defaultMQProducer } }
消费者
@Value("\${rocketmq.consumer.namesrvAddr}") private lateinit var consumerNamesrvAddr: String @Value("\${rocketmq.consumer.groupName}") private lateinit var consumerGroupName: String @Value("\${rocketmq.consumer.consumeThreadMin}") private var consumeThreadMin: Int = 0 @Value("\${rocketmq.consumer.consumeThreadMax}") private var consumeThreadMax: Int = 0 @Value("\${rocketmq.consumer.topics}") private lateinit var topics: String @Value("\${rocketmq.consumer.consumeMessageBatchMaxSize}") private var consumeMessageBatchMaxSize: Int = 0 // @Resource // private lateinit var mqMessageListenerProcessor: MQConsumeMsgListenerProcessor @Value("\${reConsumerTimes}") private var reConsumerTimes: Int = 0 /** * 消费者Bean */ @Bean override fun consumer(): DefaultMQPushConsumer { val topic = SystemConfig.ROCKET_MQ_TOPIC_MAIL val tag = SystemConfig.ROCKET_MQ_TAG_MAIL_REGISTER if (this.consumerGroupName.isEmpty()) { throw LogicException.le(ErrorMessage.ROCKET_MQ_CONSUMER_GROUP_NAME_EMPTY) } if (this.consumerNamesrvAddr.isEmpty()) { throw LogicException.le(ErrorMessage.ROCKET_MQ_PRODUCER_NAME_SERVER_EMPTY) } if (this.topics.isEmpty()) { throw LogicException.le(ErrorMessage.ROCKET_MQ_CONSUMER_TOPICS_EMPTY) } try { //DefaultMQPushConsumer DefaultMQPullConsumer val defaultMQPushConsumer = DefaultMQPushConsumer(consumerGroupName) defaultMQPushConsumer.namesrvAddr = consumerNamesrvAddr defaultMQPushConsumer.consumeThreadMin = consumeThreadMin defaultMQPushConsumer.isVipChannelEnabled = false // defaultMQPushConsumer.createTopic() defaultMQPushConsumer.consumeThreadMax = consumeThreadMax //消费模式 集群还是广播,默认为集群(自动负载均衡) //广播消费: 消息会发给Consume Group中的每一个消费者进行消费,如果设置为广播消息会导致NOT_ONLINE异常,https://github.com/apache/rocketmq/issues/296 defaultMQPushConsumer.messageModel = MessageModel.CLUSTERING // 设置消费模型, //consumer.setMessageModel(MessageModel.CLUSTERING); // * 设置Consumer第一次启动是从队列头部开始消费还是队列尾部开始消费 // * 如果非第一次启动,那么按照上次消费的位置继续消费 defaultMQPushConsumer.consumeFromWhere = ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET //设置一次消费消息的条数,默认为1条 defaultMQPushConsumer.consumeMessageBatchMaxSize = consumeMessageBatchMaxSize //订阅topic defaultMQPushConsumer.subscribe(topic, tag) // defaultMQPushConsumer.registerMessageListener(mqMessageListenerProcessor) defaultMQPushConsumer.registerMessageListener( MessageListenerConcurrently { msgs, _ -> if (msgs == null || msgs.isEmpty()) { logger.info("接受到的消息为空,不处理,直接返回成功") return@MessageListenerConcurrently ConsumeConcurrentlyStatus.CONSUME_SUCCESS } val msg = msgs[0] logger.info("接收到的消息为:" + msg.toString()) if (msg.topic == topic && msg.tags == tag) { //判断该消息是否重复消费(RocketMQ不保证消息不重复,如果你的业务需要保证严格的不重复消息,需要你自己在业务端去重) //获取该消息重试次数 if (msg.reconsumeTimes >= reConsumerTimes) { //消息已经重试了3次,如果不需要再次消费,则返回成功 //TODO("如果重试了三次还是失败则执行对于失败的业务逻辑") logger.error("消息重试消费失败:", msg) return@MessageListenerConcurrently ConsumeConcurrentlyStatus.CONSUME_SUCCESS } else { //如果失败重试次数还没到三次则继续重试 ConsumeConcurrentlyStatus.RECONSUME_LATER } //TODO("开始正常的业务逻辑") println(StringUtils.repeat(":", 30) + String(msg.body, Charset.forName(SystemConfig.UTF8_ENCODE))) } return@MessageListenerConcurrently ConsumeConcurrentlyStatus.CONSUME_SUCCESS //消费成功 } ) defaultMQPushConsumer.start() logger.info("rocketMq Consumer start success; namesrvAddr:{},groupName:{},topics:{}", consumerNamesrvAddr, consumerGroupName, topics) return defaultMQPushConsumer } catch (e: Exception) { logger.error("rocketMq Consumer start fail;{}", e.message, e) return DefaultMQPushConsumer() } }
简单测试
-
发送注册邮件的topic与tag配置
- 个人理解的topic: 一类业务可以归为一个topic,比如所有的发邮件功能
- 个人理解的tag: 某类业务下的细分,比如发送邮件业务下的发送注册邮件可以使用一个tag,发送忘记密码邮件可以再使用一个tag
/** * rocket mq 发送邮件的 topic */ public static final String ROCKET_MQ_TOPIC_MAIL = "topic_mail"; /** * rocket mq 发送邮件-注册邮件的tag */ public static final String ROCKET_MQ_TAG_MAIL_REGISTER = "tag_mail_register";
- 发送邮件消息队列Service
@Resource lateinit var producer: DefaultMQProducer /** * 通过消息队列发送邮件 */ override fun sendMq(mailM: MailM) { val message = Message(SystemConfig.ROCKET_MQ_TOPIC_MAIL, SystemConfig.ROCKET_MQ_TAG_MAIL_REGISTER, JSON.toJSONString(mailM).toByteArray(Charset.forName(SystemConfig.UTF8_ENCODE))) try { producer.send(message) } catch (e: Exception) { logger.error(e.message, e) } }
- 请求controller
@GetMapping("sendMailMq") open fun sendMailMq() { val mailM = MailM().apply { to = arrayOf("1185172056@qq.com") cc = arrayOf("taof@wicrenet.com") subject = "消息队列" content = "<h1>您好,RocketMq</h1>" } mailService.sendMq(mailM) }
-
在请求了controller之后可以在rocketmq-console-ng控制台查看到相应的topic与消息信息
- topic
- 已发送到rocketmq服务器上的消息
- 查看消息状态
- 查看控制台
- topic
坑:
- 消息不能被消费使用RocketMq控制台resend提示NOT_CONSUME_YET:检查rocketmq应用版本,rocketmq-console-ng依赖版本,自己的项目依赖jar包版本是否一致
- Caused by: org.apache.rocketmq.remoting.exception.RemotingConnectException:检查rocketmq应用版本,rocketmq-console-ng依赖版本,自己的项目依赖jar包版本是否一致
- Not found the consumer group consume stats, because return offset table is empty, maybe the consumer not consume any message:尝试把消费者的消费模式改成集群模式
- NOT_CONSUME_YET:如果还是不能解决请不要使用公司的网络,公司的网络可能会有很多的限制,用自己的手机进行测试(我被这个网络给坑惨了)
资源:
Windows下安装RocketMq:https://www.jianshu.com/p/4a275e779afa
RocketMq名词解释: https://my.oschina.net/javamaster/blog/2051703
解释Push与Pull区别: https://www.jianshu.com/p/f071d5069059?utm_source=oschina-app
官网:http://rocketmq.apache.org/
windows下rocketmq的消息信息存储在 C:\Users\user\store
文件夹下,删除该文件夹即可删除所有的消息
低调大师中文资讯倾力打造互联网数据资讯、行业资源、电子商务、移动互联网、网络营销平台。
持续更新报道IT业界、互联网、市场资讯、驱动更新,是最及时权威的产业资讯及硬件资讯报道平台。
转载内容版权归作者及来源网站所有,本站原创内容转载请注明来源。
- 上一篇
求求你们,别消费程序员了!
最近一段时间,微博、朋友圈都被程序员刷屏了。 先是微博上充斥着各种程序员格子衫的段子,紧接着又有各种程序员穿搭指南被刷屏了,虽然比较幽默,但是幽默中暗示程序员没品、秃头,然后再加上邋遢、情商低、没女朋友等等都跟程序员联系在一起,被各个行业津津乐道,不了解的还以为程序员是这个世界奇葩的物种一样,今天我得好好说道说道。 真的,程序员哪有那么神秘、那么奇葩,我们不过就是普通人而已,如果非要说有什么特别的话,无非就是比较忠厚老实一些,对待工作比较较真一些,心态比较想得开一些,如果因为这些就认为程序员情商低、没品味、秃头...未免太狭隘了吧,我们喜欢自黑也就算了,外面人这样黑我们,还这么频繁,你们的良心真的不会痛么? 拿程序员格子衫说事的,穿格子衫不应该是每个人的权利么,把个别特例无限放大有点说不过去吧,我今天还在路上碰到不少穿格子衫的漂亮小姐姐呢,搞的现在我都不敢穿格子衫了。。。 秃头那就更说不过去了,据我所知,脱发、秃头是全人类的问题,去看看吴彦祖微博最新的照片,那发际线往后移的...我甚至一度怀疑,这是人类进化的方向,看看从猿进化到人,脱了多少毛,没准现在轮到了头发谁说得准呢。 说程序员没...
- 下一篇
一个Java程序员的阿里之路
前言 最近有些朋友在面试阿里,加上 Java-Interview 项目的原因也有小伙伴和我讨论,近期也在负责部门的招牌,这让我想起年初那段长达三个月的奇葩面试经历。 本来没想拿出来说的,毕竟最后也没成。 但由于那几个月的经历让我了解到了大厂的工作方式、对候选同学的考察重点以及面试官的套路等都有了全新的认识。 当然最重要的是这段时间的查漏补缺也让自己精进不少。 先交代下背景吧: 从去年 12 月到今年三月底,我前前后后面了阿里三个部门。 其中两个部门通过了技术面试,还有一个跪在了三面。 光看结果还不错,但整个流程堪称曲折。 下面我会尽量描述流程以及大致的面试题目大纲,希望对想要跳槽、正在面试的同学带来点灵感,帮助可能谈不上,但启发还是能有。 以下内容较长,请再次备好瓜子板凳。 A 部门 首先是第一次机会,去年 12 月份有位大佬加我,后来才知道是一个部门的技术 Leader 在网上看到我的博客,问我想不想来阿里试试。 这时距离上次面阿里也过去一年多了,也想看看现在几斤几两,于是便同意了。 在推荐一周之后收到了杭州打来的电话,说来也巧,那时候我正在机场候机,距离登记还有大概一个小时,心想时...
相关文章
文章评论
共有0条评论来说两句吧...
文章二维码
点击排行
推荐阅读
最新文章
- CentOS8编译安装MySQL8.0.19
- CentOS关闭SELinux安全模块
- Linux系统CentOS6、CentOS7手动修改IP地址
- Docker安装Oracle12C,快速搭建Oracle学习环境
- CentOS8,CentOS7,CentOS6编译安装Redis5.0.7
- CentOS7,8上快速安装Gitea,搭建Git服务器
- Windows10,CentOS7,CentOS8安装MongoDB4.0.16
- SpringBoot2编写第一个Controller,响应你的http请求并返回结果
- 设置Eclipse缩进为4个空格,增强代码规范
- Windows10,CentOS7,CentOS8安装Nodejs环境