【从入门到放弃-ZooKeeper】ZooKeeper实战-分布式锁-升级版
前言
上文【从入门到放弃-ZooKeeper】ZooKeeper实战-分布式锁中,我们通过利用ZooKeeper的临时节点特性,实现了一个分布式锁。
但是是通过轮询的方式去判断不断尝试获取锁,空转对于CPU还是有一定消耗的,同时,对于多个线程竞争锁激烈的时候,很容易出现羊群效应。
为了解决上面两个问题。本文来看一下如何实现一个升级版的分布式锁。
设计
我们依然实现java.util.concurrent.locks.Lock接口。
和上一文中实现方式不同的是,我们使用ZooKeeper的EPHEMERAL_SEQUENTIAL临时顺序节点。
当首次获取锁时,会创建一个临时节点,如果这个临时节点末尾数字是当前父节点下同名节点中最小的,则获取锁成功。
否则,则监听上一个数字较大的节点,直到上一个节点被释放,则再次尝试获取锁成功。这样可以避免多个线程同时获取一把锁造成的竞争。
同时使用了ZooKeeper提供的watch功能,避免了轮询带来的CPU空转。
获取锁后使用一个volatile int类型的state进行计数,来实现锁的可重入机制。
DistributedFairLock
public class DistributedFairLock implements Lock { private static Logger logger = LoggerFactory.getLogger(DistributedFairLock.class); //ZooKeeper客户端,进行ZooKeeper操作 private ZooKeeper zooKeeper; //根节点名称 private String dir; //加锁节点 private String node; //ZooKeeper鉴权信息 private List<ACL> acls; //要加锁节点 private String fullPath; //加锁标识,为0时表示未获取到锁,每获取一次锁则加一,释放锁时减一。减到0时断开连接,删除临时节点。 private volatile int state; //当前锁创建的节点id private String id; //通过CountDownLatch阻塞,直到监听上一节点被取消,再进行后续操作 private CountDownLatch countDownLatch; /** * Constructor. * * @param zooKeeper the zoo keeper * @param dir the dir * @param node the node * @param acls the acls */ public DistributedFairLock(ZooKeeper zooKeeper, String dir, String node, List<ACL> acls) { this.zooKeeper = zooKeeper; this.dir = dir; this.node = node; this.acls = acls; this.fullPath = dir.concat("/").concat(this.node); init(); } private void init() { try { Stat stat = zooKeeper.exists(dir, false); if (stat == null) { zooKeeper.create(dir, null, acls, CreateMode.PERSISTENT); } } catch (Exception e) { logger.error("[DistributedFairLock#init] error : " + e.toString(), e); } } }
lock
public void lock() { try { //加锁 synchronized (this) { //如果当前未持有锁 if (state <= 0) { //创建节点 if (id == null) { id = zooKeeper.create(fullPath, null, acls, CreateMode.EPHEMERAL_SEQUENTIAL); } //获取当前路径下所有的节点 List<String> nodes = zooKeeper.getChildren(dir, false); SortedSet<String> sortedSet = new TreeSet<>(); for (String node : nodes) { sortedSet.add(dir.concat("/").concat(node)); } //获取所有id小于当前节点顺序的节点 SortedSet<String> lessSet = ((TreeSet<String>) sortedSet).headSet(id); if (!lessSet.isEmpty()) { //监听上一个节点,就是通过这里避免多锁竞争和CPU空转,实现公平锁的 Stat stat = zooKeeper.exists(lessSet.last(), new LockWatcher()); if (stat != null) { countDownLatch = new CountDownLatch(1); countDownLatch.await(); } } } state++; } } catch (InterruptedException e) { logger.error("[DistributedFairLock#lock] error : " + e.toString(), e); Thread.currentThread().interrupt(); } catch (KeeperException ke) { logger.error("[DistributedFairLock#lock] error : " + ke.toString(), ke); if (!KeeperException.Code.NODEEXISTS.equals(ke.code())) { Thread.currentThread().interrupt(); } } }
tryLock
public boolean tryLock() { try { synchronized (this) { if (state <= 0) { if (id == null) { id = zooKeeper.create(fullPath, null, acls, CreateMode.EPHEMERAL_SEQUENTIAL); } List<String> nodes = zooKeeper.getChildren(dir, false); SortedSet<String> sortedSet = new TreeSet<>(); for (String node : nodes) { sortedSet.add(dir.concat("/").concat(node)); } SortedSet<String> lessSet = ((TreeSet<String>) sortedSet).headSet(id); if (!lessSet.isEmpty()) { return false; } } state++; } } catch (InterruptedException e) { logger.error("[DistributedFairLock#tryLock] error : " + e.toString(), e); return false; } catch (KeeperException ke) { logger.error("[DistributedFairLock#tryLock] error : " + ke.toString(), ke); if (!KeeperException.Code.NODEEXISTS.equals(ke.code())) { return false; } } return true; } @Override public boolean tryLock(long time, TimeUnit unit) throws InterruptedException { try { synchronized (this) { if (state <= 0) { if (id == null) { id = zooKeeper.create(fullPath, null, acls, CreateMode.EPHEMERAL_SEQUENTIAL); } List<String> nodes = zooKeeper.getChildren(dir, false); SortedSet<String> sortedSet = new TreeSet<>(); for (String node : nodes) { sortedSet.add(dir.concat("/").concat(node)); } SortedSet<String> lessSet = ((TreeSet<String>) sortedSet).headSet(id); if (!lessSet.isEmpty()) { Stat stat = zooKeeper.exists(lessSet.last(), new LockWatcher()); if (stat != null) { countDownLatch = new CountDownLatch(1); countDownLatch.await(time, unit); } } } state++; } } catch (InterruptedException e) { logger.error("[DistributedFairLock#tryLock] error : " + e.toString(), e); return false; } catch (KeeperException ke) { logger.error("[DistributedFairLock#tryLock] error : " + ke.toString(), ke); if (!KeeperException.Code.NODEEXISTS.equals(ke.code())) { return false; } } return true; }
unlock
public void unlock() { synchronized (this) { if (state > 0) { state--; } //当不再持有锁时,删除创建的临时节点 if (state == 0 && zooKeeper != null) { try { zooKeeper.delete(id, -1); id = null; } catch (Exception e) { logger.error("[DistributedFairLock#unlock] error : " + e.toString(), e); } } } }
LockWatcher
private class LockWatcher implements Watcher { @Override public void process(WatchedEvent event) { synchronized (this) { if (countDownLatch != null) { countDownLatch.countDown(); } } } }
总结
上面就是我们改良后,通过临时顺序节点和watch机制实现的公平可重入分布式锁。
源代码可见:aloofJr
通过watch机制避免轮询带来的CPU空转。
通过顺序临时节点避免了羊群效应。
如果对以上方式有更好的优化方案,欢迎一起讨论。
更多文章
见我的博客:https://nc2era.com
written by AloofJr,转载请注明出处
低调大师中文资讯倾力打造互联网数据资讯、行业资源、电子商务、移动互联网、网络营销平台。
持续更新报道IT业界、互联网、市场资讯、驱动更新,是最及时权威的产业资讯及硬件资讯报道平台。
转载内容版权归作者及来源网站所有,本站原创内容转载请注明来源。
- 上一篇
Python 周刊第 418 期
新闻 PyCon US 2020 开始接受财务赞助! https://pycon.blogspot.com/2019/10/financial-aid-launches-for-pycon-us-2020.html2020年 Python 美国开发者大会,tips: 中国也有,可以赞助国内的) 已开放财务赞助申请,Pycon 将在 2020 年 1 月 31 日之前接受财务赞助。Python 软件基金会和 PyLadies 会对该大会进行财务赞助。今年,Python 软件基金会将提供 130000 美元的财务赞助,而 PyLadies 会根据 2019 年全年的捐款情况尽其所能赞助 Pycon。 VS Code 支持 Jupyter Notebook 的编辑 https://devblogs.microsoft.com/python/announcing-support-for-native-editing-of-jupyter-notebooks-in-vs-code/ 文章、教程或研讨会 用 Python 从 Last.fm API 接口获取音乐数据 https://www.dat...
- 下一篇
12月09日云栖号头条:人脸识别遍地开花,谁来守护我们的脸;人工智还能预测地震?
今日最新云头条快讯:2019年被视为5G商用元年;地震预测的目标从来都不是预测慢地震。相反,它是为了预测突然发生的、对生命和肢体构成危险的灾难性地震;,一起来看最新的资讯: 地又震了,人工智还能预测地震? 对于机器学习的方法来说,这提出了一个矛盾:最大的地震,地震学家最希望能够预测的地震,同时也是最罕见的。机器学习算法如何获得足够的训练数据来预测它们?在最好的情况下,对大地震的预测可能会有数周、数月或数年的时间限制。虽然预测可能无法用于协调地震前夕的大规模疏散,但可以增加公众的准备期,帮助政府官员有针对性地改造不安全的建筑,并以其他方式减轻灾难性地震的危害。 从“芯”看5G 5G时代,除了考验芯片设计能力,生态系统建设与运营也将成为移动终端芯片公司所面临的重要挑战。全球超过40家运营商已经局部部署了5G网络,超过40家终端厂商宣布推出5G终端。但如何让5G从地图上的一个个点缀,规模化扩展至全球,是当前移动通信产业的头等大事。 中美日韩处于5G商用进度第一梯队 根据美国无线通信和互联网协会(CTIA)2018年4月发布的报告,中国、韩国、美国、日本已成为全球5G的主要玩家,其中中国的5G...
相关文章
文章评论
共有0条评论来说两句吧...
文章二维码
点击排行
-
Docker使用Oracle官方镜像安装(12C,18C,19C)
- Springboot2将连接池hikari替换为druid,体验最强大的数据库连接池
- CentOS8编译安装MySQL8.0.19
- Docker快速安装Oracle11G,搭建oracle11g学习环境
- SpringBoot2配置默认Tomcat设置,开启更多高级功能
- MySQL8.0.19开启GTID主从同步CentOS8
- CentOS7,8上快速安装Gitea,搭建Git服务器
- Jdk安装(Linux,MacOS,Windows),包含三大操作系统的最全安装
- SpringBoot2编写第一个Controller,响应你的http请求并返回结果