您现在的位置是:首页 > 文章详情

Redission分布式锁源码解析

日期:2018-11-29点击:465

Redission锁继承Implements Reentrant Lock,所以具备 Reentrant Lock 锁中的一些特性:超时,重试,可中断等。加上Redission中Redis具备分布式的特性,所以非常适合用来做Java中的分布式锁。 下面我们对其加锁、解锁过程中的源码细节进行一一分析。


锁的接口定义了一下方法:

images/G2J2Bx6f7cRsNZQ7S375Sz26biZzW6CE.jpg


分布式锁当中加锁,我们常用的加锁接口:

boolean tryLock(long waitTime, long leaseTime, TimeUnit unit) throws InterruptedException;

下面我们来看一下方法的具体实现:

public boolean tryLock(long waitTime, long leaseTime, TimeUnit unit) throws InterruptedException {         long time = unit.toMillis(waitTime);         long current = System.currentTimeMillis();         final long threadId = Thread.currentThread().getId();         Long ttl = tryAcquire(leaseTime, unit, threadId);         // lock acquired         if (ttl == null) {             return true;         }                  time -= (System.currentTimeMillis() - current);         if (time <= 0) {             acquireFailed(threadId);             return false;         }                  current = System.currentTimeMillis();         final RFuture subscribeFuture = subscribe(threadId);         if (!await(subscribeFuture, time, TimeUnit.MILLISECONDS)) {             if (!subscribeFuture.cancel(false)) {                 subscribeFuture.addListener(new FutureListener() {                     @Override                     public void operationComplete(Future future) throws Exception {                         if (subscribeFuture.isSuccess()) {                             unsubscribe(subscribeFuture, threadId);                         }                     }                 });             }             acquireFailed(threadId);             return false;         }         try {             time -= (System.currentTimeMillis() - current);             if (time <= 0) {                 acquireFailed(threadId);                 return false;             }                      while (true) {                 long currentTime = System.currentTimeMillis();                 ttl = tryAcquire(leaseTime, unit, threadId);                 // lock acquired                 if (ttl == null) {                     return true;                 }                 time -= (System.currentTimeMillis() - currentTime);                 if (time = 0 && ttl < time) {                     getEntry(threadId).getLatch().tryAcquire(ttl, TimeUnit.MILLISECONDS);                 } else {                     getEntry(threadId).getLatch().tryAcquire(time, TimeUnit.MILLISECONDS);                 }                 time -= (System.currentTimeMillis() - currentTime);                 if (time <= 0) {                     acquireFailed(threadId);                     return false;                 }             }         } finally {             unsubscribe(subscribeFuture, threadId);         } //        return get(tryLockAsync(waitTime, leaseTime, unit));     }

首先我们看到调用tryAcquire尝试获取锁,在这里是否能获取到锁,是根据锁名称的过期时间TTL来判定的(TTL


下面我们接着看一下tryAcquire的实现:

private Long tryAcquire(long leaseTime, TimeUnit unit, long threadId) {     return get(tryAcquireAsync(leaseTime, unit, threadId)); }

可以看到真正获取锁的操作经过一层get操作里面执行的,这里为何要这么操作,本人也不是太理解,如有理解错误,欢迎指正。

get 是由CommandAsyncExecutor(一个线程Executor)封装的一个Executor

设置一个单线程的同步控制器CountDownLatch,用于控制单个线程的中断信息。个人理解经过中间的这么一步:主要是为了支持线程可中断操作。

public  V get(RFuture future) {     if (!future.isDone()) {         final CountDownLatch l = new CountDownLatch(1);         future.addListener(new FutureListener() {             @Override             public void operationComplete(Future future) throws Exception {                 l.countDown();             }         });                  boolean interrupted = false;         while (!future.isDone()) {             try {                 l.await();             } catch (InterruptedException e) {                 interrupted = true;             }         }                  if (interrupted) {             Thread.currentThread().interrupt();         }     }     // commented out due to blocking issues up to 200 ms per minute for each thread:由于每个线程的阻塞问题,每分钟高达200毫秒     // future.awaitUninterruptibly();     if (future.isSuccess()) {         return future.getNow();     }     throw convertException(future); }


我们进一步往下看:

private  RFuture tryAcquireAsync(long leaseTime, TimeUnit unit, final long threadId) {     if (leaseTime != -1) {         return tryLockInnerAsync(leaseTime, unit, threadId, RedisCommands.EVAL_LONG);     }     RFuture ttlRemainingFuture = tryLockInnerAsync(commandExecutor.getConnectionManager().getCfg().getLockWatchdogTimeout(), TimeUnit.MILLISECONDS, threadId, RedisCommands.EVAL_LONG);     ttlRemainingFuture.addListener(new FutureListener() {         @Override         public void operationComplete(Future future) throws Exception {             if (!future.isSuccess()) {                 return;             }             Long ttlRemaining = future.getNow();             // lock acquired             if (ttlRemaining == null) {                 scheduleExpirationRenewal(threadId);             }         }     });     return ttlRemainingFuture; }

首先判断锁是否有超时时间,有过期时间的话,会在后面获取锁的时候设置进去。没有过期时间的话,则会用默认的

private long lockWatchdogTimeout = 30 * 1000;

下面我们在进一步往下分析真正获取锁的操作:

 RFuture tryLockInnerAsync(long leaseTime, TimeUnit unit, long threadId, RedisStrictCommand command) {     internalLockLeaseTime = unit.toMillis(leaseTime);     return commandExecutor.evalWriteAsync(getName(), LongCodec.INSTANCE, command,               "if (redis.call('exists', KEYS[1]) == 0) then " +                   "redis.call('hset', KEYS[1], ARGV[2], 1); " +                   "redis.call('pexpire', KEYS[1], ARGV[1]); " +                   "return nil; " +               "end; " +               "if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then " +                   "redis.call('hincrby', KEYS[1], ARGV[2], 1); " +                   "redis.call('pexpire', KEYS[1], ARGV[1]); " +                   "return nil; " +               "end; " +               "return redis.call('pttl', KEYS[1]);",                 Collections.singletonList(getName()), internalLockLeaseTime, getLockName(threadId)); }

我把里面的重点信息做了以下三点总结:


1:真正执行的是一段具有原子性的Lua脚本,并且最终也是由CommandAsynExecutor去执行。


2:锁真正持久化到Redis时,用的hash类型key field value


3:获取锁的三个参数:getName()是逻辑锁名称,例如:分布式锁要锁住的methodName+params;internalLockLeaseTime是毫秒单位的锁过期时间;getLockName则是锁对应的线程级别的名称,因为支持相同线程可重入,不同线程不可重入,所以这里的锁的生成方式是:UUID+":"threadId。有的同学可能会问,这样不是很缜密:不同的JVM可能会生成相同的threadId,所以Redission这里加了一个区分度很高的UUID;


Lua脚本中的执行分为以下三步:


1:exists检查redis中是否存在锁名称;如果不存在,则获取成功;同时把逻辑锁名称KEYS[1],线程级别的锁名称[ARGV[2],value=1,设置到redis。并设置逻辑锁名称的过期时间ARGV[2],返回;


2:如果检查到存在KEYS[1],[ARGV[2],则说明获取成功,此时会自增对应的value值,记录重入次数;并更新锁的过期时间


3:key不存,直接返回key的剩余过期时间(-2)


相关推荐:https://www.roncoo.com/course/list.html?courseName=redis



原文链接:https://blog.roncoo.com/article/133572
关注公众号

低调大师中文资讯倾力打造互联网数据资讯、行业资源、电子商务、移动互联网、网络营销平台。

持续更新报道IT业界、互联网、市场资讯、驱动更新,是最及时权威的产业资讯及硬件资讯报道平台。

转载内容版权归作者及来源网站所有,本站原创内容转载请注明来源。

文章评论

共有0条评论来说两句吧...

文章二维码

扫描即可查看该文章

点击排行

推荐阅读

最新文章