深入理解 relocating 对Elasticsearch集群的影响
rebalance 用于将集群中的分片进行均衡,保持各个节点的分片数量大致相等,当集群扩容或缩容,掉一个节点的时候,这过程会自动完成。直观的感觉他应该是在后台默默干活的过程,最多占用带宽和磁盘 io 资源,应该感受不到他的存在,但实际情况是,他可能引起一些意想不到的问题。
这篇文章主要思考分片 relocating 对集群会有哪些影响(基于 v7.7),有下面几个。
shard-started RPC 会抢占较多的 master 处理时间
分片移动结束后,target 节点会向 master 发起一个 shard-started PRC,该 RPC 具有次高优先级:URGENT, master 对该 RPC 的处理比较慢,尤其是在集群分片数量到几万的级别,从而可能导致一些较低优先级的 RPC 长时间来不及执行,例如 put-mapping,进而导致对业务的明显影响。
由于 rebalance,allocation filtering,forced awareness等任意原因产生的 shard-started RPC 都会存在这个问题,例如扩容集群的时候,如果把 rebalance 并发开的比较大,对 master 的处理能力造成明显影响。因此对于分片数较多的集群,当你想要加大 rebalance 和 recovery 并发的时候要考虑到对 master 的影响。
move 一个主分片,对写入流程的影响
当一个主分片被 rebalance 或者手工 move 的时候,可以想象必然存在一个时间段该主分片无法写入。
Elasticsearch 对主分片的 relocating 也是直接 move,不会先将主分片资格让给其他副分片,再进行 move,即便如此,也会存在一个时间点进行切换,无法响应写入。
这个时间段从 RPC 的时序来看的话,如下图所示,红色标记的区域为阻塞写入流程的时间段。在该时间段内,客户端的写入请求会被阻塞无返回,直到这部分处理完成。

以手工 move 为例,当一个主分片从 node-idea move 到 node1时,主要有以下几个阶段完成:
首先会通过发布新的集群状态,将该分片标记为 RELOCATING状态,更新 routing_table 和 routing_nodes
数据节点收到集群状态后,开始执行 recovery 流程,该流程自 target 节点发送 start_recovery开始,至 Source 阶段返回 start_recovery response 结束。
recovery 完成后,target发送 shards-started RPC 给 master 节点,master 节点再次下发集群状态,将该分片标记为 STARTED 状态。
阻塞客户端 bulk 写入的阶段,就在 source 节点执行 finalize 阶段时,准备发送 handoff RPC 时开始,直到收到 master 新的集群状态,将该分片标记为 STARTED,每个节点应用集群状态的时间点略有差异,所以每个节点停止阻塞写入的时间点也会有微小差异。
handoff 的 RPC的作用是告知 target 节点该分片可以切换主分片状态,对 handoff RPC 的处理都是一些计算操作,其中涉及到几个锁,一般会很快完成。
整个主分片的 relocating 过程对写入的影响是很复杂的处理过程,我将他们划分为几个阶段,下面是详细过程。
阻塞过程详细原理
在分片的整个 relocating 过程中,对 target 节点的请求都会被转发到 source 节点,直到 target 节点应用了 master 下发的分片变为 STARTED 的集群状态。
正常写入阶段
起初,写入请求到达 source 节点,像往常一样正常写入到主分片中,直到复制阶段。
复制阶段
此阶段将收到的写请求复制给 target 节点。他从收到 prepare_translog 的 response之后开始,直到 handoff 阶段开始之前。
1.将 target 分片加入到 replication group
收到 prepare_translog response之后,recovery 的 phase1阶段已结束,segments 文件发送完毕,target 节点的 engine 已启动。通过 shard.initiateTracking将 target 分片加入到复制组,后续的写入操作都会复制给 target 节点。
prepareEngineStep.whenComplete(prepareEngineTime -> {
runUnderPrimaryPermit(() -> shard.initiateTracking(request.targetAllocationId()),
shardId + " initiating tracking of " + request.targetAllocationId(), shard, cancellableThreads, logger);
final Translog.Snapshot phase2Snapshot = shard.getHistoryOperations("peer-recovery", historySource, startingSeqNo);
});
复制组信息在 ReplicationTracker 的 replicationGroup成员中维护。
2.在写入过程中,将写请求复制给 target
在写入流程的 performOnPrimary#doRun() 函数的 finishRequest方法中,最终调用 ReplicationOperation#handlePrimaryResult函数的 performOnReplicas方法将 index 发送给 target 节点。
handoff阻塞阶段
handoff阶段是为了完成主分片状态的转移,该阶段对收到的写入请求放到队列中,待主分片状态转移完成后继续执行,最多阻塞30分钟。期间对 target 节点的写入也会被转发的 source 节点执行。此阶段从 source 节点收到 finalize 的 response之后开始,直到下一阶段。
1.recovery 过程设置阻塞标记
在 source 节点收到 finalize 的 response之后,通过 handoff RPC 交接主分片状态,这个交接过程通过 indexShardOperationPermits.blockOperations 对此时收到的写入请求进行阻塞,最多30分钟。
public void relocated(final String targetAllocationId, final Consumer<ReplicationTracker.PrimaryContext> consumer){
try {
indexShardOperationPermits.blockOperations(30, TimeUnit.MINUTES, () -> {//block 的范围就是这段括号里的代码
final ReplicationTracker.PrimaryContext primaryContext = replicationTracker.startRelocationHandoff(targetAllocationId);
try {//下面调用 RemoteRecoveryTargetHandler.handoffPrimaryContext
consumer.accept(primaryContext);//执行这里之后发送和响应了 handoff_primary_context RPC
}
});
}
}
indexShardOperationPermits.blockOperations 中的阻塞操作只是 queuedBlockOperations 加1,后面写入流程会检查 queuedBlockOperations 的值是否为0
2.写入流程的处理
产生在写入链路的 acquire函数:
acquire:255, IndexShardOperationPermits (org.elasticsearch.index.shard)
acquirePrimaryOperationPermit:2764, IndexShard (org.elasticsearch.index.shard)
handlePrimaryRequest:256, TransportReplicationAction
该函数判断 queuedBlockOperations 大于0,于是将他添加到 delayedOperations,写入流程到此结束。delayedOperations中的操作会在本阶段的阻塞过程结束后处理。
失败重试阶段
失败重试阶段自source 节点处理完 handoff response之后开始,直到 master 下发了将分片标记为 STARTED 的集群状态,数据节点应用了集群状态之后结束,在应用这个集群状态之前,分片处于 relocating 状态,期间的写入操作由 source 节点执行,并且会写入失败,抛出 ShardNotInPrimaryModeException 异常,并捕获该异常,等待1分钟后重试写入,再次失败则退出;如果1分钟内收到了新的集群状态,也会重试写入,然后写入成功。
写入流程中,执行 acquire 函数时,发现 queuedBlockOperations为0,执行onAcquired.onResponse(releasable),调用到 wrapPrimaryOperationPermitListener函数时,发现分片已经不是主分片状态,抛出 ShardNotInPrimaryModeException 异常。
if (replicationTracker.isPrimaryMode()) {
l.onResponse(r);
} else {
r.close();
l.onFailure(new ShardNotInPrimaryModeException(shardId, state));
}
对于上者返回的异常,写入流程 acquirePrimaryOperationPermit 设置的 Listener会区分异常类型,如果是 ShardNotInPrimaryModeException 异常,则等待1分钟后进行重试。
move 一个副分片,对写入流程的影响
副分片的 move 过程与主分片的 move 过程相似,也是下发两次集群状态,通过 recovery 复制到 target 节点。
需要注意的是,recovery 复制分片到 target 节点,并非从副分片当前所在节点复制到 target,而是从主分片复制到 target。例如,主分片在节点 node-idea 上,副分片在 node-1,当我们从 node-1 move 到 node-2时,recovery 是从 node-idea 复制到 node-2。当 node-1应用主节点发布的分片状态变为 STARTED 的集群状态时,发现分片已经属于自己,于是删除本节点的分片。
与主分片的 relocating 类似的是,在复制阶段,会将正在 recovery 的分片添加到 replication group,这样 replication group 中就有了三个分片,主分片,原副分片,以及 move 到 target 节点的新的副分片。在复制阶段,写入操作会写入到这三个分片。
以手工 move 一个副分片为例,从 node-1 move 到 node-2时,当收到第一个第一个集群状态,分片被标记为 relocating,于是数据从 node-idea recovery 到 node-2。在此期间,新的 index 操作会由主分片节点复制到另外两个节点。如下图所示。
这个过程相当于先增加了一个副分片,执行副分片的 recovery 流程复制到新的 target。
当 recovery 执行完毕,master 下发第二个集群状态,分片被标记为 STARTED,node-1原来持有的分片被删除,如下图所示。
副分片的 relocating 过程不会有 handoff 阶段,整个 relocating 过程如同副分片的 recovery 过程一样,没有对写入进行阻塞的阶段。
(扫码或长按关注本公众号)
本文分享自微信公众号 - Elasticsearch 原理与实践(gh_81a98ec906ca)。
如有侵权,请联系 support@oschina.cn 删除。
本文参与“OSC源创计划”,欢迎正在阅读的你也加入,一起分享。

低调大师中文资讯倾力打造互联网数据资讯、行业资源、电子商务、移动互联网、网络营销平台。
持续更新报道IT业界、互联网、市场资讯、驱动更新,是最及时权威的产业资讯及硬件资讯报道平台。
转载内容版权归作者及来源网站所有,本站原创内容转载请注明来源。
- 上一篇
Kafka 架构及原理分析
Kafka 架构及原理分析 设计架构 使用场景 架构 数据多写支持 基于 binlog 实现主从复制 Kafka 的进阶功能 消息幂等性 事务 特性 原理分析 生产者原理 服务端响应 分区存储 Leader 选举 消费者原理 offset 的存储 消费者分配 分区重分配 性能分析 技术总结 REFERENCES 简介 Kafka适合什么样的场景? 它可以用于两大类别的应用: 构造实时流数据管道,它可以在系统或应用之间可靠地获取数据。(相当于message queue) 构建实时流式应用程序,对这些流数据进行转换或者影响。(就是流处理,通过kafka stream topic和topic之间内部进行变化) 为了理解 Kafka 是如何做到以上所说的功能,从下面开始,我们将深入探索Kafka 的特性。 定位 消息中间件 消息引擎 分布式实时流处理平台 手机用户请横屏获取最佳阅读体验,REFERENCES中是本文参考的链接,如需要链接和更多资源,可以关注公众号后回复『知识星球』加入并获取长期知识分享服务。 设计架构 使用场景 大数据领域 网站行为分析 日志聚合 应用监控 流式数据处理 在线和...
- 下一篇
面试|不可不知的十大Hive调优技巧最佳实践
Apache Hive是建立在Apache Hadoop之上的数据仓库软件项目,用于提供数据查询和分析。Hive是Hadoop在HDFS上的SQL接口,它提供了类似于SQL的接口来查询存储在与Hadoop集成的各种数据库和文件系统中的数据。可以说从事数据开发工作,无论是在平时的工作中,还是在面试中,Hive具有举足轻重的地位,尤其是Hive的性能调优方面,不仅能够在工作中提升效率而且还可以在面试中脱颖而出。在本文中,我将分享十个性能优化技术,全文如下。 1.多次INSERT单次扫描表 默认情况下,Hive会执行多次表扫描。因此,如果要在某张hive表中执行多个操作,建议使用一次扫描并使用该扫描来执行多个操作。 比如将一张表的数据多次查询出来装载到另外一张表中。如下面的示例,表my_table是一个分区表,分区字段为dt,如果需要在表中查询2个特定的分区日期数据,并将记录装载到2个不同的表中。 INSERTINTOtemp_table_20201115SELECT*FROMmy_tableWHEREdt='2020-11-15';INSERTINTOtemp_table_20201116...
相关文章
文章评论
共有0条评论来说两句吧...
文章二维码
点击排行
推荐阅读
最新文章
- CentOS7编译安装Gcc9.2.0,解决mysql等软件编译问题
- Docker安装Oracle12C,快速搭建Oracle学习环境
- SpringBoot2全家桶,快速入门学习开发网站教程
- SpringBoot2配置默认Tomcat设置,开启更多高级功能
- CentOS关闭SELinux安全模块
- Eclipse初始化配置,告别卡顿、闪退、编译时间过长
- CentOS6,CentOS7官方镜像安装Oracle11G
- SpringBoot2整合MyBatis,连接MySql数据库做增删改查操作
- Docker使用Oracle官方镜像安装(12C,18C,19C)
- Hadoop3单机部署,实现最简伪集群