Java秒杀系统实战系列~整合RabbitMQ实现消息异步发送
摘要:
本篇博文是“Java秒杀系统实战系列文章”的第八篇,在这篇文章中我们将整合消息中间件RabbitMQ,包括添加依赖、加入配置信息以及自定义注入相关操作组件,比如RabbitTemplate等等,最终初步实现消息的发送和接收,并在下一篇章将其与邮件服务整合,实现“用户秒杀成功发送邮件通知消息”的功能!
内容:
对于消息中间件RabbitMQ,想必各位小伙伴没有用过、也该有听过,它是一款目前市面上应用相当广泛的消息中间件,可以实现消息异步通信、业务服务模块解耦、接口限流、消息分发等功能,在微服务、分布式系统架构中可以说是充当着一名了不起的角色!(详细的介绍,Debug在这里就不赘述了,各位小伙伴可以上官网看看其更多的介绍及其典型的应用场景)!
在本篇博文中,我们将使用RabbitMQ充当消息发送的组件,将它与后面篇章介绍的“邮件服务”结合实现“用户秒杀成功后异步发送邮件通知消息,告知用户秒杀已经成功!”,下面我们一起进入代码实战吧。
(1)要使用RabbitMQ,前提得在本地开发环境或者服务器安装RabbitMQ服务,如下图所示为Debug在本地安装RabbitMQ服务成功后访问其后端控制台应用的首页:
之后我们开始将其与SpringBoot进行整合。首先需要加入其依赖,其版本号跟SpringBoot的版本一致,版本号为1.5.7.RELEASE:
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
<version>${spring-boot.version}</version>
</dependency>
然后需要在配置文件application.properties中加入RabbitMQ服务相关的配置,比如其服务所在的Host、端口Port等等:
#rabbitmq
spring.rabbitmq.virtual-host=/
spring.rabbitmq.host=127.0.0.1
spring.rabbitmq.port=5672
spring.rabbitmq.username=guest
spring.rabbitmq.password=guest
spring.rabbitmq.listener.simple.concurrency=5
spring.rabbitmq.listener.simple.max-concurrency=15
spring.rabbitmq.listener.simple.prefetch=10
(2)紧接着,我们借助SpringBoot天然具有的一些特性,自动注入RabbitMQ一些组件的配置,包括其“单一实例消费者”配置、“多实例消费者”配置以及用于发送消息的操作组件实例“RabbitTemplate”的配置:
//通用化 Rabbitmq 配置
@Configuration
public class RabbitmqConfig {
private final static Logger log = LoggerFactory.getLogger(RabbitmqConfig.class);
@Autowired
private Environment env;
@Autowired
private CachingConnectionFactory connectionFactory;
@Autowired
private SimpleRabbitListenerContainerFactoryConfigurer factoryConfigurer;
//单一消费者
@Bean(name = "singleListenerContainer")
public SimpleRabbitListenerContainerFactory listenerContainer(){
SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
factory.setConnectionFactory(connectionFactory);
factory.setMessageConverter(new Jackson2JsonMessageConverter());
factory.setConcurrentConsumers(1);
factory.setMaxConcurrentConsumers(1);
factory.setPrefetchCount(1);
factory.setTxSize(1);
return factory;
}
//多个消费者
@Bean(name = "multiListenerContainer")
public SimpleRabbitListenerContainerFactory multiListenerContainer(){
SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
factoryConfigurer.configure(factory,connectionFactory);
factory.setMessageConverter(new Jackson2JsonMessageConverter());
//确认消费模式-NONE
factory.setAcknowledgeMode(AcknowledgeMode.NONE);
factory.setConcurrentConsumers(env.getProperty("spring.rabbitmq.listener.simple.concurrency",int.class));
factory.setMaxConcurrentConsumers(env.getProperty("spring.rabbitmq.listener.simple.max-concurrency",int.class));
factory.setPrefetchCount(env.getProperty("spring.rabbitmq.listener.simple.prefetch",int.class));
return factory;
}
@Bean
public RabbitTemplate rabbitTemplate(){
connectionFactory.setPublisherConfirms(true);
connectionFactory.setPublisherReturns(true);
RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);
rabbitTemplate.setMandatory(true);
rabbitTemplate.setConfirmCallback(new RabbitTemplate.ConfirmCallback() {
@Override
public void confirm(CorrelationData correlationData, boolean ack, String cause) {
log.info("消息发送成功:correlationData({}),ack({}),cause({})",correlationData,ack,cause);
}
});
rabbitTemplate.setReturnCallback(new RabbitTemplate.ReturnCallback() {
@Override
public void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) {
log.warn("消息丢失:exchange({}),route({}),replyCode({}),replyText({}),message:{}",exchange,routingKey,replyCode,replyText,message);
}
});
return rabbitTemplate;
}
}
在RabbitMQ的消息发送组件RabbitTemplate的配置中,我们还特意加入了“消息发送确认”、“消息丢失回调”的输出配置,即当消息正确进入到队列后,即代表消息发送成功;当消息找不到对应的队列(在某种程度上,其实也就是找不到交换机和路由)时,会输出消息丢失。
(3)完了之后,我们准备开始使用RabbitMQ实现消息的发送和接收。首先,我们需要在RabbitmqConfig配置类中创建队列、交换机、路由以及绑定等Bean组件,如下所示:
//构建异步发送邮箱通知的消息模型
@Bean
public Queue successEmailQueue(){
return new Queue(env.getProperty("mq.kill.item.success.email.queue"),true);
}
@Bean
public TopicExchange successEmailExchange(){
return new TopicExchange(env.getProperty("mq.kill.item.success.email.exchange"),true,false);
}
@Bean
public Binding successEmailBinding(){
return BindingBuilder.bind(successEmailQueue()).to(successEmailExchange()).with(env.getProperty("mq.kill.item.success.email.routing.key"));
}
其中,环境变量实例env读取的那些属性我们是配置在application.properties文件中的,如下所示:
mq.env=test
#秒杀成功异步发送邮件的消息模型
mq.kill.item.success.email.queue=${mq.env}.kill.item.success.email.queue
mq.kill.item.success.email.exchange=${mq.env}.kill.item.success.email.exchange
mq.kill.item.success.email.routing.key=${mq.env}.kill.item.success.email.routing.key
紧接着,我们需要在通用的消息发送服务类 RabbitSenderService 中写一段发送消息的方法,该方法用于接收“订单编号”参数,然后在数据库中查询其对应的详细订单记录,将该记录充当“消息”并发送至RabbitMQ的队列中,等待被监听消费:
/**
* RabbitMQ通用的消息发送服务
* @Author:debug (SteadyJack)
* @Date: 2019/6/21 21:47
**/
@Service
public class RabbitSenderService {
public static final Logger log= LoggerFactory.getLogger(RabbitSenderService.class);
@Autowired
private RabbitTemplate rabbitTemplate;
@Autowired
private Environment env;
@Autowired
private ItemKillSuccessMapper itemKillSuccessMapper;
//秒杀成功异步发送邮件通知消息
public void sendKillSuccessEmailMsg(String orderNo){
log.info("秒杀成功异步发送邮件通知消息-准备发送消息:{}",orderNo);
try {
if (StringUtils.isNotBlank(orderNo)){
KillSuccessUserInfo info=itemKillSuccessMapper.selectByCode(orderNo);
if (info!=null){
//TODO:rabbitmq发送消息的逻辑
rabbitTemplate.setMessageConverter(new Jackson2JsonMessageConverter());
rabbitTemplate.setExchange(env.getProperty("mq.kill.item.success.email.exchange"));
rabbitTemplate.setRoutingKey(env.getProperty("mq.kill.item.success.email.routing.key"));
//TODO:将info充当消息发送至队列
rabbitTemplate.convertAndSend(info, new MessagePostProcessor() {
@Override
public Message postProcessMessage(Message message) throws AmqpException {
MessageProperties messageProperties=message.getMessageProperties();
messageProperties.setDeliveryMode(MessageDeliveryMode.PERSISTENT);
messageProperties.setHeader(AbstractJavaTypeMapper.DEFAULT_CONTENT_CLASSID_FIELD_NAME,KillSuccessUserInfo.class);
return message;
}
});
}
}
}catch (Exception e){
log.error("秒杀成功异步发送邮件通知消息-发生异常,消息为:{}",orderNo,e.fillInStackTrace());
}
}
}
(4)最后,是在通用的消息接收服务类RabbitReceiverService中实现消息的接收,其完整的源代码如下所示:
/**
* RabbitMQ通用的消息接收服务
* @Author:debug (SteadyJack)
* @Date: 2019/6/21 21:47
**/
@Service
public class RabbitReceiverService {
public static final Logger log= LoggerFactory.getLogger(RabbitReceiverService.class);
@Autowired
private MailService mailService;
@Autowired
private Environment env;
@Autowired
private ItemKillSuccessMapper itemKillSuccessMapper;
//秒杀异步邮件通知-接收消息
@RabbitListener(queues = {"${mq.kill.item.success.email.queue}"},containerFactory = "singleListenerContainer")
public void consumeEmailMsg(KillSuccessUserInfo info){
try {
log.info("秒杀异步邮件通知-接收消息:{}",info);
//到时候这里将整合邮件服务发送邮件通知消息的逻辑
}catch (Exception e){
log.error("秒杀异步邮件通知-接收消息-发生异常:",e.fillInStackTrace());
}
}
}
至此,关于SpringBoot整合消息中间件RabbitMQ的代码实战,本篇文章就介绍到这里了。
最后一点,我们需要进行测试,即用户在界面发起“抢购”的请求操作之后,如果能秒杀成功,则RabbitMQ会发送、接收一条消息,如下所示:
好了,关于RabbitMQ的使用,本文到此就暂且告一段落了,在下一篇文章中我们将把它与邮件服务进行整合,实现“用户秒杀成功后异步发送邮件通知消息给到用户邮箱”的功能!除此之外,我们还将在后面的篇章介绍“如何使用RabbitMQ的死信队列,处理用户下单成功后却超时未支付的订单~在那里我们将采取失效的操作”。
补充:
1、由于相应的博客的更新可能并不会很快,故而如果有想要快速入门以及实战整套系统的,可以阅读: Java商城秒杀系统的设计与实战视频教程(SpringBoot版)
2、目前,这一秒杀系统的整体构建与代码实战已经全部完成了,完整的源代码数据库地址可以来这里下载:https://gitee.com/steadyjack/SpringBoot-SecondKill

低调大师中文资讯倾力打造互联网数据资讯、行业资源、电子商务、移动互联网、网络营销平台。
持续更新报道IT业界、互联网、市场资讯、驱动更新,是最及时权威的产业资讯及硬件资讯报道平台。
转载内容版权归作者及来源网站所有,本站原创内容转载请注明来源。
-
上一篇
java中线程安全,线程死锁,线程通信快速入门
java中线程安全,线程死锁,线程通信快速入门一:多线程安全问题 1 引入 复制代码 /* * 多线程并发访问同一个数据资源 * 3个线程,对一个票资源,出售 */ public class ThreadDemo { public static void main(String[] args) { //创建Runnable接口实现类对象 Tickets t = new Tickets(); //创建3个Thread类对象,传递Runnable接口实现类 Thread t0 = new Thread(t); Thread t1 = new Thread(t); Thread t2 = new Thread(t); t0.start(); t1.start(); t2.start(); } } /* * 通过线程休眠,出现安全问题 */ public class Tickets implements Runnable{ //定义出售的票源 private int ticket = 100; private Object obj = new Object(); public void run...
-
下一篇
如何开启阿里云安全组规则?配置阿里云安全组规则教程
1、注册阿里云账号:点击注册地址 2、设置安全组需要有一台自己的服务器www.aliyun.com/product/ecs?source=5176.11533457 3、点击实例->管理->网络和安全组->安全组配置或者加入安全组4、点击配置规则进入配置页面设置安全组5、在这里可以选择添加安全组或者修改克隆安全组6、这里我们以mysql数据库端口为例需添加3306端口7、选择自己的端口或者选择自定义tcp,这里我们直接选择3306端口8、授权对象一般是0.0.0.0/0可以允许所有人进行远程连接,如果有特殊需求可以指定ip地址9、点击保存完之后千万不要以为结束了 我们需要重启阿里云服务器安全组配置才生效。
相关文章
文章评论
共有0条评论来说两句吧...
文章二维码
点击排行
推荐阅读
最新文章
- Springboot2将连接池hikari替换为druid,体验最强大的数据库连接池
- SpringBoot2全家桶,快速入门学习开发网站教程
- MySQL8.0.19开启GTID主从同步CentOS8
- SpringBoot2整合Thymeleaf,官方推荐html解决方案
- CentOS7,8上快速安装Gitea,搭建Git服务器
- Docker使用Oracle官方镜像安装(12C,18C,19C)
- SpringBoot2编写第一个Controller,响应你的http请求并返回结果
- Docker快速安装Oracle11G,搭建oracle11g学习环境
- SpringBoot2整合MyBatis,连接MySql数据库做增删改查操作
- SpringBoot2初体验,简单认识spring boot2并且搭建基础工程