Redission分布式锁源码解析
Redission锁继承Implements Reentrant Lock,所以具备 Reentrant Lock 锁中的一些特性:超时,重试,可中断等。加上Redission中Redis具备分布式的特性,所以非常适合用来做Java中的分布式锁。 下面我们对其加锁、解锁过程中的源码细节进行一一分析。
锁的接口定义了一下方法:
分布式锁当中加锁,我们常用的加锁接口:
boolean tryLock(long waitTime, long leaseTime, TimeUnit unit) throws InterruptedException;
下面我们来看一下方法的具体实现:
public boolean tryLock(long waitTime, long leaseTime, TimeUnit unit) throws InterruptedException { long time = unit.toMillis(waitTime); long current = System.currentTimeMillis(); final long threadId = Thread.currentThread().getId(); Long ttl = tryAcquire(leaseTime, unit, threadId); // lock acquired if (ttl == null) { return true; } time -= (System.currentTimeMillis() - current); if (time <= 0) { acquireFailed(threadId); return false; } current = System.currentTimeMillis(); final RFuture subscribeFuture = subscribe(threadId); if (!await(subscribeFuture, time, TimeUnit.MILLISECONDS)) { if (!subscribeFuture.cancel(false)) { subscribeFuture.addListener(new FutureListener() { @Override public void operationComplete(Future future) throws Exception { if (subscribeFuture.isSuccess()) { unsubscribe(subscribeFuture, threadId); } } }); } acquireFailed(threadId); return false; } try { time -= (System.currentTimeMillis() - current); if (time <= 0) { acquireFailed(threadId); return false; } while (true) { long currentTime = System.currentTimeMillis(); ttl = tryAcquire(leaseTime, unit, threadId); // lock acquired if (ttl == null) { return true; } time -= (System.currentTimeMillis() - currentTime); if (time = 0 && ttl < time) { getEntry(threadId).getLatch().tryAcquire(ttl, TimeUnit.MILLISECONDS); } else { getEntry(threadId).getLatch().tryAcquire(time, TimeUnit.MILLISECONDS); } time -= (System.currentTimeMillis() - currentTime); if (time <= 0) { acquireFailed(threadId); return false; } } } finally { unsubscribe(subscribeFuture, threadId); } // return get(tryLockAsync(waitTime, leaseTime, unit)); }
首先我们看到调用tryAcquire尝试获取锁,在这里是否能获取到锁,是根据锁名称的过期时间TTL来判定的(TTL
下面我们接着看一下tryAcquire的实现:
private Long tryAcquire(long leaseTime, TimeUnit unit, long threadId) { return get(tryAcquireAsync(leaseTime, unit, threadId)); }
可以看到真正获取锁的操作经过一层get操作里面执行的,这里为何要这么操作,本人也不是太理解,如有理解错误,欢迎指正。
get 是由CommandAsyncExecutor(一个线程Executor)封装的一个Executor
设置一个单线程的同步控制器CountDownLatch,用于控制单个线程的中断信息。个人理解经过中间的这么一步:主要是为了支持线程可中断操作。
public V get(RFuture future) { if (!future.isDone()) { final CountDownLatch l = new CountDownLatch(1); future.addListener(new FutureListener() { @Override public void operationComplete(Future future) throws Exception { l.countDown(); } }); boolean interrupted = false; while (!future.isDone()) { try { l.await(); } catch (InterruptedException e) { interrupted = true; } } if (interrupted) { Thread.currentThread().interrupt(); } } // commented out due to blocking issues up to 200 ms per minute for each thread:由于每个线程的阻塞问题,每分钟高达200毫秒 // future.awaitUninterruptibly(); if (future.isSuccess()) { return future.getNow(); } throw convertException(future); }
我们进一步往下看:
private RFuture tryAcquireAsync(long leaseTime, TimeUnit unit, final long threadId) { if (leaseTime != -1) { return tryLockInnerAsync(leaseTime, unit, threadId, RedisCommands.EVAL_LONG); } RFuture ttlRemainingFuture = tryLockInnerAsync(commandExecutor.getConnectionManager().getCfg().getLockWatchdogTimeout(), TimeUnit.MILLISECONDS, threadId, RedisCommands.EVAL_LONG); ttlRemainingFuture.addListener(new FutureListener() { @Override public void operationComplete(Future future) throws Exception { if (!future.isSuccess()) { return; } Long ttlRemaining = future.getNow(); // lock acquired if (ttlRemaining == null) { scheduleExpirationRenewal(threadId); } } }); return ttlRemainingFuture; }
首先判断锁是否有超时时间,有过期时间的话,会在后面获取锁的时候设置进去。没有过期时间的话,则会用默认的
private long lockWatchdogTimeout = 30 * 1000;
下面我们在进一步往下分析真正获取锁的操作:
RFuture tryLockInnerAsync(long leaseTime, TimeUnit unit, long threadId, RedisStrictCommand command) { internalLockLeaseTime = unit.toMillis(leaseTime); return commandExecutor.evalWriteAsync(getName(), LongCodec.INSTANCE, command, "if (redis.call('exists', KEYS[1]) == 0) then " + "redis.call('hset', KEYS[1], ARGV[2], 1); " + "redis.call('pexpire', KEYS[1], ARGV[1]); " + "return nil; " + "end; " + "if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then " + "redis.call('hincrby', KEYS[1], ARGV[2], 1); " + "redis.call('pexpire', KEYS[1], ARGV[1]); " + "return nil; " + "end; " + "return redis.call('pttl', KEYS[1]);", Collections.singletonList(getName()), internalLockLeaseTime, getLockName(threadId)); }
我把里面的重点信息做了以下三点总结:
1:真正执行的是一段具有原子性的Lua脚本,并且最终也是由CommandAsynExecutor去执行。
2:锁真正持久化到Redis时,用的hash类型key field value
3:获取锁的三个参数:getName()是逻辑锁名称,例如:分布式锁要锁住的methodName+params;internalLockLeaseTime是毫秒单位的锁过期时间;getLockName则是锁对应的线程级别的名称,因为支持相同线程可重入,不同线程不可重入,所以这里的锁的生成方式是:UUID+":"threadId。有的同学可能会问,这样不是很缜密:不同的JVM可能会生成相同的threadId,所以Redission这里加了一个区分度很高的UUID;
Lua脚本中的执行分为以下三步:
1:exists检查redis中是否存在锁名称;如果不存在,则获取成功;同时把逻辑锁名称KEYS[1],线程级别的锁名称[ARGV[2],value=1,设置到redis。并设置逻辑锁名称的过期时间ARGV[2],返回;
2:如果检查到存在KEYS[1],[ARGV[2],则说明获取成功,此时会自增对应的value值,记录重入次数;并更新锁的过期时间
3:key不存,直接返回key的剩余过期时间(-2)
相关推荐:https://www.roncoo.com/course/list.html?courseName=redis

低调大师中文资讯倾力打造互联网数据资讯、行业资源、电子商务、移动互联网、网络营销平台。
持续更新报道IT业界、互联网、市场资讯、驱动更新,是最及时权威的产业资讯及硬件资讯报道平台。
转载内容版权归作者及来源网站所有,本站原创内容转载请注明来源。
- 上一篇
微服务Api网关框架总结
课程介绍 最近微服务架构在项目中的应用越来越多,我们知道在微服务架构风格中,一个大应用被拆分成为了多个小的服务系统提供出来,这些小的系统他们可以自成体系,也就是说这些小系统可以拥有自己的数据库,框架甚至语言等,这些小系统通常以提供 Rest Api 风格的接口来被 H5, Android, IOS 以及第三方应用程序调用。 但是在UI上进行展示的时候,我们通常需要在一个界面上展示很多数据,这些数据可能来自于不同的微服务中,举个例子。 在一个电商系统中,查看一个商品详情页,这个商品详情页包含商品的标题,价格,库存,评论等,这些数据对于后端来说可能是位于不同的微服务系统之中,可能我后台的系统是这样来拆分我的服务的: 产品服务 - 负责提供商品的标题,描述,规格等。 价格服务 - 负责对产品进行定价,价格策略计算,促销价等。 库存服务 - 负责产品库存。 评价服务 - 负责用户对商品的评论,回复等。 现在,商品详情页需要从这些微服务中拉取相应的信息,问题来了? 问题 由于我们使用的服务系统架构,所以没办法像传统单体应用一样依靠数据库的 join 查询来得到最终结果,那么如何才能访问各个服务呢...
- 下一篇
spring boot 源码解析2-SpringApplication初始化
前⾔ 我们⽣成⼀个spring boot 项⽬时,会⾃带⼀个启动类. 代码如下: @SpringBootApplication publicclassSpringBootAnalysisApplication{ publicstaticvoidmain(String[]args){ SpringApplication.run(SpringBootAnalysisApplication.class,args); } } 就是这么简单的代码,构成了spring boot的世界. 那么代码中只有⼀个@SpringBootApplication 注解 和 调⽤了SpringApplication#run ⽅法.那么我们先来解析SpringApplication的run⽅法. 解析 ⾸先调⽤了org.springframework.boot.SpringApplication#run(Object, String...) ⽅法.代码如下: publicstaticConfigurableApplicationContextrun(Objectsource,String...args){ retu...
相关文章
文章评论
共有0条评论来说两句吧...
文章二维码
点击排行
推荐阅读
最新文章
- MySQL8.0.19开启GTID主从同步CentOS8
- Hadoop3单机部署,实现最简伪集群
- Jdk安装(Linux,MacOS,Windows),包含三大操作系统的最全安装
- CentOS7安装Docker,走上虚拟化容器引擎之路
- CentOS7编译安装Cmake3.16.3,解决mysql等软件编译问题
- CentOS6,7,8上安装Nginx,支持https2.0的开启
- SpringBoot2整合MyBatis,连接MySql数据库做增删改查操作
- Docker使用Oracle官方镜像安装(12C,18C,19C)
- CentOS关闭SELinux安全模块
- CentOS8安装MyCat,轻松搞定数据库的读写分离、垂直分库、水平分库