【从入门到放弃-ZooKeeper】ZooKeeper实战-分布式锁-升级版
前言
上文【从入门到放弃-ZooKeeper】ZooKeeper实战-分布式锁中,我们通过利用ZooKeeper的临时节点特性,实现了一个分布式锁。
但是是通过轮询的方式去判断不断尝试获取锁,空转对于CPU还是有一定消耗的,同时,对于多个线程竞争锁激烈的时候,很容易出现羊群效应。
为了解决上面两个问题。本文来看一下如何实现一个升级版的分布式锁。
设计
我们依然实现java.util.concurrent.locks.Lock接口。
和上一文中实现方式不同的是,我们使用ZooKeeper的EPHEMERAL_SEQUENTIAL临时顺序节点。
当首次获取锁时,会创建一个临时节点,如果这个临时节点末尾数字是当前父节点下同名节点中最小的,则获取锁成功。
否则,则监听上一个数字较大的节点,直到上一个节点被释放,则再次尝试获取锁成功。这样可以避免多个线程同时获取一把锁造成的竞争。
同时使用了ZooKeeper提供的watch功能,避免了轮询带来的CPU空转。
获取锁后使用一个volatile int类型的state进行计数,来实现锁的可重入机制。
DistributedFairLock
public class DistributedFairLock implements Lock {
private static Logger logger = LoggerFactory.getLogger(DistributedFairLock.class);
//ZooKeeper客户端,进行ZooKeeper操作
private ZooKeeper zooKeeper;
//根节点名称
private String dir;
//加锁节点
private String node;
//ZooKeeper鉴权信息
private List<ACL> acls;
//要加锁节点
private String fullPath;
//加锁标识,为0时表示未获取到锁,每获取一次锁则加一,释放锁时减一。减到0时断开连接,删除临时节点。
private volatile int state;
//当前锁创建的节点id
private String id;
//通过CountDownLatch阻塞,直到监听上一节点被取消,再进行后续操作
private CountDownLatch countDownLatch;
/**
* Constructor.
*
* @param zooKeeper the zoo keeper
* @param dir the dir
* @param node the node
* @param acls the acls
*/
public DistributedFairLock(ZooKeeper zooKeeper, String dir, String node, List<ACL> acls) {
this.zooKeeper = zooKeeper;
this.dir = dir;
this.node = node;
this.acls = acls;
this.fullPath = dir.concat("/").concat(this.node);
init();
}
private void init() {
try {
Stat stat = zooKeeper.exists(dir, false);
if (stat == null) {
zooKeeper.create(dir, null, acls, CreateMode.PERSISTENT);
}
} catch (Exception e) {
logger.error("[DistributedFairLock#init] error : " + e.toString(), e);
}
}
}
lock
public void lock() {
try {
//加锁
synchronized (this) {
//如果当前未持有锁
if (state <= 0) {
//创建节点
if (id == null) {
id = zooKeeper.create(fullPath, null, acls, CreateMode.EPHEMERAL_SEQUENTIAL);
}
//获取当前路径下所有的节点
List<String> nodes = zooKeeper.getChildren(dir, false);
SortedSet<String> sortedSet = new TreeSet<>();
for (String node : nodes) {
sortedSet.add(dir.concat("/").concat(node));
}
//获取所有id小于当前节点顺序的节点
SortedSet<String> lessSet = ((TreeSet<String>) sortedSet).headSet(id);
if (!lessSet.isEmpty()) {
//监听上一个节点,就是通过这里避免多锁竞争和CPU空转,实现公平锁的
Stat stat = zooKeeper.exists(lessSet.last(), new LockWatcher());
if (stat != null) {
countDownLatch = new CountDownLatch(1);
countDownLatch.await();
}
}
}
state++;
}
} catch (InterruptedException e) {
logger.error("[DistributedFairLock#lock] error : " + e.toString(), e);
Thread.currentThread().interrupt();
} catch (KeeperException ke) {
logger.error("[DistributedFairLock#lock] error : " + ke.toString(), ke);
if (!KeeperException.Code.NODEEXISTS.equals(ke.code())) {
Thread.currentThread().interrupt();
}
}
}
tryLock
public boolean tryLock() {
try {
synchronized (this) {
if (state <= 0) {
if (id == null) {
id = zooKeeper.create(fullPath, null, acls, CreateMode.EPHEMERAL_SEQUENTIAL);
}
List<String> nodes = zooKeeper.getChildren(dir, false);
SortedSet<String> sortedSet = new TreeSet<>();
for (String node : nodes) {
sortedSet.add(dir.concat("/").concat(node));
}
SortedSet<String> lessSet = ((TreeSet<String>) sortedSet).headSet(id);
if (!lessSet.isEmpty()) {
return false;
}
}
state++;
}
} catch (InterruptedException e) {
logger.error("[DistributedFairLock#tryLock] error : " + e.toString(), e);
return false;
} catch (KeeperException ke) {
logger.error("[DistributedFairLock#tryLock] error : " + ke.toString(), ke);
if (!KeeperException.Code.NODEEXISTS.equals(ke.code())) {
return false;
}
}
return true;
}
@Override
public boolean tryLock(long time, TimeUnit unit) throws InterruptedException {
try {
synchronized (this) {
if (state <= 0) {
if (id == null) {
id = zooKeeper.create(fullPath, null, acls, CreateMode.EPHEMERAL_SEQUENTIAL);
}
List<String> nodes = zooKeeper.getChildren(dir, false);
SortedSet<String> sortedSet = new TreeSet<>();
for (String node : nodes) {
sortedSet.add(dir.concat("/").concat(node));
}
SortedSet<String> lessSet = ((TreeSet<String>) sortedSet).headSet(id);
if (!lessSet.isEmpty()) {
Stat stat = zooKeeper.exists(lessSet.last(), new LockWatcher());
if (stat != null) {
countDownLatch = new CountDownLatch(1);
countDownLatch.await(time, unit);
}
}
}
state++;
}
} catch (InterruptedException e) {
logger.error("[DistributedFairLock#tryLock] error : " + e.toString(), e);
return false;
} catch (KeeperException ke) {
logger.error("[DistributedFairLock#tryLock] error : " + ke.toString(), ke);
if (!KeeperException.Code.NODEEXISTS.equals(ke.code())) {
return false;
}
}
return true;
}
unlock
public void unlock() {
synchronized (this) {
if (state > 0) {
state--;
}
//当不再持有锁时,删除创建的临时节点
if (state == 0 && zooKeeper != null) {
try {
zooKeeper.delete(id, -1);
id = null;
} catch (Exception e) {
logger.error("[DistributedFairLock#unlock] error : " + e.toString(), e);
}
}
}
}
LockWatcher
private class LockWatcher implements Watcher {
@Override
public void process(WatchedEvent event) {
synchronized (this) {
if (countDownLatch != null) {
countDownLatch.countDown();
}
}
}
}
总结
上面就是我们改良后,通过临时顺序节点和watch机制实现的公平可重入分布式锁。
源代码可见:aloofJr
通过watch机制避免轮询带来的CPU空转。
通过顺序临时节点避免了羊群效应。
如果对以上方式有更好的优化方案,欢迎一起讨论。
更多文章
见我的博客:https://nc2era.com
written by AloofJr,转载请注明出处

低调大师中文资讯倾力打造互联网数据资讯、行业资源、电子商务、移动互联网、网络营销平台。
持续更新报道IT业界、互联网、市场资讯、驱动更新,是最及时权威的产业资讯及硬件资讯报道平台。
转载内容版权归作者及来源网站所有,本站原创内容转载请注明来源。
-
上一篇
Python 周刊第 418 期
新闻 PyCon US 2020 开始接受财务赞助! https://pycon.blogspot.com/2019/10/financial-aid-launches-for-pycon-us-2020.html2020年 Python 美国开发者大会,tips: 中国也有,可以赞助国内的) 已开放财务赞助申请,Pycon 将在 2020 年 1 月 31 日之前接受财务赞助。Python 软件基金会和 PyLadies 会对该大会进行财务赞助。今年,Python 软件基金会将提供 130000 美元的财务赞助,而 PyLadies 会根据 2019 年全年的捐款情况尽其所能赞助 Pycon。 VS Code 支持 Jupyter Notebook 的编辑 https://devblogs.microsoft.com/python/announcing-support-for-native-editing-of-jupyter-notebooks-in-vs-code/ 文章、教程或研讨会 用 Python 从 Last.fm API 接口获取音乐数据 https://www.dat...
-
下一篇
12月09日云栖号头条:人脸识别遍地开花,谁来守护我们的脸;人工智还能预测地震?
今日最新云头条快讯:2019年被视为5G商用元年;地震预测的目标从来都不是预测慢地震。相反,它是为了预测突然发生的、对生命和肢体构成危险的灾难性地震;,一起来看最新的资讯: 地又震了,人工智还能预测地震? 对于机器学习的方法来说,这提出了一个矛盾:最大的地震,地震学家最希望能够预测的地震,同时也是最罕见的。机器学习算法如何获得足够的训练数据来预测它们?在最好的情况下,对大地震的预测可能会有数周、数月或数年的时间限制。虽然预测可能无法用于协调地震前夕的大规模疏散,但可以增加公众的准备期,帮助政府官员有针对性地改造不安全的建筑,并以其他方式减轻灾难性地震的危害。 从“芯”看5G 5G时代,除了考验芯片设计能力,生态系统建设与运营也将成为移动终端芯片公司所面临的重要挑战。全球超过40家运营商已经局部部署了5G网络,超过40家终端厂商宣布推出5G终端。但如何让5G从地图上的一个个点缀,规模化扩展至全球,是当前移动通信产业的头等大事。 中美日韩处于5G商用进度第一梯队 根据美国无线通信和互联网协会(CTIA)2018年4月发布的报告,中国、韩国、美国、日本已成为全球5G的主要玩家,其中中国的5G...
相关文章
文章评论
共有0条评论来说两句吧...
文章二维码
点击排行
推荐阅读
最新文章
- SpringBoot2编写第一个Controller,响应你的http请求并返回结果
- Windows10,CentOS7,CentOS8安装Nodejs环境
- SpringBoot2整合Redis,开启缓存,提高访问速度
- MySQL数据库在高并发下的优化方案
- Docker快速安装Oracle11G,搭建oracle11g学习环境
- Springboot2将连接池hikari替换为druid,体验最强大的数据库连接池
- SpringBoot2整合MyBatis,连接MySql数据库做增删改查操作
- SpringBoot2配置默认Tomcat设置,开启更多高级功能
- Dcoker安装(在线仓库),最新的服务器搭配容器使用
- CentOS7,8上快速安装Gitea,搭建Git服务器