关于缓存穿透、缓存击穿、缓存雪崩的模拟与解决(Redis)
前言
在我们日常的开发中,无不都是使用数据库来进行数据的存储,但当遇到大量数据并发请求的需求,如秒杀、热点数据请求等,若所有请求都直接打到数据库上会占用大量的硬盘资源,系统在极短的时间内完成成千上万次的读/写操作,极其容易造成数据库系统瘫痪。
此时我们会引入缓存层来阻挡大部分的请求,减轻数据库压力。但引入缓存层往往带来缓存穿透,缓存击穿,缓存雪崩等问题。
本文以Redis为例模拟且解决以上三个问题。
缓存击穿
缓存击穿是指缓存中没有但数据库中有的数据(一般是缓存时间到期),这时由于并发用户特别多,同时读缓存没读到数据,又同时去数据库去取数据,此时如果你的代码没有实现同步机制,会造成小部分的请求直接打到数据库上,给数据库带来一定的压力。
模拟需求
模拟需求:某秒杀活动即将开始,模拟1w个请求同时发生,要获取某商品的商品详情信息
期望:只能有1个请求打到数据库,其他请求均打到Redis或其他缓存中
错误示例
我们先看错误示例,以下示例代码没有做任何同步,模拟情况一。
import java.util.HashMap; import java.util.Map; import java.util.concurrent.CountDownLatch; import java.util.concurrent.atomic.AtomicInteger; /** * @author HeyS1 * @date 2020/3/12 * @description */ public class ConcurrentTest { //请求次数 private int reqestQty = 10000; //倒计时器,当发送reqestQty次请求后继续执行主线程 private CountDownLatch latch = new CountDownLatch(reqestQty); //记录请求落在数据库上的次数 private AtomicInteger dbSelectCount = new AtomicInteger(); //记录请求落在缓存中的次数 private AtomicInteger cacheSelectCount = new AtomicInteger(); //用HashMap模拟缓存储存 private Map<String, String> cache = new HashMap<>(); public static void main(String[] args) { new ConcurrentTest().go(); } private void go() { //同时创建1w个线程获取 for (int i = 0; i < reqestQty; i++) { new Thread(() -> { this.getGoodsDetail("商品id"); latch.countDown(); }).start(); } // 计数器大于0 时,await()方法会阻塞程序继续执行 try { latch.await(); } catch (InterruptedException e) { e.printStackTrace(); } System.out.println("数据库查询次数:" + dbSelectCount.get()); System.out.println("缓存查询次数:" + cacheSelectCount.get()); } /** * 获取商品数据 * * @param key 商品id * @return */ public String getGoodsDetail(String key) { //先从缓存查询,存在则直接返回 String data = this.selectCache(key); if (data != null) { return data; } //不存在则从数据库查询且将数据放入缓存 data = this.selectDB(key); cache.put(key, data); return data; } /** * 从缓存中获取数据 * * @param key * @return */ public String selectCache(String key) { cacheSelectCount.addAndGet(1);//记录次数 System.out.println(Thread.currentThread().getId() + " 从cache获取数据===="); return cache.get(key); } /** * 从数据库中获取数据 * * @param key * @return */ public String selectDB(String key) { sleep(100);//模拟查询数据库花费100ms dbSelectCount.addAndGet(1);//记录次数 System.out.println(Thread.currentThread().getId() + " 从db获取数据===="); return "数据中的数据"; } private static void sleep(long m) { try { Thread.sleep(m); } catch (InterruptedException e) { e.printStackTrace(); } } }
结果: 数据库查询次数:202 缓存查询次数:10000
可以看出,如果getGoodsDetail方法没做任何处理,还是会有少数请求直接打到数据库,这就是缓存穿透
/** * 获取商品数据 * * @param key 商品id * @return */ public String getGoodsDetail(String key) { //先从缓存查询,存在则直接返回 String data = this.selectCache(key); if (data != null) { return data; } //不存在则从数据库查询且将数据放入缓存 data = this.selectDB(key); cache.put(key, data); return data; }
解决方案1:synchronized
使用synchronized 同步代码块可解决该问题,但此方案只适合单机版架构,不适合集群或分布式架构
只需修改一下上面示例中的getGoodsDetail方法即可
/** * 获取商品数据 * * @param key 商品id * @return */ public String getGoodsDetail(String key) { //先从缓存查询,存在则直接返回 String data = this.selectCache(key); if (data != null) { return data; } //同步代码块 synchronized (this) { //这里还需要再次查询缓存,防止其他等待进入同步代码块的线程的查询打到数据库上 data = this.selectCache(key); if (data != null) { return data; } //不存在则从数据库查询且将数据放入缓存 data = this.selectDB(key); cache.put(key, data); return data; } }
数据库查询次数:1 缓存查询次数:10276
解决方案2:redis分布式锁
该方案适合集群或分布式架构,单机使用也可以,但没意义。
分布式锁实现方式有一般有3种,本文使用Redis来实现
1. 数据库乐观锁;
2. 基于Redis的分布式锁;
3. 基于ZooKeeper的分布式锁
首先,为了确保分布式锁可用,我们至少要确保锁的实现同时满足以下四个条件:
- 互斥性。在任意时刻,只有一个客户端能持有锁。
- 不会发生死锁。即使有一个客户端在持有锁的期间崩溃而没有主动解锁,也能保证后续其他客户端能加锁。
- 具有容错性。只要大部分的Redis节点正常运行,客户端就可以加锁和解锁。
- 解铃还须系铃人。加锁和解锁必须是同一个客户端,客户端自己不能把别人加的锁给解了。
完整示例:
这里用到RedisTemplate,请自行使用Spring集成Redis
@RunWith(SpringRunner.class) @SpringBootTest(classes = App.class) @Slf4j public class ConcurrentTest3 { @Autowired RedisTemplate<String, String> redisTemplate; //请求次数 private int reqestQty = 10000; //倒计时器,当发送reqestQty次请求后继续执行主线程 private CountDownLatch latch = new CountDownLatch(reqestQty); //记录请求落在数据库上的次数 private AtomicInteger dbSelectCount = new AtomicInteger(); //记录请求落在缓存中的次数 private AtomicInteger cacheSelectCount = new AtomicInteger(); @Test public void go() { //同时创建1w个线程获取 for (int i = 0; i < reqestQty; i++) { new Thread(() -> { this.getGoodsDetail("商品id"); latch.countDown(); }).start(); } //计数器大于0 时,await()方法会阻塞程序继续执行 try { latch.await(); } catch (InterruptedException e) { e.printStackTrace(); } System.out.println("数据库查询次数:" + dbSelectCount.get()); System.out.println("缓存查询次数:" + cacheSelectCount.get()); } public String getGoodsDetail(String key) { //先从缓存查询,存在则直接返回 String data = this.selectCache(key); if (data != null) { return data; } /** * requestId主要用来确保解锁的时候,A客户端不要把B客户端获得的锁给释放了,在本例子中其实不存在这种情况 * 看具体业务情况去取舍即可 */ String lockKey = "锁的Key"; String requestId = "请求客户端ID"; //加锁 if (!this.lock(lockKey, requestId, 10)) { //加锁失败,证明其他线程已获得了锁,此时只需等待一会,再次调用本方法即可 sleep(100); this.getGoodsDetail(key); } //加锁成功 //这里还需要再次查询缓存,防止其他等待的线程获得锁时又打到数据库上 data = this.selectCache(key); if (data != null) { return data; } //从数据库查询且将数据放入缓存 data = this.selectDB(key); redisTemplate.opsForValue().set(key, data, 60, TimeUnit.SECONDS); //释放锁 this.unLock(lockKey, requestId); return data; } /** * 使用redis特性实现互斥锁(setnx) * * @param lockKey * @param requestId * @param expireTime 锁过期时间,即超过该时间仍然未被解锁,则自动解锁,防止死锁 * @return */ public boolean lock(String lockKey, String requestId, int expireTime) { Boolean res = redisTemplate.opsForValue().setIfAbsent(lockKey, requestId, Duration.ofSeconds(expireTime)); return res != null && res; } /** * 释放锁,使用Lua脚本,确保原子性 * * @param lockKey * @param requestId * @return */ public boolean unLock(String lockKey, String requestId) { String script = "if redis.call('get', KEYS[1]) == ARGV[1] then return redis.call('del', KEYS[1]) else return 0 end"; DefaultRedisScript<Boolean> redisScript = new DefaultRedisScript<>(script, Boolean.class); Boolean res = redisTemplate.execute(redisScript, Collections.singletonList(lockKey), requestId); return res != null && res; } /** * 从缓存中获取数据 * * @param key * @return */ public String selectCache(String key) { cacheSelectCount.addAndGet(1);//记录次数 System.out.println(Thread.currentThread().getId() + " 从cache获取数据===="); return redisTemplate.opsForValue().get(key); } /** * 从数据库中获取数据 * * @param key * @return */ public String selectDB(String key) { sleep(100);//模拟查询数据库花费100ms dbSelectCount.addAndGet(1);//记录次数 System.out.println(Thread.currentThread().getId() + " 从db获取数据===="); return "数据中的数据"; } private static void sleep(long m) { try { Thread.sleep(m); } catch (InterruptedException e) { e.printStackTrace(); } }
结果: 数据库查询次数:1 缓存查询次数:32095
参考文章:Redis实现分布式锁的正确使用方式(java版本)
解决方案拓展
只要实现了同步机制,基本就可以从根本上解决击穿的问题,当然,我们还有一些方法可以去避免发生穿透的发生,比如
- 热点数据永不过期;
- 缓存预热,比如秒杀活动开始前,先在redis初始化数据。
- 编写脚本,去扫描即将过期但此时访问量巨大的缓存,去延迟它的过期时间。
缓存穿透
正常情况下,我们去查询数据都是存在。
那么请求去查询一条压根儿数据库中根本就不存在的数据,也就是缓存和数据库都查询不到这条数据,但是请求每次都会打到数据库上面去。
这种查询不存在数据的现象我们称为缓存穿透。
一般情况是黑客攻击,拿着不存在的ID去发送大量的请求,这样产生的请求到数据库去查询,可能会导致你的数据库由于压力过大而宕掉。
解决方案1:缓存空值
之所以会发生穿透,就是因为缓存中没有存储这些空数据的key。从而导致每次查询都到数据库去了。
那么我们就可以为这些key对应的值设置为null或空字符串 丢到缓存里面去。后面再出现查询这个key 的请求的时候,直接返回null 。
这样,就不用在到数据库中去走一圈了,但是别忘了设置过期时间且时间不宜过长,如5分钟。
解决方案2:布隆过滤器
方案1基本能解决业务场景上的问题,但是遇到网站攻击,不停给redis请求不一样的Key,会导致redis内存爆掉。
此时就需要用到另一种方案:布隆过滤器
布隆过滤器是一个神奇的数据结构,可以用来判断一个元素是否在一个集合中
具体自行查阅网上文章
缓存雪崩
缓存雪崩其实是概念性问题,和缓存击穿相似。
缓存雪崩是指缓存中数据大批量到过期时间,而查询数据量巨大,引起数据库压力过大甚至down机。和缓存击穿不同的是:缓存击穿指并发查同一条数据,缓存雪崩是不同数据都过期了,很多数据都查不到从而查数据库。
只要解决了击穿和穿透的问题,就不存在雪崩了,即使某个时间点大量数据过期,也不会直接打到数据库,前提是你的Redis得扛得住。
当然,我们也得尽量避免这个问题,我们可以才用给缓存数据设定随机的过期时间来解决这个问题,避免大量缓存在同一时间过期。
其实上面“解决方案拓展”中提到方法也适用于雪崩,要根据业务灵活运用方案。
低调大师中文资讯倾力打造互联网数据资讯、行业资源、电子商务、移动互联网、网络营销平台。
持续更新报道IT业界、互联网、市场资讯、驱动更新,是最及时权威的产业资讯及硬件资讯报道平台。
转载内容版权归作者及来源网站所有,本站原创内容转载请注明来源。
- 上一篇
一个优秀的Push平台,需要经历怎样的前世今生
作者:闲鱼技术-剑辛 对闲鱼的用户来说,Push更是与用户息息相关,因为闲鱼商品库存只有一件,商品的时效性很强,所以当用户关注的卖家上新、浏览的商品发生降价或者是平台为用户找到一批高性价比商品时,用户都期望尽快被通知。Push已经成为用户与闲鱼平台联系的重要纽带。 本文将以技术同学视角,介绍闲鱼Push从离线手工投放的1.0版本进化到智能个性化的2.0版本的发展过程,详细说明遇到的问题和技术方案选型,以期给读者带来一些思考和解决类似问题的思路。 一、闲鱼Push1.0 当闲鱼开始all in无线后,平台需要把与用户相关的优质内容推送给用户,便于用户快速找到想购买的商品和感兴趣的内容。平台亟需一个Push产品化方案保证将优质内容以Push的形式触达到用户,提升用户体验。基于这样的前提,闲鱼Push1.0方案的主要思路如下: 计算Push用户名
- 下一篇
恕我直言,我怀疑你没怎么用过枚举
我们是否一样? 估计很多小伙伴(也包括我自己)都有这种情况,在自学Java语言看书时,关于枚举enum这一块的知识点可能都有点 “轻敌” ,觉得这块内容非常简单,一带而过,而且在实际写代码过程中也不注意运用。 是的,我也是这样!直到有一天我提的代码审核没过,被技术总监一顿批,我才重新拿起了《Java编程思想》,把枚举这块的知识点重新又审视了一遍。 为什么需要枚举 常量定义它不香吗?为啥非得用枚举? 举个栗子,就以B站上传视频为例,视频一般有三个状态:草稿、审核和发布,我们可以将其定义为静态常量: public class VideoStatus { public static final int Draft = 1; //草稿 public static final int Review = 2; //审核 public static final int Published = 3; //发布 } 对于这种单值类型的静态常量定义,本身也没错,主要是在使用的地方没有一个明确性的约束而已,比如: void judgeVideoStatus( int status ) { ... } 比如这...
相关文章
文章评论
共有0条评论来说两句吧...
文章二维码
点击排行
推荐阅读
最新文章
- CentOS8安装Docker,最新的服务器搭配容器使用
- CentOS7编译安装Gcc9.2.0,解决mysql等软件编译问题
- CentOS8,CentOS7,CentOS6编译安装Redis5.0.7
- CentOS关闭SELinux安全模块
- MySQL8.0.19开启GTID主从同步CentOS8
- SpringBoot2整合Redis,开启缓存,提高访问速度
- Docker使用Oracle官方镜像安装(12C,18C,19C)
- SpringBoot2全家桶,快速入门学习开发网站教程
- SpringBoot2初体验,简单认识spring boot2并且搭建基础工程
- CentOS8编译安装MySQL8.0.19