您现在的位置是:首页 > 文章详情

关于缓存穿透、缓存击穿、缓存雪崩的模拟与解决(Redis)

日期:2020-03-13点击:443

前言

在我们日常的开发中,无不都是使用数据库来进行数据的存储,但当遇到大量数据并发请求的需求,如秒杀、热点数据请求等,若所有请求都直接打到数据库上会占用大量的硬盘资源,系统在极短的时间内完成成千上万次的读/写操作,极其容易造成数据库系统瘫痪。

此时我们会引入缓存层来阻挡大部分的请求,减轻数据库压力。但引入缓存层往往带来缓存穿透,缓存击穿,缓存雪崩等问题。

本文以Redis为例模拟且解决以上三个问题。

缓存击穿

缓存击穿是指缓存中没有但数据库中有的数据(一般是缓存时间到期),这时由于并发用户特别多,同时读缓存没读到数据,又同时去数据库去取数据,此时如果你的代码没有实现同步机制,会造成小部分的请求直接打到数据库上,给数据库带来一定的压力。

模拟需求

模拟需求:某秒杀活动即将开始,模拟1w个请求同时发生,要获取某商品的商品详情信息

期望:只能有1个请求打到数据库,其他请求均打到Redis或其他缓存中

错误示例

我们先看错误示例,以下示例代码没有做任何同步,模拟情况一。

 import java.util.HashMap; import java.util.Map; import java.util.concurrent.CountDownLatch; import java.util.concurrent.atomic.AtomicInteger; /** * @author HeyS1 * @date 2020/3/12 * @description */ public class ConcurrentTest { //请求次数 private int reqestQty = 10000; //倒计时器,当发送reqestQty次请求后继续执行主线程 private CountDownLatch latch = new CountDownLatch(reqestQty); //记录请求落在数据库上的次数 private AtomicInteger dbSelectCount = new AtomicInteger(); //记录请求落在缓存中的次数 private AtomicInteger cacheSelectCount = new AtomicInteger(); //用HashMap模拟缓存储存 private Map<String, String> cache = new HashMap<>(); public static void main(String[] args) { new ConcurrentTest().go(); } private void go() { //同时创建1w个线程获取 for (int i = 0; i < reqestQty; i++) { new Thread(() -> { this.getGoodsDetail("商品id"); latch.countDown(); }).start(); } // 计数器大于0 时,await()方法会阻塞程序继续执行 try { latch.await(); } catch (InterruptedException e) { e.printStackTrace(); } System.out.println("数据库查询次数:" + dbSelectCount.get()); System.out.println("缓存查询次数:" + cacheSelectCount.get()); } /** * 获取商品数据 * * @param key 商品id * @return */ public String getGoodsDetail(String key) { //先从缓存查询,存在则直接返回 String data = this.selectCache(key); if (data != null) { return data; } //不存在则从数据库查询且将数据放入缓存 data = this.selectDB(key); cache.put(key, data); return data; } /** * 从缓存中获取数据 * * @param key * @return */ public String selectCache(String key) { cacheSelectCount.addAndGet(1);//记录次数 System.out.println(Thread.currentThread().getId() + " 从cache获取数据===="); return cache.get(key); } /** * 从数据库中获取数据 * * @param key * @return */ public String selectDB(String key) { sleep(100);//模拟查询数据库花费100ms dbSelectCount.addAndGet(1);//记录次数 System.out.println(Thread.currentThread().getId() + " 从db获取数据===="); return "数据中的数据"; } private static void sleep(long m) { try { Thread.sleep(m); } catch (InterruptedException e) { e.printStackTrace(); } } }

 

结果: 数据库查询次数:202 缓存查询次数:10000

可以看出,如果getGoodsDetail方法没做任何处理,还是会有少数请求直接打到数据库,这就是缓存穿透

 /** * 获取商品数据 * * @param key 商品id * @return */ public String getGoodsDetail(String key) { //先从缓存查询,存在则直接返回 String data = this.selectCache(key); if (data != null) { return data; } //不存在则从数据库查询且将数据放入缓存 data = this.selectDB(key); cache.put(key, data); return data; }

 

解决方案1:synchronized 

使用synchronized 同步代码块可解决该问题,但此方案只适合单机版架构,不适合集群或分布式架构

只需修改一下上面示例中的getGoodsDetail方法即可

 /** * 获取商品数据 * * @param key 商品id * @return */ public String getGoodsDetail(String key) { //先从缓存查询,存在则直接返回 String data = this.selectCache(key); if (data != null) { return data; } //同步代码块 synchronized (this) { //这里还需要再次查询缓存,防止其他等待进入同步代码块的线程的查询打到数据库上 data = this.selectCache(key); if (data != null) { return data; } //不存在则从数据库查询且将数据放入缓存 data = this.selectDB(key); cache.put(key, data); return data; } } 
数据库查询次数:1 缓存查询次数:10276

解决方案2:redis分布式锁

该方案适合集群或分布式架构,单机使用也可以,但没意义。

分布式锁实现方式有一般有3种,本文使用Redis来实现

1. 数据库乐观锁;
2. 基于Redis的分布式锁;
3. 基于ZooKeeper的分布式锁

首先,为了确保分布式锁可用,我们至少要确保锁的实现同时满足以下四个条件:

  1. 互斥性。在任意时刻,只有一个客户端能持有锁。 
  2. 不会发生死锁。即使有一个客户端在持有锁的期间崩溃而没有主动解锁,也能保证后续其他客户端能加锁。
  3. 具有容错性。只要大部分的Redis节点正常运行,客户端就可以加锁和解锁。 
  4. 解铃还须系铃人。加锁和解锁必须是同一个客户端,客户端自己不能把别人加的锁给解了。

 

完整示例:

这里用到RedisTemplate,请自行使用Spring集成Redis

 @RunWith(SpringRunner.class) @SpringBootTest(classes = App.class) @Slf4j public class ConcurrentTest3 { @Autowired RedisTemplate<String, String> redisTemplate; //请求次数 private int reqestQty = 10000; //倒计时器,当发送reqestQty次请求后继续执行主线程 private CountDownLatch latch = new CountDownLatch(reqestQty); //记录请求落在数据库上的次数 private AtomicInteger dbSelectCount = new AtomicInteger(); //记录请求落在缓存中的次数 private AtomicInteger cacheSelectCount = new AtomicInteger(); @Test public void go() { //同时创建1w个线程获取 for (int i = 0; i < reqestQty; i++) { new Thread(() -> { this.getGoodsDetail("商品id"); latch.countDown(); }).start(); } //计数器大于0 时,await()方法会阻塞程序继续执行 try { latch.await(); } catch (InterruptedException e) { e.printStackTrace(); } System.out.println("数据库查询次数:" + dbSelectCount.get()); System.out.println("缓存查询次数:" + cacheSelectCount.get()); } public String getGoodsDetail(String key) { //先从缓存查询,存在则直接返回 String data = this.selectCache(key); if (data != null) { return data; } /** * requestId主要用来确保解锁的时候,A客户端不要把B客户端获得的锁给释放了,在本例子中其实不存在这种情况 * 看具体业务情况去取舍即可 */ String lockKey = "锁的Key"; String requestId = "请求客户端ID"; //加锁 if (!this.lock(lockKey, requestId, 10)) { //加锁失败,证明其他线程已获得了锁,此时只需等待一会,再次调用本方法即可 sleep(100); this.getGoodsDetail(key); } //加锁成功 //这里还需要再次查询缓存,防止其他等待的线程获得锁时又打到数据库上 data = this.selectCache(key); if (data != null) { return data; } //从数据库查询且将数据放入缓存 data = this.selectDB(key); redisTemplate.opsForValue().set(key, data, 60, TimeUnit.SECONDS); //释放锁 this.unLock(lockKey, requestId); return data; } /** * 使用redis特性实现互斥锁(setnx) * * @param lockKey * @param requestId * @param expireTime 锁过期时间,即超过该时间仍然未被解锁,则自动解锁,防止死锁 * @return */ public boolean lock(String lockKey, String requestId, int expireTime) { Boolean res = redisTemplate.opsForValue().setIfAbsent(lockKey, requestId, Duration.ofSeconds(expireTime)); return res != null && res; } /** * 释放锁,使用Lua脚本,确保原子性 * * @param lockKey * @param requestId * @return */ public boolean unLock(String lockKey, String requestId) { String script = "if redis.call('get', KEYS[1]) == ARGV[1] then return redis.call('del', KEYS[1]) else return 0 end"; DefaultRedisScript<Boolean> redisScript = new DefaultRedisScript<>(script, Boolean.class); Boolean res = redisTemplate.execute(redisScript, Collections.singletonList(lockKey), requestId); return res != null && res; } /** * 从缓存中获取数据 * * @param key * @return */ public String selectCache(String key) { cacheSelectCount.addAndGet(1);//记录次数 System.out.println(Thread.currentThread().getId() + " 从cache获取数据===="); return redisTemplate.opsForValue().get(key); } /** * 从数据库中获取数据 * * @param key * @return */ public String selectDB(String key) { sleep(100);//模拟查询数据库花费100ms dbSelectCount.addAndGet(1);//记录次数 System.out.println(Thread.currentThread().getId() + " 从db获取数据===="); return "数据中的数据"; } private static void sleep(long m) { try { Thread.sleep(m); } catch (InterruptedException e) { e.printStackTrace(); } }
结果: 数据库查询次数:1 缓存查询次数:32095

参考文章:Redis实现分布式锁的正确使用方式(java版本)

 

解决方案拓展

只要实现了同步机制,基本就可以从根本上解决击穿的问题,当然,我们还有一些方法可以去避免发生穿透的发生,比如

  1. 热点数据永不过期;
  2. 缓存预热,比如秒杀活动开始前,先在redis初始化数据。
  3. 编写脚本,去扫描即将过期但此时访问量巨大的缓存,去延迟它的过期时间。

 

缓存穿透

正常情况下,我们去查询数据都是存在。

那么请求去查询一条压根儿数据库中根本就不存在的数据,也就是缓存和数据库都查询不到这条数据,但是请求每次都会打到数据库上面去。

这种查询不存在数据的现象我们称为缓存穿透。

一般情况是黑客攻击,拿着不存在的ID去发送大量的请求,这样产生的请求到数据库去查询,可能会导致你的数据库由于压力过大而宕掉。

解决方案1:缓存空值

之所以会发生穿透,就是因为缓存中没有存储这些空数据的key。从而导致每次查询都到数据库去了。

那么我们就可以为这些key对应的值设置为null或空字符串 丢到缓存里面去。后面再出现查询这个key 的请求的时候,直接返回null 。

这样,就不用在到数据库中去走一圈了,但是别忘了设置过期时间且时间不宜过长,如5分钟。

解决方案2:布隆过滤器

方案1基本能解决业务场景上的问题,但是遇到网站攻击,不停给redis请求不一样的Key,会导致redis内存爆掉。

此时就需要用到另一种方案:布隆过滤器

布隆过滤器是一个神奇的数据结构,可以用来判断一个元素是否在一个集合中

具体自行查阅网上文章

 

缓存雪崩

缓存雪崩其实是概念性问题,和缓存击穿相似。

缓存雪崩是指缓存中数据大批量到过期时间,而查询数据量巨大,引起数据库压力过大甚至down机。和缓存击穿不同的是:缓存击穿指并发查同一条数据,缓存雪崩是不同数据都过期了,很多数据都查不到从而查数据库。

只要解决了击穿和穿透的问题,就不存在雪崩了,即使某个时间点大量数据过期,也不会直接打到数据库,前提是你的Redis得扛得住。

当然,我们也得尽量避免这个问题,我们可以才用给缓存数据设定随机的过期时间来解决这个问题,避免大量缓存在同一时间过期。

其实上面“解决方案拓展”中提到方法也适用于雪崩,要根据业务灵活运用方案。

 

 

 

原文链接:https://my.oschina.net/yejunxi/blog/3193509
关注公众号

低调大师中文资讯倾力打造互联网数据资讯、行业资源、电子商务、移动互联网、网络营销平台。

持续更新报道IT业界、互联网、市场资讯、驱动更新,是最及时权威的产业资讯及硬件资讯报道平台。

转载内容版权归作者及来源网站所有,本站原创内容转载请注明来源。

文章评论

共有0条评论来说两句吧...

文章二维码

扫描即可查看该文章

点击排行

推荐阅读

最新文章