java B2B2C 仿淘宝电子商城系统-基于Rabbitmq实现延迟消息
- 预备知识
1.1 消息传递
首先我们知道消费者是从队列中获取消息的,那么消息是如何到达队列的?
当我们发送一条消息时,首先会发给交换器(exchange),交换器根据规则(路由键:routing key)将会确定消息投递到那个队列(queue)。
带着这几个关键字:交换器、路由键和队列。
1.2 交换器类型
如之前所说,交换器根据规则决定消息的路由方向。因此,rabbitmq的消息投递分类便是从交换器开始的,不同的交换器实现不同的路由算法便实现了不同的消息投递方式。
direct交换器
direct -> routingKey -> queue,相当一种点对点的消息投递,如果路由键匹配,就直接投递到相应的队列
fanout交换器
fanout交换器相当于实现了一(交换器)对多(队列)的广播投递方式
topic交换器
提供一种模式匹配的投递方式,我们可以根据主题来决定消息投递到哪个队列。
1.3 消息延迟
本文想要实现一个可延迟发送的消息机制。消息如何延迟?
ttl (time to live) 消息存活时间
ttl是指一个消息的存活时间。
Per-Queue Message TTL in Queues
引用官方的一句话:
TTL can be set for a given queue by setting the x-message-ttl argument to queue.declare, or by setting the message-ttl policy. A message that has been in the queue for longer than the configured TTL is said to be dead.
我们可以通过x-message-ttl设置一个队列中消息的过期时间,消息一旦过期,将会变成死信(dead-letter),可以选择重新路由。
Per-Message TTL in Publishers
引用官方的一句话:
A TTL can be specified on a per-message basis, by setting the expiration field in the basic AMQP class when sending a basic.publish.
The value of the expiration field describes the TTL period in milliseconds. The same constraints as for x-message-ttl apply. Since the expiration field must be a string, the broker will (only) accept the string representation of the number.
我们可以通过设置每一条消息的属性expiration,指定单条消息有效期。消息一旦过期,将会变成死信(dead-letter),可以选择重新路由。
重新路由-死信交换机(Dead Letter Exchanges)
引用官方一句话:
Dead Letter Exchanges
Messages from a queue can be ‘dead-lettered’; that is, republished to
another exchange when any of the following events occur:
The message is rejected (basic.reject or basic.nack) with
requeue=false, The TTL for the message expires; or The queue length
limit is exceeded. Dead letter exchanges (DLXs) are normal exchanges.
They can be any of the usual types and are declared as usual.
To set the dead letter exchange for a queue, set the x-dead-letter-exchange argument to the name of the exchange.
我们可以通过设置死信交换器(x-dead-letter-exchange)来重新发送消息到另外一个队列,而这个队列将是最终的消费队列。
- 具体实现
rabbitmq配置
属性文件-rabbitmq.properties
交换、路由等配置按照以上策略,其中,添加了prefetch参数来根据服务器能力控制消费数量。
连接用户名
mq.user =sms_user
密码
mq.password =123456
主机
mq.host =192.168.99.100
端口
mq.port =5672
默认virtual-host
mq.vhost =/
the default cache size for channels is 25
mq.channelCacheSize =50
发送消息路由
sms.route.key =sms_route_key
延迟消息队列
sms.delay.queue =sms_delay_queue
延迟消息交换器
sms.delay.exchange =sms_delay_exchange
消息的消费队列
sms.queue =sms_queue
消息交换器
sms.exchange =sms_exchange
每秒消费消息数量
sms.prefetch =30
配置rabbitmq.xml
<?xml version="1.0" encoding="UTF-8"?> <beans xmlns="http://www.springframework.org/schema/beans" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:rabbit="http://www.springframework.org/schema/rabbit" xmlns:context="http://www.springframework.org/schema/context" xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-3.0.xsd http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-3.0.xsd http://www.springframework.org/schema/rabbit http://www.springframework.org/schema/rabbit/spring-rabbit-1.7.xsd http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context.xsd"> <context:property-placeholder location="rabbitmq.properties"/> <!--配置connection-factory,指定连接rabbit server参数 --> <rabbit:connection-factory id="connectionFactory" username="${mq.user}" password="${mq.password}" host="${mq.host}" port="${mq.port}" virtual-host="${mq.vhost}" /> <!--定义rabbit template用于数据的接收和发送 --> <rabbit:template id="amqpTemplate" connection-factory="connectionFactory" /> <!--通过指定下面的admin信息,当前producer中的exchange和queue会在rabbitmq服务器上自动生成 --> <rabbit:admin connection-factory="connectionFactory" /> <!--定义queue --> <rabbit:queue name="${sms.queue}" durable="true" auto-delete="false" exclusive="false" /> <!-- 创建延迟,有消息有效期的队列 --> <rabbit:queue name="${sms.delay.queue}" durable="true" auto-delete="false"> <rabbit:queue-arguments> <entry key="x-message-ttl"> <!-- 队列默认消息过期时间 --> <value type="java.lang.Long">3600000</value> </entry> <!-- 消息过期根据重新路由 --> <entry key="x-dead-letter-exchange" value="${sms.exchange}"/> </rabbit:queue-arguments> </rabbit:queue> <!-- 定义direct exchange,sms_queue --> <rabbit:direct-exchange name="${sms.exchange}" durable="true" auto-delete="false"> <rabbit:bindings> <rabbit:binding queue="${sms.queue}" key="${sms.route.key}"/> </rabbit:bindings> </rabbit:direct-exchange> <!-- 延迟消息配置,durable=true 持久化生效 --> <rabbit:direct-exchange name="${sms.delay.exchange}" durable="true" auto-delete="false"> <rabbit:bindings> <rabbit:binding queue="${sms.delay.queue}" key="${sms.route.key}"/> </rabbit:bindings> </rabbit:direct-exchange> <!-- 消息接收者 --> <bean id="messageReceiver" class="git.yampery.consumer.MsgConsumer"/> <!-- queue litener 观察 监听模式 当有消息到达时会通知监听在对应的队列上的监听对象--> <rabbit:listener-container connection-factory="connectionFactory" prefetch="${sms.prefetch}"> <rabbit:listener queues="${sms.queue}" ref="messageReceiver"/> </rabbit:listener-container> </beans>
消息发布者
package git.yampery.producer; import org.springframework.amqp.core.AmqpTemplate; import org.springframework.beans.factory.annotation.Value; import org.springframework.stereotype.Component; import javax.annotation.Resource; /** * @decription MsgProducer * <p>生产者</p> * @author Yampery * @date 2018/2/11 11:44 */ @Component public class MsgProducer { @Resource private AmqpTemplate amqpTemplate; @Value("${sms.delay.exchange}") private String SMS_DELAY_EXCHANGE; @Value("${sms.exchange}") private String SMS_EXCHANGE; @Value("${sms.route.key}") private String SMS_ROUTE_KEY; /** * 延迟消息放入延迟队列中 * @param msg * @param expiration */ public void publish(String msg, String expiration) { amqpTemplate.convertAndSend(SMS_DELAY_EXCHANGE, SMS_ROUTE_KEY, msg, message -> { // 设置消息属性-过期时间 message.getMessageProperties().setExpiration(expiration); return message; }); } /** * 非延迟消息放入待消费队列 * @param msg */ public void publish(String msg) { amqpTemplate.convertAndSend(SMS_EXCHANGE, SMS_ROUTE_KEY, msg); } }
消费者
package git.yampery.consumer; import org.springframework.amqp.core.Message; import org.springframework.amqp.core.MessageListener; /** * @decription MsgConsumer * <p>消费者</p> * @author Yampery * @date 2018/2/11 11:43 */ public class MsgConsumer implements MessageListener { @Override public void onMessage(Message message) { String msg; try { // 线程每秒消费一次 Thread.sleep(1000); msg = new String(message.getBody(), "utf-8"); System.out.println(msg); } catch (Exception e) { // 这里并没有对服务异常等失败的消息做处理,直接丢弃了 // 防止因业务异常导致消息失败造成unack阻塞再队列里 // 可以选择路由到另外一个专门处理消费失败的队列 return; } } }
测试
package git.yampery.mq; //需要JAVA Spring Cloud大型企业分布式微服务云构建的B2B2C电子商务平台源码 // 一零三八七七四六二六 import com.alibaba.fastjson.JSONObject; import git.yampery.producer.MsgProducer; import org.junit.Test; import org.junit.runner.RunWith; import org.springframework.boot.test.context.SpringBootTest; import org.springframework.test.context.junit4.SpringRunner; import javax.annotation.Resource; /** * @decription TestMq * <p>测试</p> * @author Yampery * @date 2018/2/11 15:03 */ @RunWith(SpringRunner.class) @SpringBootTest public class TestMq { @Resource private MsgProducer producer; @Test public void testMq() { JSONObject jObj = new JSONObject(); jObj.put("msg", "这是一条短信"); producer.publish(jObj.toJSONString(), String.valueOf(10 * 1000)); } }

低调大师中文资讯倾力打造互联网数据资讯、行业资源、电子商务、移动互联网、网络营销平台。
持续更新报道IT业界、互联网、市场资讯、驱动更新,是最及时权威的产业资讯及硬件资讯报道平台。
转载内容版权归作者及来源网站所有,本站原创内容转载请注明来源。
- 上一篇
2019人工智能项目书籍汇总下载
python测试开发项目实战-目录 python工具书籍下载-持续更新 下面书籍下载地址 https://china-testing.github.io/ai_project_books.html 书籍:python人工智能项目 Intelligent Projects Using Python - 2019 简介 实施机器学习和深度学习方法,使用Python构建智能,认知AI项目 主要特点 - 帮助您掌握AI算法和概念的入门指南 - 8个实际项目,解决医疗保健,电子商务和监控方面的各种挑战 - 使用TensorFlow,Keras和其他Python库来实现智能AI应用程序 图书说明 如果您想使用Python从领先的AI域构建富有洞察力的项目,本书将是一个完美的伴侣。 本书涵盖了AI所有核心学科项目的详细实施。我们首先介绍如何使用机器学习和深度学习技术创建智能系统的基础知识。您将吸收各种神经网络架构,如CNN,RNN,LSTM,以解决关键的新世界挑战。您将学习如何训练模型以检测人眼中的糖尿病视网膜病变状况,并创建用于执行视频到文本翻译的智能系统。您将在医疗保健领域中使用转移学习技术,并...
- 下一篇
Redis分布式锁的正确实现方式
前言 分布式锁一般有三种实现方式:1. 数据库乐观锁;2. 基于Redis的分布式锁;3. 基于ZooKeeper的分布式锁。本篇博客将介绍第二种方式,基于Redis实现分布式锁。虽然网上已经有各种介绍Redis分布式锁实现的博客,然而他们的实现却有着各种各样的问题,为了避免误人子弟,本篇博客将详细介绍如何正确地实现Redis分布式锁。 可靠性 首先,为了确保分布式锁可用,我们至少要确保锁的实现同时满足以下四个条件: 互斥性。在任意时刻,只有一个客户端能持有锁。 不会发生死锁。即使有一个客户端在持有锁的期间崩溃而没有主动解锁,也能保证后续其他客户端能加锁。 具有容错性。只要大部分的Redis节点正常运行,客户端就可以加锁和解锁。 解铃还须系铃人。加锁和解锁必须是同一个客户端,客户端自己不能把别人加的锁给解了。 代码实现 组件依赖 首先我们要通过Maven引入Jedis开源组件,在pom.xml文件加入下面的代码: redis.clients jedis 2.9.0 加锁代码 正确姿势 Talk is cheap, show me the code。先展示代码,再带大家慢慢解释为什么这样...
相关文章
文章评论
共有0条评论来说两句吧...
文章二维码
点击排行
推荐阅读
最新文章
- CentOS关闭SELinux安全模块
- Windows10,CentOS7,CentOS8安装MongoDB4.0.16
- Windows10,CentOS7,CentOS8安装Nodejs环境
- CentOS8编译安装MySQL8.0.19
- Linux系统CentOS6、CentOS7手动修改IP地址
- CentOS8,CentOS7,CentOS6编译安装Redis5.0.7
- Docker快速安装Oracle11G,搭建oracle11g学习环境
- Red5直播服务器,属于Java语言的直播服务器
- SpringBoot2整合Redis,开启缓存,提高访问速度
- CentOS7,8上快速安装Gitea,搭建Git服务器