您现在的位置是:首页 > 文章详情

【从入门到放弃-ZooKeeper】ZooKeeper实战-分布式锁-升级版

日期:2019-12-07点击:328

前言

上文【从入门到放弃-ZooKeeper】ZooKeeper实战-分布式锁中,我们通过利用ZooKeeper的临时节点特性,实现了一个分布式锁。
但是是通过轮询的方式去判断不断尝试获取锁,空转对于CPU还是有一定消耗的,同时,对于多个线程竞争锁激烈的时候,很容易出现羊群效应。

为了解决上面两个问题。本文来看一下如何实现一个升级版的分布式锁。

设计

我们依然实现java.util.concurrent.locks.Lock接口。
和上一文中实现方式不同的是,我们使用ZooKeeper的EPHEMERAL_SEQUENTIAL临时顺序节点。
当首次获取锁时,会创建一个临时节点,如果这个临时节点末尾数字是当前父节点下同名节点中最小的,则获取锁成功。
否则,则监听上一个数字较大的节点,直到上一个节点被释放,则再次尝试获取锁成功。这样可以避免多个线程同时获取一把锁造成的竞争。
同时使用了ZooKeeper提供的watch功能,避免了轮询带来的CPU空转。
获取锁后使用一个volatile int类型的state进行计数,来实现锁的可重入机制。

DistributedFairLock

public class DistributedFairLock implements Lock { private static Logger logger = LoggerFactory.getLogger(DistributedFairLock.class); //ZooKeeper客户端,进行ZooKeeper操作 private ZooKeeper zooKeeper; //根节点名称 private String dir; //加锁节点 private String node; //ZooKeeper鉴权信息 private List<ACL> acls; //要加锁节点 private String fullPath; //加锁标识,为0时表示未获取到锁,每获取一次锁则加一,释放锁时减一。减到0时断开连接,删除临时节点。 private volatile int state; //当前锁创建的节点id private String id; //通过CountDownLatch阻塞,直到监听上一节点被取消,再进行后续操作 private CountDownLatch countDownLatch; /** * Constructor. * * @param zooKeeper the zoo keeper * @param dir the dir * @param node the node * @param acls the acls */ public DistributedFairLock(ZooKeeper zooKeeper, String dir, String node, List<ACL> acls) { this.zooKeeper = zooKeeper; this.dir = dir; this.node = node; this.acls = acls; this.fullPath = dir.concat("/").concat(this.node); init(); } private void init() { try { Stat stat = zooKeeper.exists(dir, false); if (stat == null) { zooKeeper.create(dir, null, acls, CreateMode.PERSISTENT); } } catch (Exception e) { logger.error("[DistributedFairLock#init] error : " + e.toString(), e); } } }

lock

public void lock() { try { //加锁 synchronized (this) { //如果当前未持有锁 if (state <= 0) { //创建节点 if (id == null) { id = zooKeeper.create(fullPath, null, acls, CreateMode.EPHEMERAL_SEQUENTIAL); } //获取当前路径下所有的节点 List<String> nodes = zooKeeper.getChildren(dir, false); SortedSet<String> sortedSet = new TreeSet<>(); for (String node : nodes) { sortedSet.add(dir.concat("/").concat(node)); } //获取所有id小于当前节点顺序的节点 SortedSet<String> lessSet = ((TreeSet<String>) sortedSet).headSet(id); if (!lessSet.isEmpty()) { //监听上一个节点,就是通过这里避免多锁竞争和CPU空转,实现公平锁的 Stat stat = zooKeeper.exists(lessSet.last(), new LockWatcher()); if (stat != null) { countDownLatch = new CountDownLatch(1); countDownLatch.await(); } } } state++; } } catch (InterruptedException e) { logger.error("[DistributedFairLock#lock] error : " + e.toString(), e); Thread.currentThread().interrupt(); } catch (KeeperException ke) { logger.error("[DistributedFairLock#lock] error : " + ke.toString(), ke); if (!KeeperException.Code.NODEEXISTS.equals(ke.code())) { Thread.currentThread().interrupt(); } } }

tryLock

public boolean tryLock() { try { synchronized (this) { if (state <= 0) { if (id == null) { id = zooKeeper.create(fullPath, null, acls, CreateMode.EPHEMERAL_SEQUENTIAL); } List<String> nodes = zooKeeper.getChildren(dir, false); SortedSet<String> sortedSet = new TreeSet<>(); for (String node : nodes) { sortedSet.add(dir.concat("/").concat(node)); } SortedSet<String> lessSet = ((TreeSet<String>) sortedSet).headSet(id); if (!lessSet.isEmpty()) { return false; } } state++; } } catch (InterruptedException e) { logger.error("[DistributedFairLock#tryLock] error : " + e.toString(), e); return false; } catch (KeeperException ke) { logger.error("[DistributedFairLock#tryLock] error : " + ke.toString(), ke); if (!KeeperException.Code.NODEEXISTS.equals(ke.code())) { return false; } } return true; } @Override public boolean tryLock(long time, TimeUnit unit) throws InterruptedException { try { synchronized (this) { if (state <= 0) { if (id == null) { id = zooKeeper.create(fullPath, null, acls, CreateMode.EPHEMERAL_SEQUENTIAL); } List<String> nodes = zooKeeper.getChildren(dir, false); SortedSet<String> sortedSet = new TreeSet<>(); for (String node : nodes) { sortedSet.add(dir.concat("/").concat(node)); } SortedSet<String> lessSet = ((TreeSet<String>) sortedSet).headSet(id); if (!lessSet.isEmpty()) { Stat stat = zooKeeper.exists(lessSet.last(), new LockWatcher()); if (stat != null) { countDownLatch = new CountDownLatch(1); countDownLatch.await(time, unit); } } } state++; } } catch (InterruptedException e) { logger.error("[DistributedFairLock#tryLock] error : " + e.toString(), e); return false; } catch (KeeperException ke) { logger.error("[DistributedFairLock#tryLock] error : " + ke.toString(), ke); if (!KeeperException.Code.NODEEXISTS.equals(ke.code())) { return false; } } return true; }

unlock

public void unlock() { synchronized (this) { if (state > 0) { state--; } //当不再持有锁时,删除创建的临时节点 if (state == 0 && zooKeeper != null) { try { zooKeeper.delete(id, -1); id = null; } catch (Exception e) { logger.error("[DistributedFairLock#unlock] error : " + e.toString(), e); } } } }

LockWatcher

private class LockWatcher implements Watcher { @Override public void process(WatchedEvent event) { synchronized (this) { if (countDownLatch != null) { countDownLatch.countDown(); } } } }

总结

上面就是我们改良后,通过临时顺序节点和watch机制实现的公平可重入分布式锁。
源代码可见:aloofJr
通过watch机制避免轮询带来的CPU空转。
通过顺序临时节点避免了羊群效应。

如果对以上方式有更好的优化方案,欢迎一起讨论。

更多文章

见我的博客:https://nc2era.com

written by AloofJr,转载请注明出处

原文链接:https://yq.aliyun.com/articles/738391
关注公众号

低调大师中文资讯倾力打造互联网数据资讯、行业资源、电子商务、移动互联网、网络营销平台。

持续更新报道IT业界、互联网、市场资讯、驱动更新,是最及时权威的产业资讯及硬件资讯报道平台。

转载内容版权归作者及来源网站所有,本站原创内容转载请注明来源。

文章评论

共有0条评论来说两句吧...

文章二维码

扫描即可查看该文章

点击排行

推荐阅读

最新文章