您现在的位置是:首页 > 文章详情

【从入门到放弃-ZooKeeper】ZooKeeper实战-分布式竞选

日期:2020-01-26点击:380

前言

上文【从入门到放弃-ZooKeeper】ZooKeeper实战-分布式锁-升级版中,我们通过利用ZooKeeper的临时节点和Watcher特性,实现了一个分布式锁。
本文我们结合实际场景,完成一个分布式竞争选举。

设计

这里我们实现一个公平的选举方式,即先参加选举的优先被选为leader。
具体的实现思路 参考了ZooKeeper提供的官方示例:zookeeper-recipes-election

  • START:服务器开始竞选
  • OFFER:创建临时顺序结点
  • DETERMINE:开始决策,将临时节点按末尾序号从小到大排序,如果当前节点的序号最小,则竞选成功,否则,则Watch前一个节点,当前一个节点被删除时,再次进行决策
  • ELECTED:当前节点是序号最小的节点,竞选成功
  • READY:当前节点不是序号最小的节点,竞选不成功,Watch前一个节点,进入READY态
  • FAILED:当出现异常情况时,为失败状态
  • STOP:结束竞选

LeaderElectionSupport

public class LeaderElectionSupport implements LeaderElection{ private static Logger logger = LoggerFactory.getLogger(LeaderElectionSupport.class); //ZooKeeper客户端,进行ZooKeeper操作 private ZooKeeper zooKeeper; //根节点名称 private String dir; //节点前缀 private String node; //ZooKeeper鉴权信息 private List<ACL> acls; //要加锁节点 private String fullPath; //选举状态 private State state; //监听器 private Set<LeaderElectionListener> listeners; //存当前节点的信息 private volatile LeaderNode leaderNode; //监察器 private Watcher watcher; /** * Constructor. * * @param zooKeeper the zoo keeper * @param dir the dir * @param node the node * @param acls the acls */ public LeaderElectionSupport(ZooKeeper zooKeeper, String dir, String node, List<ACL> acls) { this.zooKeeper = zooKeeper; this.dir = dir; this.node = node; this.acls = acls; this.fullPath = dir.concat("/").concat(this.node); init(); state = State.STOP; listeners = Collections.synchronizedSet(new HashSet<>()); } /** * 初始化根节点、检查器等 * */ private void init() { try { watcher = new LeaderWatcher(); Stat stat = zooKeeper.exists(dir, false); if (stat == null) { zooKeeper.create(dir, null, acls, CreateMode.PERSISTENT); } } catch (Exception e) { logger.error("[LeaderElectionSupport#init] error : " + e.toString(), e); } } }

start

/** * Start. * 开始竞选 */ @Override public void start() { synchronized (this) { state = State.START; dispatchEvent(EventType.START); offerElection(); determineElection(); } }

offerElection

/** * 创建临时节点,参加竞选,并将主机信息保存在node中 * */ private void offerElection() { dispatchEvent(EventType.OFFER_START); state = State.OFFER; if (leaderNode == null) { synchronized (this) { try { if (leaderNode == null) { InetAddress ia = InetAddress.getLocalHost(); LeaderNode tmpNode = new LeaderNode(); tmpNode.setHostName(ia.getHostName()); String path = zooKeeper.create(fullPath, ConversionUtil.objectToBytes(ia.getHostName()), acls, CreateMode.EPHEMERAL_SEQUENTIAL); tmpNode.setNodePath(path); tmpNode.setId(NodeUtil.getNodeId(path)); leaderNode = tmpNode; } } catch (Exception e) { becomeFailed(e); } } } dispatchEvent(EventType.OFFER_COMPLETE); }

determineElection

/** * 决定竞选结果 * 1、竞选节点序号最低的赢取选举 * 2、未赢得选举的节点,监听上一个节点,直到上一个节点被删除,则尝试重新竞选 * */ private void determineElection() { dispatchEvent(EventType.DETERMINE_START); state = State.DETERMINE; synchronized (this) { TreeSet<String> nodePathSet = getNodePathSet(); if (nodePathSet.isEmpty()) { becomeFailed(new Exception("no node")); return; } String leaderPath = nodePathSet.first(); if (leaderNode.getNodePath().equalsIgnoreCase(leaderPath)) { becomeLeader(); } else { becomeReady(nodePathSet.headSet(leaderNode.getNodePath()).last()); } } dispatchEvent(EventType.DETERMINE_COMPLETE); }

becomeLeader

/** * 竞选成功 * */ private void becomeLeader() { dispatchEvent(EventType.ELECTED_START); state = State.ELECTED; dispatchEvent(EventType.ELECTED_COMPLETE); }

becomeReady

/** * 竞选失败进入就绪态 * */ private void becomeReady(String path) { try { Stat stat = zooKeeper.exists(path, watcher); if (stat == null) { determineElection(); } else { dispatchEvent(EventType.READY_START); state = State.READY; dispatchEvent(EventType.READY_COMPLETE); } } catch (KeeperException e) { becomeFailed(e); } catch (InterruptedException e) { becomeFailed(e); } }

becomeFailed

/** * 当发生异常时,更新为FAILED状态 * */ private void becomeFailed(Exception e) { state = State.FAILED; dispatchEvent(EventType.FAILED); logger.error("[LeaderElectionSupport#becomeFailed] error : " + e.toString(), e); }

getNodePathSet

/** * 获取参加竞选的节点信息 * */ private TreeSet<String> getNodePathSet() { TreeSet<String> nodeSet = new TreeSet<>(); try { List<String> nodes = zooKeeper.getChildren(dir, false); for (String node : nodes) { nodeSet.add(dir.concat("/").concat(node)); } } catch (KeeperException e) { becomeFailed(e); } catch (InterruptedException e) { becomeFailed(e); } return nodeSet; }

stop

/** * Stop. * 停止竞选 */ @Override public void stop() { synchronized (this) { dispatchEvent(EventType.STOP_START); deleteNode(); state = State.STOP; dispatchEvent(EventType.STOP_COMPLETE); } }

deleteNode

/** * 停止时,删除节点,退出竞选 * */ private void deleteNode() { try { if (leaderNode != null) { synchronized (this) { zooKeeper.delete(leaderNode.getNodePath(), -1); leaderNode = null; } } } catch (InterruptedException e) { becomeFailed(e); } catch (KeeperException e) { becomeFailed(e); } }

getLeaderHostName

/** * Gets get leader host name. * * @return the get leader host name */ @Override public String getLeaderHostName() { synchronized (this) { TreeSet<String> nodePathSet = getNodePathSet(); if (!nodePathSet.isEmpty()) { try { String leaderPath = nodePathSet.first(); return (String) ConversionUtil.bytesToObject(zooKeeper.getData(leaderPath, false, null)); } catch (KeeperException e) { logger.error("[LeaderWatcher#getLeaderHostName] error : " + e.toString(), e); } catch (InterruptedException e) { logger.error("[LeaderWatcher#getLeaderHostName] error : " + e.toString(), e); } catch (IOException e) { logger.error("[LeaderWatcher#getLeaderHostName] error : " + e.toString(), e); } catch (ClassNotFoundException e) { logger.error("[LeaderWatcher#getLeaderHostName] error : " + e.toString(), e); } } return null; } }

getLeaderNodePath

/** * Gets get leader node path. * * @return the get leader node path */ @Override public String getLeaderNodePath() { synchronized (this) { TreeSet<String> nodePathSet = getNodePathSet(); return nodePathSet.isEmpty() ? null : nodePathSet.first(); } }

LeaderWatcher

/** * 内部watcher类,当竞选失败时,watch前一个节点,当前一个节点别移除时,再次发起决策 * */ private class LeaderWatcher implements Watcher { /** * Process. * * @param watchedEvent the watched event */ @Override public void process(WatchedEvent watchedEvent) { try { if (Event.EventType.NodeDeleted.equals(watchedEvent.getType()) && !State.STOP.equals(state)) { determineElection(); } } catch (Exception e) { logger.error("[LeaderWatcher#process] error : " + e.toString(), e); } } }

总结

以上就是我们利用ZooKeeper的临时节点和Watcher特性实现的公平模式分布式竞选。

可以进行简单的选主操作,适用于如执行单机定时任务、心跳检测等场景。实际上是实现的Master-Slave模型。

源代码可见:aloofJr

而对高可用要求较多的复杂选举场景,如分布式存储、同步等,则需要考虑集群一致性、脑裂等各种情况,则需要实现如Paxos、raft、Zab等一致性算法协议。如ZooKeeper集群的选举模式就是使用的Zab算法。
我们后续会进行深入的探讨。

更多文章

见我的博客:https://nc2era.com

written by AloofJr,转载请注明出处

原文链接:https://yq.aliyun.com/articles/743242
关注公众号

低调大师中文资讯倾力打造互联网数据资讯、行业资源、电子商务、移动互联网、网络营销平台。

持续更新报道IT业界、互联网、市场资讯、驱动更新,是最及时权威的产业资讯及硬件资讯报道平台。

转载内容版权归作者及来源网站所有,本站原创内容转载请注明来源。

文章评论

共有0条评论来说两句吧...

文章二维码

扫描即可查看该文章

点击排行

推荐阅读

最新文章