缓存架构之借助消息中间件RabbitMQ实现Redis缓存实时更新实战演练
缓存架构之借助消息中间件RabbitMQ实现Redis缓存实时更新实战演练
一、背景介绍
前面,我们花了大量的时间来介绍消息中间件RabbitMQ,讲了其基本使用,其可靠性传输,这些对我们的缓存架构有什么用呢,我们直接上图来分析下: 我们要实现这部分功能,需要借助两个系统:
- 广告管理系统:生产广告的地方
- 缓存服务系统:消费广告的地方
这两个独立的系统又有着紧密的联系,一个是生产者,一个是消费者,我们如何建立这两个系统的联系呢,我们生产的广告,如何及时能通知你来获取呢?
通过RabbitMQ我们就建立了广告管理系统与缓存服务系统实时交互的桥梁。
二、核心功能介绍
1、广告管理系统
功能:生产广告,并将生产信息实时同步给RabbitMQ
1)添加依赖
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
<version>${spring-boot.version}</version>
</dependency>
<!-- 发送邮件需要的2个jar -->
<dependency>
<groupId>org.codehaus.janino</groupId>
<artifactId>janino</artifactId>
<version>2.7.8</version>
</dependency>
<dependency>
<groupId>javax.mail</groupId>
<artifactId>mail</artifactId>
<version>1.4.7</version>
</dependency>
2)基本配置
@Configuration
public class RabbitConfig {
public final static String queueName = "ad_queue";
}
spring.rabbitmq.host=localhost
spring.rabbitmq.port=5672
spring.rabbitmq.username=guest
spring.rabbitmq.password=guest
3)生产者消息确认机制
# 开启发送确认
spring.rabbitmq.publisher-confirms=true
# 开启发送失败退回
spring.rabbitmq.publisher-returns=true
4)发送消息
@Component
public class Sender implements RabbitTemplate.ConfirmCallback, RabbitTemplate.ReturnCallback{
private static Map<String, Integer> map = new ConcurrentHashMap<>();
private final Logger emailLogger = LoggerFactory.getLogger("emailLogger");
@Autowired
private RabbitTemplate rabbitTemplate;
public void send(String routingKey, String content) {
this.rabbitTemplate.setMandatory(true);
this.rabbitTemplate.setConfirmCallback(this);
this.rabbitTemplate.setReturnCallback(this);
this.rabbitTemplate.setRoutingKey(routingKey);
//这样我们就能知道,发送失败的是哪条消息了
this.rabbitTemplate.correlationConvertAndSend(content, new CorrelationData(content));
// this.rabbitTemplate.convertAndSend(routingKey, content);
}
/**
* 确认后回调:
* @param correlationData
* @param ack
* @param cause
*/
@Override
public void confirm(CorrelationData correlationData, boolean ack, String cause) {
if (!ack) {
/**
* 我们这里仅通过打印日志、发送邮件来预警,并没有实现自动重试机制:
* 1、将发送失败重新发送到一个队列中:fail-queue,然后可以定时对这些消息进行重发
* 2、在本地定义一个缓存map对象,定时进行重发
* 3、为了更安全,可以将所有发送的消息保存到db中,并设置一个状态(是否发送成功),定时扫描检查是否存在未成功发送的信息
* 这块知识,我们后期讲"分布式事务"的时候,在深入讲解这块内容
*/
emailLogger.error("send ack fail, cause = {}, correlationData = {}", cause, correlationData.getId());
} else {
System.out.println("send ack success");
}
}
/**
* 失败后return回调:
*
* @param message
* @param replyCode
* @param replyText
* @param exchange
* @param routingKey
*/
@Override
public void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) {
emailLogger.error("send fail return-message = " + new String(message.getBody()) + ", replyCode: " + replyCode + ", replyText: " + replyText + ", exchange: " + exchange + ", routingKey: " + routingKey);
String str = new String(message.getBody());
retrySend(str, 3);
}
private void retrySend(String content, int retryTime){
if(map.containsKey(content)){
int count = map.get(content);
count++;
map.put(content, count);
} else {
map.put(content, 1);
}
if(map.get(content) <= retryTime) {
send(RabbitConfig.queueName, content);
}
}
}
2、缓存服务系统
功能:实时监听RabbitMQ,根据通知信息,拉取相应的广告,并刷入redis
1)添加依赖
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
<version>${spring-boot.version}</version>
</dependency>
<!-- 发送邮件需要的2个jar -->
<dependency>
<groupId>org.codehaus.janino</groupId>
<artifactId>janino</artifactId>
<version>2.7.8</version>
</dependency>
<dependency>
<groupId>javax.mail</groupId>
<artifactId>mail</artifactId>
<version>1.4.7</version>
</dependency>
2)基本配置
@Configuration
public class RabbitConfig {
public final static String queueName = "ad_queue";
/**
* 死信队列:
*/
public final static String deadQueueName = "ad_dead_queue";
public final static String deadRoutingKey = "ad_dead_routing_key";
public final static String deadExchangeName = "ad_dead_exchange";
/**
* 死信队列 交换机标识符
*/
public static final String DEAD_LETTER_QUEUE_KEY = "x-dead-letter-exchange";
/**
* 死信队列交换机绑定键标识符
*/
public static final String DEAD_LETTER_ROUTING_KEY = "x-dead-letter-routing-key";
@Bean
public Queue helloQueue() {
//将普通队列绑定到私信交换机上
Map<String, Object> args = new HashMap<>(2);
args.put(DEAD_LETTER_QUEUE_KEY, deadExchangeName);
args.put(DEAD_LETTER_ROUTING_KEY, deadRoutingKey);
Queue queue = new Queue(queueName, true, false, false, args);
return queue;
}
/**
* 死信队列:
*/
@Bean
public Queue deadQueue() {
Queue queue = new Queue(deadQueueName, true);
return queue;
}
@Bean
public DirectExchange deadExchange() {
return new DirectExchange(deadExchangeName);
}
@Bean
public Binding bindingDeadExchange(Queue deadQueue, DirectExchange deadExchange) {
return BindingBuilder.bind(deadQueue).to(deadExchange).with(deadRoutingKey);
}
}
spring.rabbitmq.host=localhost
spring.rabbitmq.port=5672
spring.rabbitmq.username=guest
spring.rabbitmq.password=guest
3)消费者消息确认机制
# 开启ACK
spring.rabbitmq.listener.simple.acknowledge-mode=manual
4)接收消息
@Component
@RabbitListener(queues = RabbitConfig.queueName)
public class Receiver {
private Logger logger = LoggerFactory.getLogger(Receiver.class);
private final Logger emailLogger = LoggerFactory.getLogger("emailLogger");
@Resource
UpdateRedisServiceImpl updateRedisService;
@RabbitHandler
public void process(String content, Channel channel, Message message) {
logger.info("handle msg begin = {}", content);
AdMessage adMessage = JSON.parseObject(content, AdMessage.class);
Long id = adMessage.getId();
int retryTimes = 0;
while (retryTimes < 5) {
//消费者做幂等处理(当然这只是对单台机器而言没有问题,如果是分布式集群环境,这种是不行的,后续我们会继续优化这块):防止相同类型的广告id更新问题
synchronized (AdLock.cacheLock) {
//更新redis数据:
if(!updateRedisService.updateRedis(id)){
retryTimes++;
}
}
break;
}
if (retryTimes >= 3) {
//当有多次更新失败的时候,发送邮件通知:
emailLogger.error("处理MQ[" + content + "]失败[" + retryTimes + "]次");
}
try {
if (retryTimes >= 5) {
//当有很多次更新失败的时候,丢弃这条消息或者发送到死信队列中
channel.basicNack(message.getMessageProperties().getDeliveryTag(), false,false);
}else {
//告诉服务器收到这条消息 已经被我消费了 可以在队列删掉;否则消息服务器以为这条消息没处理掉 后续还会在发
channel.basicAck(message.getMessageProperties().getDeliveryTag(),false);
}
} catch (Exception e){
logger.error("消息确认失败", e);
}
logger.info("handle msg finished = {}", content);
}
}
三、实战演练
代码仓库:https://gitee.com/jikeh/JiKeHCN-RELEASE.git
1、广告管理系统:生产者发送消息通知到RabbitMQ
场景分析:新建/更新广告的时候,消息发送是否正常 项目名:spring-boot-ad
1)正常
管控台,检查RabbitMQ是否正常收到消息
2)异常
-
消息重试机制
-
发送预警邮件
2、广告缓存服务系统:消费者接收消息并刷新到redis
场景分析:新建/更新广告的时候,消息接收是否正常 项目名:spring-boot-rabbitmq-reliability-redis
1)正常
- 管控台,- 检查RabbitMQ是非正常消费
- redis是否存在数据
2)异常
-
消息重试机制
-
发送预警邮件
-
发送到死信队列
四、预告
这节课我们讲的异常处理不太完善,下次课我们将使用延迟队列来处理异常消息
延迟队列应用场景也是很广的,请持续关注,下次分享
更多内容,请关注:
头条号:极客慧
个人网站:极客慧
更多资料分享,请入群讨论:375412858

低调大师中文资讯倾力打造互联网数据资讯、行业资源、电子商务、移动互联网、网络营销平台。
持续更新报道IT业界、互联网、市场资讯、驱动更新,是最及时权威的产业资讯及硬件资讯报道平台。
转载内容版权归作者及来源网站所有,本站原创内容转载请注明来源。
-
上一篇
EOS和DPOS委托权益证明
当Dan Larimer(Bitshares,Steem和EOS区块链的创建者)于2014年在拉斯维加斯举行的比特币会议上宣布采矿现已过时,他并不喜欢人群中的许多人。这是因为参加会议的许多人都使用工作证明(POW)模型来挖掘加密货币。Larimer不知道,这个宣言播下了一场无声战争的种子;想要确保POW系统继续蓬勃发展的人与那些想要从过时的系统转向优秀系统的人之间的战争。 可以这么说,这是个根本问题,社区必须选择——他们选择POW还是DPOS?加密社区所做出的选择是由恐惧和贪婪驱动的,但随着即将发布的EOS,社区又有了一次机会,而这次他们的选择将再次塑造世界对区块链技术的看法。 概述 如果加密社区要大家做出选择,首先大家就必须了解被要求做出选择是怎么样的。目前业界有三个重要的模型需要考虑:工作量证明,股权证明和委托权益证明(本文不讨论其他系统)。 POW工作量证明 比特币,以太坊以及当今市场上几乎所有的加密货币都使用了工作量证明模型。POW要求计算机通过解决复杂的数学问题(即解密)来“挖掘”加密货币。对于这些采矿计算机来说解决的每个问题,他们都会获得一些加密货币。因此,加密货币就是他们...
-
下一篇
在yii2中,让你action参数支持POST数据的小方法
我们先来看一段代码 class RaController extends Controller { public $enableCsrfValidation = false; public function actionSay($username = '',$city = ''){ echo "{$username} 来自 {$city}"; } } 这里actionSay对应的url为index.php?r=ra/say,而 $username 和 $city 值的获取来自于url的参数,比如 index.php?r=ra/say&username=abei2017&city=洛阳 总结 在yii2中,action参数都是来自于GET。 但是有的时候你可能需要让action的参数来自于POST请求,怎么办? 重载runAction即可,yii2为控制器提供了runAction方法,它负责生成一个具体的Action对象并传递参数,我们可以通过复写它来实现,你可以看下yii2的生命周期来对其进行更好的了解。 那就开始干吧~,对上面的代码复写runAction class ...
相关文章
文章评论
共有0条评论来说两句吧...
文章二维码
点击排行
推荐阅读
最新文章
- Eclipse初始化配置,告别卡顿、闪退、编译时间过长
- SpringBoot2整合Thymeleaf,官方推荐html解决方案
- SpringBoot2编写第一个Controller,响应你的http请求并返回结果
- SpringBoot2初体验,简单认识spring boot2并且搭建基础工程
- MySQL数据库在高并发下的优化方案
- CentOS关闭SELinux安全模块
- CentOS7,8上快速安装Gitea,搭建Git服务器
- Docker使用Oracle官方镜像安装(12C,18C,19C)
- Springboot2将连接池hikari替换为druid,体验最强大的数据库连接池
- Dcoker安装(在线仓库),最新的服务器搭配容器使用