经典分布式算法 —— 浅显易懂的 Raft 算法实现
一、Raft概念
copy一下其他小伙伴写的文章: Raft算法详解
不同于Paxos算法直接从分布式一致性问题出发推导出来,Raft算法则是从多副本状态机的角度提出,用于管理多副本状态机的日志复制。Raft实现了和Paxos相同的功能,它将一致性分解为多个子问题:Leader选举(Leader election)、日志同步(Log replication)、安全性(Safety)、日志压缩(Log compaction)、成员变更(Membership change)等。同时,Raft算法使用了更强的假设来减少了需要考虑的状态,使之变的易于理解和实现。
Raft将系统中的角色分为领导者(Leader)、跟从者(Follower)和候选人(Candidate):
- Leader:接受客户端请求,并向Follower同步请求日志,当日志同步到大多数节点上后告诉Follower提交日志。
- Follower:接受并持久化Leader同步的日志,在Leader告之日志可以提交之后,提交日志。
- Candidate:Leader选举过程中的临时角色。
本文不过多赘述 raft 算法是个什么东西... 这里再贴一个十分好理解的文章:The Raft Consensus Algorithm
二、系统初步设计
在对raft有一定理解后,我们简单梳理一下在raft选举过程中,我们需要的一些角色,以及角色的司职。
首先我们需要一个选举控制类,单例实现即可,节点的选举全权交给此选举控制类的实现,我们称其为 ElectOperator。
先讲一个 raft 中重要的概念:世代,也称为 epoch,但在这篇文章,将其称为 generation(不要纠结这个 = =)。 世代可以认为是一个标记当前发送的操作是否有效的标识,如果收到了小于本节点世代的请求,则可无视其内容,如果收到了大于本世代的请求,则需要更新本节点世代,并重置自己的身份,变为 Follower,类似于乐观锁的设计理念。
我们知道,raft中一共有三种角色:Follower、Candidate、Leader
(1)Follower
Follower 需要做什么呢:
- 接收心跳
- Follower 在 ELECTION_TIMEOUT_MS 时间内,若没有收到来自 Leader的心跳,则转变为 Candidate
- 接收拉票请求,并返回自己的投票
好的,Follower非常简单,只需要做三件事即可。
(2)Candidate
Candidate 扮演什么样的职能呢:
- 接收心跳
- Candidate 在 ELECTION_TIMEOUT_MS 时间内,若没有收到来自 Leader的心跳,则转变为 Candidate
- 接收拉票请求,并返回自己的投票
- 向集群中的其他节点发起拉票请求
- 当收到的投票大于半数( n/2 + 1, n为集群内的节点数量),转变为 Leader
Candidate 比起 Follower 稍微复杂一些,但前三件事情都是一样的。
(3)Leader
Leader 在选举过程中扮演的角色最为简单:
- 接收心跳
- 向集群内所有节点发送心跳
Leader 也是可以接收心跳的,当收到大于当前世代的心跳或请求后,Leader 需要转变为 Follower。Leader 不可能收到同世代的心跳请求,因为 (1) 在 raft 算法中,同一世代中,节点仅对同一个节点进行投票。(2) 需要收到过半投票才可以转变为 Leader。
三、系统初步实现
简单贴一下选举控制器需要的一些属性代码,下面的注释都说的很清楚了,其中需要补充的一点是定时任务使用了时间轮来实现,不理解没有关系...就是个定时任务,定时任务的一个引用放在 Map<TaskEnum, TimedTask> taskMap; 中,便于取消任务。
public class ElectOperator extends ReentrantLocker implements Runnable { // 成为 Candidate 的退避时间(真实退避时间需要 randomized to be between 150ms and 300ms ) private static final long ELECTION_TIMEOUT_MS = ElectConfigHelper.getElectionTimeoutMs(); // 心跳间隔 private static final long HEART_BEAT_MS = ElectConfigHelper.getHeartBeatMs(); /** * 该投票箱的世代信息,如果一直进行选举,一直能达到 {@link #ELECTION_TIMEOUT_MS},而选不出 Leader ,也需要15年,generation才会不够用,如果 * generation 的初始值设置为 Long.Min (现在是0,则可以撑30年,所以完全呆胶布) */ private long generation; /** * 当前节点的角色 */ private NodeRole nodeRole; /** * 所有正在跑的定时任务 */ private Map<TaskEnum, TimedTask> taskMap; /** * 投票箱 */ private Map<String/* serverName */, Boolean> box; /** * 投票给了谁的投票记录 */ private Votes voteRecord; /** * 缓存一份集群信息,因为集群信息是可能变化的,我们要保证在一次选举中,集群信息是不变的 */ private List<HanabiNode> clusters; /** * 心跳内容 */ private HeartBeat heartBeat; /** * 现在集群的leader是哪个节点 */ private String leaderServerName; private volatile static ElectOperator INSTANCE; public static ElectOperator getInstance() { if (INSTANCE == null) { synchronized (ElectOperator.class) { if (INSTANCE == null) { INSTANCE = new ElectOperator(); ElectControllerPool.execute(INSTANCE); } } } return INSTANCE; }
另外,上面罗列的这些值大都是需要在更新世代时重置的,我们先拟定一下更新世代的逻辑,通用的来讲,就是清除投票记录,清除自己的投票箱,更新自己的世代,身份变更为 Follower 等等,我们将这个方法称为 init。
/** * 初始化 * * 1、成为follower * 2、先取消所有的定时任务 * 3、重置本地变量 * 4、新增成为Candidate的定时任务 */ private boolean init(long generation, String reason) { return this.lockSupplier(() -> { if (generation > this.generation) {// 如果有选票的世代已经大于当前世代,那么重置投票箱 logger.debug("初始化投票箱,原因:{}", reason); // 1、成为follower this.becomeFollower(); // 2、先取消所有的定时任务 this.cancelAllTask(); // 3、重置本地变量 logger.debug("更新世代:旧世代 {} => 新世代 {}", this.generation, generation); this.generation = generation; this.voteRecord = null; this.box = new HashMap<>(); this.leaderServerName = null; // 4、新增成为Candidate的定时任务 this.becomeCandidateAndBeginElectTask(this.generation); return true; } else { return false; } }); }
(1) Follower的实现
基于上面的分析,我们可以归纳一下 Follower 需要一些什么样的方法:
1、转变为 Candidate 的定时任务
实际上就是 ELECTION_TIMEOUT_MS (randomized to be between 150ms and 300ms) 后,如果没收到 Leader 的心跳,或者自己变为 Candidate 后,在这个时间内没有成功上位,则继续转变为 Candidate。
为什么我们成为 Candidate 的退避时间需要随机 150ms - 300ms呢?这是为了避免所有节点的选举发起发生碰撞,如果说都是相同的退避时间,每个节点又会优先投自己一票,那么这个集群系统就会陷入无限发起投票,但又无法成为 Leader 的局面。
简而言之就是我们需要提供一个可刷新的定时任务,如果在一定时间内没刷新这个任务,则节点转变为 Candidate,并发起选举,代码如下。首先取消之前的 becomeCandidate 定时定时任务,然后创建新的 becomeCandidate 定时任务 => [设定在 electionTimeout 后调用 beginElect(generation) 方法]。
/** * 成为候选者的任务,(重复调用则会取消之前的任务,收到来自leader的心跳包,就可以重置一下这个任务) * * 没加锁,因为这个任务需要频繁被调用,只要收到leader来的消息就可以调用一下 */ private void becomeCandidateAndBeginElectTask(long generation) { this.lockSupplier(() -> { this.cancelCandidateAndBeginElectTask("正在重置发起下一轮选举的退避时间"); // The election timeout is randomized to be between 150ms and 300ms. long electionTimeout = ELECTION_TIMEOUT_MS + (int) (ELECTION_TIMEOUT_MS * RANDOM.nextFloat()); TimedTask timedTask = new TimedTask(electionTimeout, () -> this.beginElect(generation)); Timer.getInstance() .addTask(timedTask); taskMap.put(TaskEnum.BECOME_CANDIDATE, timedTask); return null; }); }
2、接收心跳与心跳回复
接收心跳十分简单,如果当前心跳大于等于当前世代,且还未认定某个节点为 Leader,则取消所有定时任务,成为Follower,并记录心跳包中 Leader 节点的信息,最后重置一下成为候选者的任务。
如果已经成为某个 Leader 的 Follower,则直接成为候选者的任务即可。
另外一个要注意的是,needToSendHeartBeatInfection,是否需要发送心跳感染包,当收到低世代 Leader 的心跳时,如果当前集群已经选出 Leader ,则回复此心跳包,告诉旧 Leader,现在已经是新世代了!(代码中没有展现,其实就是再次封装一个心跳包,带上世代信息和 Leader 节点信息,回复给 Leader 即可)
public boolean receiveHeatBeat(String leaderServerName, long generation, String msg) { return this.lockSupplier(() -> { boolean needToSendHeartBeatInfection = true; // 世代大于当前世代 if (generation >= this.generation) { needToSendHeartBeatInfection = false; if (this.leaderServerName == null) { logger.info("集群中,节点 {} 已经成功在世代 {} 上位成为 Leader,本节点将成为 Follower,直到与 Leader 的网络通讯出现问题", leaderServerName, generation); // 取消所有任务 this.cancelAllTask(); // 成为follower this.becomeFollower(); // 将那个节点设为leader节点 this.leaderServerName = leaderServerName; } // 重置成为候选者任务 this.becomeCandidateAndBeginElectTask(this.generation); } return needToSendHeartBeatInfection; }); }
3、接收拉票请求与回复投票
我们知道,raft 在一个世代只能投票给一个节点,且发起投票者会首先投票给自己。所以逻辑就很简单了,只有当世代大于等于当前,且还未投票时,则拉票请求成功,返回true即可,否则都视为失败,返回false。
/** * 某个节点来请求本节点给他投票了,只有当世代大于当前世代,才有投票一说,其他情况都是失败的 * * 返回结果 * * 为true代表接受投票成功。 * 为false代表已经给其他节点投过票了, */ public VotesResponse receiveVotes(Votes votes) { return this.lockSupplier(() -> { logger.debug("收到节点 {} 的投票请求,其世代为 {}", votes.getServerName(), votes.getGeneration()); String cause = ""; if (votes.getGeneration() < this.generation) { cause = String.format("投票请求 %s 世代小于当前世代 %s", votes.getGeneration(), this.generation); } else if (this.voteRecord != null) { cause = String.format("在世代 %s,本节点已投票给 => %s 节点", this.generation, this.voteRecord.getServerName()); } else { this.voteRecord = votes; // 代表投票成功了 } boolean result = votes.equals(this.voteRecord); if (result) { logger.debug("投票记录更新成功:在世代 {},本节点投票给 => {} 节点", this.generation, this.voteRecord.getServerName()); } else { logger.debug("投票记录更新失败:原因:{}", cause); } String serverName = InetSocketAddressConfigHelper.getServerName(); return new VotesResponse(this.generation, serverName, result, serverName.equals(this.leaderServerName), votes.getGeneration()); }); }
(2) Candidate的实现
可以看出 Follower 十分简单, Candidate 在 Follower 的基础上增加了发起选举的拉票请求,与接收投票,并上位成为Leader两个功能,实际上也十分简单。
1、发起拉票请求
回顾一下前面的转变成 Candidate 的定时任务,定时任务实际上就是调用一个方法
TimedTask timedTask = new TimedTask(electionTimeout, () -> this.beginElect(generation));
这个 beginElect 就是转变为 Candidate 并发起选举的实现。让我们先想想需要做什么,首先肯定是
- 更新一下自己的世代,因为已经长时间没收到 Leader 的心跳包了,我们需要自立门户。
- 给自己投一票
- 要求其他节点给自己投票
分析到这里就很明了了。下面首先执行 updateGeneration 方法,实际上就是执行前面所说的 init 方法,传入 generation + 1 的世代,重置一下上个世代各种保存的状态;然后调用 becomeCandidate,实际上就是切换一下身份,将 Follower 或者 Candidate 切换为 Candidate;给自己的 voteRecord 投一票,最后带上自己的节点标识和世代信息,去拉票。
/** * 开始进行选举 * * 1、首先更新一下世代信息,重置投票箱和投票记录 * 2、成为候选者 * 3、给自己投一票 * 4、请求其他节点,要求其他节点给自己投票 */ private void beginElect(long generation) { this.lockSupplier(() -> { if (this.generation != generation) {// 存在这么一种情况,虽然取消了选举任务,但是选举任务还是被执行了,所以这里要多做一重处理,避免上个周期的任务被执行 return null; } logger.info("Election Timeout 到期,可能期间内未收到来自 Leader 的心跳包或上一轮选举没有在期间内选出 Leader,故本节点即将发起选举"); updateGeneration("本节点发起了选举");// this.generation ++ // 成为候选者 logger.info("本节点正式开始世代 {} 的选举", this.generation); if (this.becomeCandidate()) { VotesResponse votes = new VotesResponse(this.generation, InetSocketAddressConfigHelper.getServerName(), true, false, this.generation); // 给自己投票箱投票 this.receiveVotesResponse(votes); // 记录一下,自己给自己投了票 this.voteRecord = votes; // 让其他节点给自己投一票 this.askForVoteTask(new Votes(this.generation, InetSocketAddressConfigHelper.getServerName()), 0); } return null; }); }
2、接收投票,并成为 Leader
如果说在 150ms and 300ms 之间,本节点收到了过半投票,则可上位成 Leader,否则定时任务会再次调用 beginElect,再次更新本节点世代,然后发起新一轮选举。
接收投票其实十分简单,回忆一下前面接收拉票请求与回复投票,实际上就是拉票成功,就返回true,否则返回flase。
我们每次都判断一下是否拿到过半的票数,如果拿到,则成为 Leader,另外有一个值得注意的是,为了加快集群恢复可用的进程,类似于心跳感染(如果心跳发到Leader那里去了,Leader会告诉本节点,它才是真正的Leader),投票也存在投票感染,下面的代码由 votesResponse.isFromLeaderNode() 来表示。
投票的记录也是十分简单,就是把每个投票记录扔到 Map<String/* serverName */, Boolean> box; 里,true 表示同意投给本节点,flase 则不同意,如果同意达到半数以上,则调用 becomeLeader 成为本世代 Leader。
/** * 给当前节点的投票箱投票 */ public void receiveVotesResponse(VotesResponse votesResponse) { this.lockSupplier(() -> { if (votesResponse.isFromLeaderNode()) { logger.info("来自节点 {} 的投票应答表明其身份为 Leader,本轮拉票结束。", votesResponse.getServerName()); this.receiveHeatBeat(votesResponse.getServerName(), votesResponse.getGeneration(), String.format("收到来自 Leader 节点的投票应答,自动将其视为来自 Leader %s 世代 %s 节点的心跳包", heartBeat.getServerName(), votesResponse.getGeneration())); } if (this.generation > votesResponse.getAskVoteGeneration()) {// 如果选票的世代小于当前世代,投票无效 logger.info("来自节点 {} 的投票应答世代是以前世代 {} 的选票,选票无效", votesResponse.getServerName(), votesResponse.getAskVoteGeneration()); return null; } if (votesResponse.isAgreed()) { if (!voteSelf) { logger.info("来自节点 {} 的投票应答有效,投票箱 + 1", votesResponse.getServerName()); } // 记录一下投票结果 box.put(votesResponse.getServerName(), votesResponse.isAgreed()); List<HanabiNode> hanabiNodeList = this.clusters; int clusterSize = hanabiNodeList.size(); int votesNeed = clusterSize / 2 + 1; long voteCount = box.values() .stream() .filter(aBoolean -> aBoolean) .count(); logger.info("集群中共 {} 个节点,本节点当前投票箱进度 {}/{}", hanabiNodeList.size(), voteCount, votesNeed); // 如果获得的选票已经大于了集群数量的一半以上,则成为leader if (voteCount == votesNeed) { logger.info("选票过半,准备上位成为 leader 节点", votesResponse.getServerName()); this.becomeLeader(); } } else { logger.info("节点 {} 在世代 {} 的投票应答为:拒绝给本节点在世代 {} 的选举投票(当前世代 {})", votesResponse.getServerName(), votesResponse.getGeneration(), votesResponse.getAskVoteGeneration(), this.generation); // 记录一下投票结果 box.put(votesResponse.getServerName(), votesResponse.isAgreed()); } return null; }); }
(3) Leader 的实现
作为 Leader,在 raft 中的实现却是最简单的,我们只需要给子节点发心跳包即可。然后如果收到大于自己世代的心跳感染,则成为新世代的 Follower,接收心跳的逻辑和 Follower 没有区别。
/** * 当选票大于一半以上时调用这个方法,如何去成为一个leader */ private void becomeLeader() { this.lockSupplier(() -> { long becomeLeaderCostTime = TimeUtil.getTime() - this.beginElectTime; this.beginElectTime = 0L; logger.info("本节点 {} 在世代 {} 角色由 {} 变更为 {} 选举耗时 {} ms,并开始向其他节点发送心跳包 ......", InetSocketAddressConfigHelper.getServerName(), this.generation, this.nodeRole.name(), NodeRole.Leader.name(), becomeLeaderCostTime); this.nodeRole = NodeRole.Leader; this.cancelAllTask(); this.heartBeatTask(); this.leaderServerName = InetSocketAddressConfigHelper.getServerName(); return null; }); }
四、运行我们的 raft!
看到这里,不用怀疑.. 一个 raft 算法已经实现了。至于一些细枝末节的东西,我相信大家都能处理好的.. 比如如何给其他节点发送各种包,包怎么去定义之类的,都和 raft 本身没什么关系。
一般来说,在集群可用后,我们就可以让 Follower 连接 Leader 的业务端口,开始真正的业务了。 raft作为一个能快速选主的分布式算法,一次选主基本只需要一次 RTT(Round-Trip Time)时间即可,非常迅速。
运行一下我们的项目,简单测试,我们只用三台机子,想测试多台机子可以自己去玩玩...我们可以看到就像 zookeeper,我们需要配置两个端口,前一个作为选举端口,后一个则作为业务端口。
本文章只讲了怎么选举,后面的端口可以无视,但是必填...
依次启动 hanabi.1,hanabi.2,hanabi.3
很快,我们就能看到 hanabi.1 成为了世代28的 Leader,第一次选举耗时久是因为启动的时候有各种初始化 = =
此时,我们关闭 hanabi.1,因为集群还有2台机器,它们之间完全可以选出新的 Leader,我们关闭 hanabi.1 试试。观察 hanabi.3,我们发现,很快,hanabi.3 就发现 Leader 已经挂掉,并发起了世代 29 的选举。
在世代29中,仅存的 hanabi.2 拒绝为本节点投票,所以在 ELECTION_TIMEOUT_MS 到期后,hanabi.3 再次发起了选举,此次选举成功,因为 hanabi.2 还未到达 ELECTION_TIMEOUT_MS,所以还在世代 28,收到了世代 29 的拉票请求后,hanabi.2 节点将自己的票投给了 hanabi.3,hanabi.3 成功上位。
本项目github地址 : 基于raft算法实现的分布式kv存储框架 (项目实际上还有日志写入,日志提交,日志同步等功能,直接无视它...还没写完 = =)

低调大师中文资讯倾力打造互联网数据资讯、行业资源、电子商务、移动互联网、网络营销平台。
持续更新报道IT业界、互联网、市场资讯、驱动更新,是最及时权威的产业资讯及硬件资讯报道平台。
转载内容版权归作者及来源网站所有,本站原创内容转载请注明来源。
- 上一篇
Spring 中 Bean 的装配(注入)Autowired Resource Inject 三种模式对比
基础知识 JSR 250: Common Annotations for the JavaTM Platform JSR 330: Dependency Injection for Java JSR 305: Annotations for Software Defect Detection beans-annotation-config 本文讲解的是 Spring 基于注解的 Bean 装载,XML 形式的配置与 Annotation 形式的配置实现功能都是一样的,且可以混用,但是要注意的是 Annotation 先于 XML 执行,注解的配置可能会被 XML 覆盖。 Annotation injection is performed before XML injection. Thus, the XML configuration overrides the annotations for properties wired through both approaches. 装配(autowiring ) 约等于 注入(injection ) 自动装配的模式 模式 描述 no (默认...
- 下一篇
tensorflow 之 卷积神经网络
应用场景 图像识别与检索 人脸识别 性别/年龄/情绪识别 物体检测 视频处理 语音分析 概述 一般一个卷积神经网络由多个卷积层构成,在卷基层内部通常会有如下几个操作: 图像通过多个卷积核滤波,添加偏置,提取局部特征每个卷积核会映射出一个新的2D图像。 卷积核的滤波结果输出到激活函数中,激活函数通常选ReLU 对激活函数的结果进行池化操作,池化就是对图像分割成不同的小区域后取平均值或最大值。一般取最大值。 上述几个步骤就构成了最常见的卷积层。在池化的后面还可以加上batch normalization等操作。 一个卷积层中可以有不同的卷积核,而每一个卷积核都对应一个滤波后映射出的新图像,同一个新图像的每一个像素都来自完全相同的卷积核。这种卷积核的权值共享可以有效降低模型负责度,减轻过拟合,减少计算量。 卷积神经网络结构 建立卷积神经网络对手写数字识别问题进行优化,构建由两个卷积层(包含池化层),两个全连接层构成的卷积神经网络。输入图像是28×28的单通道数据,输出是10×1的one_hot编码的向量。 第一层:卷积核大小是[5,5],输入通道1,输出通道32,padding选择SAME模...
相关文章
文章评论
共有0条评论来说两句吧...
文章二维码
点击排行
推荐阅读
最新文章
- Eclipse初始化配置,告别卡顿、闪退、编译时间过长
- Jdk安装(Linux,MacOS,Windows),包含三大操作系统的最全安装
- CentOS7编译安装Gcc9.2.0,解决mysql等软件编译问题
- MySQL8.0.19开启GTID主从同步CentOS8
- CentOS7,CentOS8安装Elasticsearch6.8.6
- 2048小游戏-低调大师作品
- CentOS8,CentOS7,CentOS6编译安装Redis5.0.7
- Windows10,CentOS7,CentOS8安装Nodejs环境
- CentOS7安装Docker,走上虚拟化容器引擎之路
- SpringBoot2全家桶,快速入门学习开发网站教程