蚂蚁Raft一致性算法库SOFAJRaft深入分析
大家好,我是 V 哥,SOFAJRaft 是蚂蚁金服开源的一个基于 Raft 共识算法的 Java 实现,它特别适合高负载、低延迟的分布式系统场景。SOFAJRaft 支持 Multi-Raft-Group,能够同时处理多个 Raft 集群,具有扩展性和强一致性保障。这个项目是从百度的 braft 移植而来的,并且在性能和功能上做了多项优化。今天的文章,V 哥来聊一聊SOFAJRaft的核心源码实现。
打开全球最大的基友网站 Github,搜索 sofa-jraft,可以找到SOFAJRaft库的源码实现:
SOFAJRaft 是一个基于 RAFT 一致性算法的生产级高性能 Java 实现,支持 MULTI-RAFT-GROUP,适用于高负载低延迟的场景。 使用 SOFAJRaft 你可以专注于自己的业务领域,由 SOFAJRaft 负责处理所有与 RAFT 相关的技术难题,并且 SOFAJRaft 非常易于使用,你可以通过几个示例在很短的时间内掌握它。
V哥要介绍的不是基础应用,而是通过SOFAJRaft库的实现原理,帮助兄弟们来理解Raft算法
。
SOFAJRaft 核心概念
SOFAJRaft 的核心是 Raft 算法,它主要的组件包括:
- Leader 选举:用于在集群中选出唯一的 Leader。
- 日志复制:Leader 将客户端的请求日志复制到所有的 Follower。
- 日志一致性:通过多数派机制确保集群中的日志是一致的。
- 日志应用:日志经过多数派确认后应用到状态机中。
核心源码分析
1. Raft 节点启动与初始化
SOFAJRaft 中的 Raft 节点通过 NodeImpl
类进行管理,它是 Raft 节点的核心实现。
public class NodeImpl implements Node, Lifecycle<NodeOptions>, Replicator.ReplicatorStateListener, StateMachineCaller.RaftStateMachineListener { // Raft 节点状态 private volatile State state; private final RaftGroupId groupId; // Raft group ID private final PeerId serverId; // 当前节点 ID private final NodeOptions options; // 节点选项配置 // 构造函数 public NodeImpl(final String groupId, final PeerId serverId) { this.groupId = new RaftGroupId(groupId); this.serverId = serverId; this.options = new NodeOptions(); } @Override public synchronized boolean init(final NodeOptions opts) { // 初始化配置 this.options = opts; // 启动选举定时器等逻辑 } }
在这里,NodeImpl
类的 init
方法用于初始化 Raft 节点,它会设置 Raft 节点的配置并启动选举定时器等机制。
2. Leader 选举
Raft 的 Leader 选举是通过定时器和心跳机制来实现的。当 Follower 没有在一段时间内收到 Leader 的心跳时,它会进入选举状态。
public class ElectionTimer extends Timer { private final NodeImpl node; public ElectionTimer(NodeImpl node) { this.node = node; } @Override public void run() { // 处理选举超时 this.node.handleElectionTimeout(); } }
当定时器超时时,会触发 handleElectionTimeout
方法进行选举。
private void handleElectionTimeout() { if (this.state != State.FOLLOWER) { return; } // 进入候选者状态 becomeCandidate(); // 发送投票请求 sendVoteRequests(); }
这里的逻辑非常清晰了,当节点是 Follower 并且发生选举超时时,它会转换为候选者并开始发送投票请求给其他节点。
3. 日志复制
在 Raft 中,Leader 负责将客户端的请求日志复制到 Follower。
public class LeaderState { private final NodeImpl node; private final LogManager logManager; public LeaderState(NodeImpl node) { this.node = node; this.logManager = node.getLogManager(); } public void replicateLog(final LogEntry logEntry) { // 将日志复制到 Follower 节点 for (PeerId peer : node.getReplicatorList()) { Replicator replicator = node.getReplicator(peer); replicator.sendAppendEntries(logEntry); } } }
在这里,Leader 通过 Replicator
将日志复制到所有 Follower 节点,sendAppendEntries
方法会发送 AppendEntries
请求。
4. 日志一致性
Raft 算法通过多数派机制来确保日志的一致性,来看一下源码:
public class AppendEntriesResponseHandler { private final NodeImpl node; public void handleResponse(AppendEntriesResponse response) { if (response.success) { // 更新提交的日志索引 node.getLogManager().commitIndex(response.index); } else { // 如果失败,可能需要重新发送日志或处理冲突 node.handleLogReplicationFailure(response); } } }
当节点收到 AppendEntriesResponse
时,如果复制成功,它会更新日志的提交索引,确保日志的一致性。
5. 状态机应用
一旦日志被提交,Raft 将这些日志应用到状态机中,以实现最终的系统状态更新。
public class StateMachineCaller { private final StateMachine stateMachine; public void onApply(final List<LogEntry> entries) { // 将提交的日志应用到状态机 for (LogEntry entry : entries) { stateMachine.apply(entry); } } }
状态机将处理客户端请求并更新系统状态,这里 apply
方法会被调用来执行具体的业务逻辑。
我们继续深入探讨 SOFAJRaft 的其他核心部分,包括**日志管理(Log Management)**、**快照(Snapshot)机制**和**故障处理**,这些部分在分布式系统中都非常重要,尤其在长时间运行和高负载场景下。
6. 日志管理(Log Management)
日志管理是 Raft 协议中重要的一部分,它保证了每个节点在不同时间点所保存的日志能够保持一致。SOFAJRaft 使用 LogManager
来管理日志的存储和持久化。实现的代码是这样滴:
public class LogManager { private final List<LogEntry> logEntries; // 日志条目列表 private long commitIndex; // 当前提交的日志索引 private long lastApplied; // 最后应用的日志索引 public LogManager() { this.logEntries = new ArrayList<>(); } public synchronized void appendEntry(LogEntry entry) { // 将新日志添加到日志列表 logEntries.add(entry); } public synchronized void commitIndex(long newCommitIndex) { // 更新提交索引,保证提交的日志能在状态机中被应用 this.commitIndex = newCommitIndex; } public synchronized List<LogEntry> getUnappliedEntries() { // 获取尚未应用到状态机的日志 return logEntries.subList((int) lastApplied + 1, (int) commitIndex + 1); } public void applyLogsToStateMachine(StateMachine stateMachine) { List<LogEntry> unappliedEntries = getUnappliedEntries(); for (LogEntry entry : unappliedEntries) { stateMachine.apply(entry); // 应用日志到状态机 lastApplied++; } } }
在日志管理中,LogManager
负责维护 Raft 节点的所有日志条目,并根据多数派的确认来更新提交的日志索引。当提交的日志多于 commitIndex
时,这些日志可以应用到状态机中。applyLogsToStateMachine
方法则负责将日志条目应用到状态机。
7. 快照机制(Snapshot)
在长时间运行的集群中,如果仅仅依赖日志复制,日志可能会积累得非常庞大,影响性能和磁盘空间的使用。那要肿么办呢?因此,Raft 设计了快照(Snapshot)机制来定期将当前状态持久化,并丢弃已经持久化的日志。
public class SnapshotManager { private final StateMachine stateMachine; private final LogManager logManager; private long lastSnapshotIndex; public SnapshotManager(StateMachine stateMachine, LogManager logManager) { this.stateMachine = stateMachine; this.logManager = logManager; } public void takeSnapshot() { // 生成新的快照 Snapshot snapshot = stateMachine.saveSnapshot(); this.lastSnapshotIndex = logManager.getLastAppliedIndex(); // 持久化快照到磁盘 persistSnapshot(snapshot); // 清理旧的日志条目 logManager.truncatePrefix(lastSnapshotIndex); } private void persistSnapshot(Snapshot snapshot) { // 将快照写入磁盘的实现逻辑 // 如将 snapshot 对象序列化并写入文件系统 } }
在 SnapshotManager
中,takeSnapshot
方法会触发状态机生成当前的快照,并持久化到磁盘。当快照创建完成后,旧的日志条目可以被截断以释放存储空间。这极大地减少了日志的冗余,提高了系统的性能。
8. 故障处理与恢复
SOFAJRaft 具有健全的故障处理机制,能够处理节点的崩溃和网络分区等情况。Raft 协议通过日志复制和 Leader 选举机制来保证系统的容错性。
Follower 的故障恢复
当 Follower 恢复之后,会向 Leader 请求缺失的日志,Leader 会通过 InstallSnapshot
或者 AppendEntries
来将最新的日志发送给 Follower。
public class FollowerRecovery { private final NodeImpl node; private final LogManager logManager; public FollowerRecovery(NodeImpl node) { this.node = node; this.logManager = node.getLogManager(); } public void handleInstallSnapshot(InstallSnapshotRequest request) { // 收到 Leader 的快照安装请求 Snapshot snapshot = request.getSnapshot(); node.getStateMachine().loadSnapshot(snapshot); logManager.reset(snapshot.getLastIndex()); } public void handleAppendEntries(AppendEntriesRequest request) { // 收到 Leader 的日志复制请求 List<LogEntry> entries = request.getEntries(); logManager.appendEntries(entries); } }
handleInstallSnapshot
用于处理 Leader 发送的快照请求,当日志缺失过多时,Leader 会将整个快照发给 Follower,避免重复发送大量的日志。handleAppendEntries
则用于正常情况下的日志复制和恢复。
Leader 的故障恢复
Leader 故障后,集群会通过新的 Leader 选举恢复正常工作。Leader 选举过程在前面的部分已经详细介绍,当一个新的 Leader 被选出后,它会尝试将自己的日志与 Follower 同步。
public class LeaderRecovery { private final NodeImpl node; private final LogManager logManager; public LeaderRecovery(NodeImpl node) { this.node = node; this.logManager = node.getLogManager(); } public void catchUpFollowers() { // 向所有 Follower 发送最新的日志条目 for (PeerId peer : node.getReplicatorList()) { Replicator replicator = node.getReplicator(peer); replicator.sendAppendEntries(logManager.getUncommittedEntries()); } } }
新的 Leader 会调用 catchUpFollowers
来确保所有的 Follower 都与它保持一致,利用 Raft 的日志复制机制恢复一致性。
9. Multi-Raft-Group 的支持
SOFAJRaft 的一大特色是对 Multi-Raft-Group 的支持,也就是说,它能够管理多个独立的 Raft 集群。这使得它在一些需要分片或者不同业务隔离的场景中能够很好地应用。
public class MultiRaftGroupManager { private final Map<String, NodeImpl> raftGroups = new ConcurrentHashMap<>(); public NodeImpl createRaftGroup(String groupId, PeerId serverId, NodeOptions options) { NodeImpl node = new NodeImpl(groupId, serverId); node.init(options); raftGroups.put(groupId, node); return node; } public NodeImpl getRaftGroup(String groupId) { return raftGroups.get(groupId); } }
MultiRaftGroupManager
负责管理多个 Raft 集群,通过 createRaftGroup
方法可以创建新的 Raft 集群,每个集群都有自己的 NodeImpl
实例。这种架构设计让系统可以同时运行多个 Raft 实例,从而大幅提升扩展性。
总结
SOFAJRaft 基于 Raft 算法实现了一个高性能、支持 Multi-Raft-Group 的分布式一致性系统。它通过 NodeImpl 负责 Raft 节点的管理,通过 Leader 选举、日志复制、多数派机制等实现分布式系统中的强一致性。
关键代码展示了从节点初始化到日志复制和一致性维护的核心流程,这些是 Raft 算法的重要组成部分。
SOFAJRaft 的设计通过日志管理、快照机制、故障处理以及 Multi-Raft-Group 的支持,提供了一个健壮且高效的分布式一致性解决方案。通过对关键代码的分析,我们可以看到它在处理日志复制、一致性维护和快照生成上的精妙实现,能够有效应对高负载、长时间运行的分布式系统场景。
好了,整理的学习笔记就到这里,分享给大家,希望可以帮助你更加深入的理解 Raft 算法,V 哥在这里求个关注和点赞,感谢感谢。

低调大师中文资讯倾力打造互联网数据资讯、行业资源、电子商务、移动互联网、网络营销平台。
持续更新报道IT业界、互联网、市场资讯、驱动更新,是最及时权威的产业资讯及硬件资讯报道平台。
转载内容版权归作者及来源网站所有,本站原创内容转载请注明来源。
- 上一篇
这样的SQL太吓人了
昨天松哥在朋友圈发了这样一张图: 很多小伙伴看到了能够快速发现问题,当 company_id 为 null 的时候,会导致全表更新。 但是也有小伙伴不解,自己平时就是这么写的呀,也没什么问题,如果有问题,那么上面的 SQL 该怎么改呢? 松哥来和大家简单聊几句。 一 防止全表更新 如果在生产环境中使用 UPDATE 语句更新表数据,此时如果忘记携带本应该添加的 WHERE 条件,那么后果不堪设想。 那么怎么避免这个问题呢? 二 sql_safe_updates sql_safe_updates 是 MySQL 数据库中的一个参数,它的作用是增强数据安全性,防止因误操作导致的数据丢失或破坏。 具体来说,当 sql_safe_updates 设置为 ON(启用)时,MySQL 将阻止执行没有明确 WHERE 子句的 UPDATE 或 DELETE 语句。这意味着如果试图运行一个不包含 WHERE 条件来限定更新或删除范围的 DML 语句,MySQL 会抛出一个错误。而当 sql_safe_updates 设置为 OFF(禁用)时,MySQL 不会对此类无条件更新或删除操作进行特殊限制,允许...
- 下一篇
使用 Elastic 和 Mistral 构建多语言 RAG(二)
这篇文章是之前的文章 “使用 Elastic 和 Mistral 构建多语言 RAG(一)” 的续篇。在这篇文章中,我将展示如何在本地部署中完成在那篇文章中的实现。 注意:由于 semantic text 从 8.15 版本开始提供,你需要至少 8.15 及以上的版本才可以运行下面的代码。 安装 Elasticsearch 及 Kibana 如果你还没有安装好自己的 Elasticsearch 及 Kibana,请参考如下的链接来进行安装: 如何在 Linux,MacOS 及 Windows 上进行安装 Elasticsearch Kibana:如何在 Linux,MacOS 及 Windows上安装 Elastic 栈中的 Kibana 在安装的时候,我们选择 Elastic Stack 8.x 来进行安装。在首次启动 Elasticsearch 的时候,我们可以看到如下的输出: 在上面,我们可以看到 elastic 超级用户的密码。我们记下它,并将在下面的代码中进行使用。 我们还可以在安装 Elasticsearch 目录中找到 Elasticsearch 的访问证书: $ p...
相关文章
文章评论
共有0条评论来说两句吧...