R2M分布式锁原理及实践
作者:京东科技 张石磊
1 案例引入
名词简介:
资源:可以理解为一条内容,或者图+文字+链接的载体。
档位ID: 资源的分类组,资源必须归属于档位。
问题描述:当同一个档位下2条资源同时审批通过时,收到擎天审批系统2条消息,消费者应用部署了2台机器,此时正好由2台机器分别消费,在并发消费时,先更新资源状态,然后写缓存,每次取前100条资源,类似select * from resource where gear_id=xxx limit 100 order by id desc;
在写档位缓存,此时事务未提交,并发查询时根据档位Id查询时查询不到对方的数据,全量写缓存时导致后写的缓存覆盖了先写的缓存,即缓存被覆盖,导致投放资源缺失。
方案思考 :
方案1:一台机器消费mq–单点问题
方案2:将同档位ID的资源路由到同一个queue,需要审批系统配合根据档位Id做路由,审批系统发的消息不只是cms审批数据,此方案不适用。
方案3:在档位级别加分布式锁。
经比较,最终采用方案3是合适的方案.
2 锁说明和分布式锁选择
synchronized锁的粒度是JVM进程维度,集群模式下,不能对共享资源加锁,此时需要跨JVM进程的分布式锁。
分布式锁方式核心实现方式优点缺点分析
1 数据库:
悲观锁,lock
乐观锁,通过版本号实现version
实现简单,不依赖中间件
数据库IO瓶颈,性能差
单实例存在单点问题,主从架构存在数据不一致,主从切换时其他客户端可重复加锁。
2 zookeeper
创建临时节点
CP模型,可靠性高,不存在主从切换不一致问题
频繁创建和销毁临时节点,且
集群模式下,leader数据需要同步到follower才算加锁成功,性能不如redis
主从切换服务不可用
3 redis集群
setnx+expire
性能高
有封装好的框架redission
支持超时自动删除锁
集群支持高可用,AP模型
主从切换时其他客户端可重复加锁。
R2M是基于开源的Redis cluster(Redis 3.0以上版本)构建的高性能分布式缓存系统,我们系统一直在使用,3.2.5版本开始支持分布式锁。
3 r2m分布式锁原理
示例代码:
String lockKey = CacheKeyHelper.getGearLockKey(EnvEnum.getEnvFlagEnum(envCode),resource.getGearId()); R2mLock lock = (R2mLock) r2mClusterClient.getLock(lockKey); //获取锁,锁的默认有效期30s,获取不到锁就阻塞 lock.lock(); try { //业务代码 resourceService.afterApprovedHandle(resource); } finally { //释放锁 lock.unlock(); }
1 加锁核心流程:
加锁流程图:
1):尝试获取锁,通过执行加锁Lua脚本来做;
2):若第一步未获取到锁,则去redis订阅解锁消息
3):一旦持有锁的线程释放了锁,就会广播解锁消息,其他线程自旋重新尝试获取锁。
核心加锁原理:使用lua脚本封装了hset和pexpire命令,保证是一个原子操作, KEYS[1]是加锁的key,argv[2]是加锁的客户端ID(UUID+线程ID),ARGV[1]是锁的有效期,默认30s.
private Object acquireInternal(List<String> args) { if (!this.setLocked() && this.getHolderId() != Thread.currentThread().getId()) { return -1L; } else { try { //hash结构,hash的key是加锁的key,键值对的key为客户端的UUID+线程id,value为锁的重入计数器值。 return this.lockSha() != null ? this.executor.evalR2mLockSha(this.lockSha(), "if (redis.call('exists', KEYS[1]) == 0) then redis.call('hset', KEYS[1], ARGV[2], 1); redis.call('pexpire', KEYS[1], ARGV[1]); return nil; end; if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then redis.call('hincrby', KEYS[1], ARGV[2], 1); redis.call('pexpire', KEYS[1], ARGV[1]); return nil; end; return -2;", Collections.singletonList(this.lockName), args) : this.executor. == 0) then redis.call('hset', KEYS[1], ARGV[2], 1); redis.call('pexpire', KEYS[1], ARGV[1]); return nil; end; if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then redis.call('hincrby', KEYS[1], ARGV[2], 1); redis.call('pexpire', KEYS[1], ARGV[1]); return nil; end; return -2;", Collections.singletonList(this.lockName), args); } catch (Exception var3) { this.setUnlocked(); throw new R2mLockException("Failed to acquire lock " + this.lockName + ".", var3); } } }
args参数
private List<String> acquireArgs(long leaseTime) { List<String> args = new ArrayList(); if (leaseTime != -1L) { args.add(String.valueOf(leaseTime)); } else { //默认30s args.add(String.valueOf(this.internalLockLeaseTime())); } //UUID+当前线程id args.add(this.currentThreadLockId(Thread.currentThread().getId())); return args; }
获取锁失败订阅锁的channel
//获取锁失败,订阅释放锁的消息 private boolean failedAcquire() { this.subLock(); return false; } private void subLock() { CompletableFuture<Void> cf = R2mLock.this.executor.lockSubscribe(R2mLock.this.lockPubSub(), R2mLock.this.getLockChannelName(), R2mLock.this); if (cf != null) { cf.handleAsync(this::reSubIfEx); } }
锁释放后,订阅者通过自旋尝试获取锁。
//tryAcquire获取锁,!tryAcquire就是获取锁失败,锁释放后,通知线程唤醒后返回false,然后通过自旋,尝试获取锁, public final void acquire(long arg) { if (!tryAcquire(arg) && acquireQueued(addWaiter(Node.EXCLUSIVE), arg)) selfInterrupt(); } final boolean acquireQueued(final Node node, long arg) { boolean failed = true; try { boolean interrupted = false; //内部自旋获取锁 for (;;) { final Node p = node.predecessor(); if (p == head && tryAcquire(arg)) { setHead(node); p.next = null; // help GC failed = false; return interrupted; } if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt()) interrupted = true; } } finally { if (failed) cancelAcquire(node); } }
2 释放锁核心逻辑:
1)删除分布式锁key(如果可重入锁计数为0)
- 发释放锁的广播消息
3)取消watchdog
private Object unlockInternal(List<String> args) { logger.debug("{} trying to unlock.", Thread.currentThread().getId()); Object var2; try { //判断锁 key 是否存在,如果存在,然后递减hash的value值,当value值为0时再删除锁key,并且广播释放锁的消息 if (this.unlockSha() == null) { var2 = this.executor. == 0) then return nil;end; local counter = redis.call('hincrby', KEYS[1], ARGV[3], -1); if (counter > 0) then return 0; else redis.call('del', KEYS[1]); redis.call('publish', KEYS[2], ARGV[1]); return 1; end; return nil;", Arrays.asList(this.lockName, this.getLockChannelName()), args); return var2; } var2 = this.executor.evalR2mLockSha(this.unlockSha(), "if (redis.call('hexists', KEYS[1], ARGV[3]) == 0) then return nil;end; local counter = redis.call('hincrby', KEYS[1], ARGV[3], -1); if (counter > 0) then return 0; else redis.call('del', KEYS[1]); redis.call('publish', KEYS[2], ARGV[1]); return 1; end; return nil;", Arrays.asList(this.lockName, this.getLockChannelName()), args); } catch (Exception var6) { throw new R2mLockException("Failed to unlock " + this.lockName + ".", var6); } finally { this.finalizeRelease(); } return var2; } //取消当前线程的watchdog private void finalizeRelease() { long threadId = Thread.currentThread().getId(); R2mLock.ExpirableEntry entry = (R2mLock.ExpirableEntry)this.entryCache.get(threadId); if (entry != null) { entry.release(threadId); if (entry.isReleased()) { //取消这个线程watchdog定时任务 entry.getExtTask().cancel(); this.expEntry.compareAndSet(entry, (Object)null); //从缓存watchdog线程的map中删除该线程 this.entryCache.remove(threadId); } } }
3 锁的健壮性思考
1 业务没执行完,锁超时过期怎么办?
客户端加锁默认有效期30s,超过有效期后如果业务没执行完,还需要持有这把锁,r2m客户端提供了续期机制,也就是watchdog机制。
watchdog原理:客户端线程维度(UUID+线程ID,客户端维护一个MAP,key就是UUID+线程ID)的后台定时线程,获取锁成功后,如果客户端还持有当前锁,每隔10s(this.internalLockLeaseTime() / 3L),去延长锁的有效期(internalLockLeaseTime)
//watchdog核心机制 ,internalLockLeaseTime默认30s private void extendLock(long holderId) { if (this.expEntry.get() != null) { R2mLock.ExpirableEntry holderEntry = (R2mLock.ExpirableEntry)this.entryCache.get(holderId); if (holderEntry != null) { //每隔10s,如果当前线程持有锁,则续期30s if (this.expEntry.compareAndSet(holderEntry, holderEntry)) { Timeout task = this.timer().newTimeout((timeout) -> { if (this.extendLockInternal(holderId)) { this.extendLock(holderId); } }, this.internalLockLeaseTime() / 3L, TimeUnit.MILLISECONDS); if (this.expEntry.get() != null) { ((R2mLock.ExpirableEntry)this.expEntry.get()).setExtTask(task); } } } } } //执行续期lua脚本 private boolean extendLockInternal(long threadId) { Object result; try { //只续期 if (this.extendLockSha() != null) { result = this.executor.evalR2mLockSha(this.extendLockSha(), "if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then redis.call('pexpire', KEYS[1], ARGV[1]); return 1; end; return 0;", Collections.singletonList(this.lockName), this.extendLockArgs(threadId)); } else { result = this.executor. == 1) then redis.call('pexpire', KEYS[1], ARGV[1]); return 1; end; return 0;", Collections.singletonList(this.lockName), this.extendLockArgs(threadId)); } } catch (Exception var5) { return false; } return Long.parseLong(result.toString()) == 1L; }
2 客户端宕机,锁如何释放?
分布式锁是有效期的,客户端宕机后,watchdog机制失效,锁过期自动失效。
3 redis分布式锁集群模式下缺陷
r2m集群模式,极端情况,master加锁成功,宕机,还未来得及同步到slave,主从切换,slave切换成master,可以继续加锁,对于非及其严格加锁场景,该方案可满足,属于AP;对于严格场景下的分布式锁,可采用基于zookeeper的分布式锁,属于CP,leader宕机,folllower选举时不可用。性能上redis更优。
4 锁的释放问题
注意锁的释放在finally中释放,必须由锁的持有者释放,不能由其他线程释放别人的锁,示例代码中lock放到try的外面。

低调大师中文资讯倾力打造互联网数据资讯、行业资源、电子商务、移动互联网、网络营销平台。
持续更新报道IT业界、互联网、市场资讯、驱动更新,是最及时权威的产业资讯及硬件资讯报道平台。
转载内容版权归作者及来源网站所有,本站原创内容转载请注明来源。
- 上一篇
一文搞懂Redis
一、Redis的特性 1.1 Redis为什么快? 基于内存操作,操作不需要跟磁盘交互,单次执行很快 命令执行是单线程,因为是基于内存操作,单次执行的时间快于线程切换时间,同时通信采用多路复用 Redis本身就是一个k-v结构,类似于hashMap,所以查询性能接近于O(1) 同时redis自己底层数据结构支持,比如跳表、SDS lO多路复用,单个线程中通过记录跟踪每一个sock(I/O流)的状态来管理多个I/O流 1.2 Redis其他特性 更丰富的数据类型,虽然都是k、v结构,value可以存储很多的数据类型 完善的内存管理机制、保证数据一致性:持久化机制、过期策略 支持多种编程语言 高可用,集群、保证高可用 1.3 Redis高可用 很完善的内存管理机制,过期、淘汰、持久化 集群模式,主从、哨兵、cluster集群 二、Redis数据类型以及使用场景 Redis的数据类型有String、Hash、Set、List、Zset、bitMap(基于String类型)、 Hyperloglog(基于String类型)、Geo(地理位置)、Streams流。 2.1 String 2.1....
- 下一篇
积木 v2.3.1 已经发布,Java 后台管理系统
积木 v2.3.1 已经发布,Java 后台管理系统 此版本更新内容包括: 1、支持免tomcat运行 2、支持打包docker 3、集成satoken,同时支持多管理平台集成 4、配置信息全db提取(除了db连接,配置表:bb_sys_params) 5、本环境为安装环境,启动后续配置db连接 运行 1、基于setup环境打包 2、bb-project-release 路径下,jfinal.bat start 或者 jfinal.sh start 推荐先自行建库 详情查看:https://gitee.com/RemoteControl/BB/releases/v2.3.1
相关文章
文章评论
共有0条评论来说两句吧...