您现在的位置是:首页 > 文章详情

hdfs Balancer剖析

日期:2019-08-11点击:313

balance过程就是从存储使用率超出集群平均使用率的datanode上将超出的block移动到低于集群平均使用率的datanode上,最终满足平衡标准。
    over-utilized------>under-utilized
    over-utilized------>below-average
    above-average--->under-utilized

1. over-utilized :使用率超出阀值的datanode
2. above-average : 使用率超出平均值,但是低于阀值的datanode
3. below-average : 使用率低于平均值,但是高于阀值的datanode
4. under-utilized: 使用率低于阀值的datanode

平衡标准:参与balance的每个datanode的使用率 与 所有参与balance的全局的使用率 的差值 接近 给定的阀值threshold

 

balance过程分多个迭代完成,每个迭代开始时balancer按照上面的规则将所有参与balance的datanode分类、配对,形成<source,target>集合,一个<source,target>表示将source datanode上的部分block迁移到target datanode上。

每个<source,target>对应一个线程,该线程主要功能:
    1、从namenode上随机拉取一定数量的block(每次最多2G,累计20G),筛选之后保存到src_block列表中。触发拉取的条件之一:src_block列表中尚未被迁移的block数量少于5(固定值,不可配)。
    2、将src_block列表中的block提交到线程池(线程池大小:dfs.balancer.moverThreads,默认1000)进行迁移。

迁移过程使用多线程(线程数量:dfs.balancer.moverThreads)完成,每个线程每次迁移一个block。
迁移过程:
    step1:从src_block列表中选择一个block
    step2:检查block的所有副本所在的datanode中是否有跟target同机架的datanode,如果有则选择其作为proxy,如果没有则从中随机选择一个datanode作为proxy。
    step3:balancer通知target 从proxy上将block复制一份到本机。
    step4:如果复制成功,则target 请求namenode将block从source上删除,并反馈balancer。 
step1选择block时,会检查target,step2选择proxy时,会检查备选datanode,检查的内容包括:
    1、当前正在执行复制的线程数量是否超过最大线程数(dfs.datanode.balance.max.concurrent.moves)
    2、是否在禁用期。每次复制失败都会导致datanode 10秒钟内禁止做为proxy和target。

核心方法、流程:

Balancer.runOneIteration() //开始一轮 Dispatcher.init(); //选择参与balance的datanode Balancer.init(); //分组 Balancer.chooseStorageGroups() //构建<source, target> 优先匹配同机架、然后随意搭配 Dispatcher.dispatchAndCheckContinue() //以source为单位分发block移动任务 Dispatcher.dispatchBlockMoves(); //每个source启动一个线程 Source.dispatchBlocks(); Source.getBlockList(); //从namenode拉取总大小为2G的block Source.chooseNextMove() //每次选一个block放到线程池去移动 PendingMove.chooseBlockAndProxy(); //选择一个block和一个proxy PendingMove.markMovedIfGoodBlock(); PendingMove.isGoodBlockCandidate(); //是否是合适的block PendingMove.chooseProxySource() //为选择的block选择一个合适的proxy

datanode分类:

private long init(List<DatanodeStorageReport> reports) { // compute average utilization for (DatanodeStorageReport r : reports) { policy.accumulateSpaces(r); } policy.initAvgUtilization(); // create network topology and classify utilization collections: // over-utilized, above-average, below-average and under-utilized. long overLoadedBytes = 0L, underLoadedBytes = 0L; for(DatanodeStorageReport r : reports) { final DDatanode dn = dispatcher.newDatanode(r.getDatanodeInfo()); for(StorageType t : StorageType.getMovableTypes()) { //所有参与balance的节点[lived 且包含在include中的节点]的使用率(已使用的存储/总的存储) final Double utilization = policy.getUtilization(r, t); if (utilization == null) { // datanode does not have such storage type continue; } final long capacity = getCapacity(r, t); final double utilizationDiff = utilization - policy.getAvgUtilization(t); //thresholdDiff越小说明该datanode越理想 final double thresholdDiff = Math.abs(utilizationDiff) - threshold; final long maxSize2Move = computeMaxSize2Move(capacity, getRemaining(r, t), utilizationDiff, threshold); final StorageGroup g; if (utilizationDiff > 0) { final Source s = dn.addSource(t, maxSize2Move, dispatcher); if (thresholdDiff <= 0) { // within threshold aboveAvgUtilized.add(s); } else { overLoadedBytes += precentage2bytes(thresholdDiff, capacity); overUtilized.add(s); } g = s; } else { g = dn.addTarget(t, maxSize2Move); if (thresholdDiff <= 0) { // within threshold belowAvgUtilized.add(g); } else { underLoadedBytes += precentage2bytes(thresholdDiff, capacity); underUtilized.add(g); } } dispatcher.getStorageGroupMap().put(g); } } logUtilizationCollections(); Preconditions.checkState(dispatcher.getStorageGroupMap().size() == overUtilized.size() + underUtilized.size() + aboveAvgUtilized.size() + belowAvgUtilized.size(), "Mismatched number of storage groups"); // return number of bytes to be moved in order to make the cluster balanced return Math.max(overLoadedBytes, underLoadedBytes); }

选择proxy:

private boolean chooseProxySource() { final DatanodeInfo targetDN = target.getDatanodeInfo(); // if source and target are same nodes then no need of proxy if (source.getDatanodeInfo().equals(targetDN) && addTo(source)) { return true; } // if node group is supported, first try add nodes in the same node group if (cluster.isNodeGroupAware()) { for (StorageGroup loc : block.getLocations()) { if (cluster.isOnSameNodeGroup(loc.getDatanodeInfo(), targetDN) && addTo(loc)) { return true; } } } // check if there is replica which is on the same rack with the target //优选选择副本所在机器跟target在相同rack的dn作为proxy,同时需要改dn当前处理balance[发送block]的线程数量是否超标 for (StorageGroup loc : block.getLocations()) { if (cluster.isOnSameRack(loc.getDatanodeInfo(), targetDN) && addTo(loc)) { return true; } } // find out a non-busy replica //如果以上都没有选择出合适的proxy,那么就选一个不忙的dn作为proxy for (StorageGroup loc : block.getLocations()) { if (addTo(loc)) { return true; } } return false; }

block 迁移:

step1:balancer socket连接target,发起replaceBlock 请求,请求target从proxy上复制一个block副本到本地来替换掉source上的副本。

step2:target向proxy 发起copyBlock请求,从proxy上将block副本复制到本地,复制完成后 target 通过notifyNamenodeReceivedBlock 方法生成一个ReceivedDeletedBlockInfo对象并缓存在队列,下一次发起心跳的时候会据此对象通知namenode 将target上新加的block副本存入blockmap,并将source上对应的block 副本删除

private void dispatch() { if (LOG.isDebugEnabled()) { LOG.debug("Start moving " + this); } Socket sock = new Socket(); DataOutputStream out = null; DataInputStream in = null; try { //balaner建立跟target的连接 sock.connect( NetUtils.createSocketAddr(target.getDatanodeInfo().getXferAddr()), HdfsServerConstants.READ_TIMEOUT); sock.setKeepAlive(true); OutputStream unbufOut = sock.getOutputStream(); InputStream unbufIn = sock.getInputStream(); ExtendedBlock eb = new ExtendedBlock(nnc.getBlockpoolID(), block.getBlock()); final KeyManager km = nnc.getKeyManager(); Token<BlockTokenIdentifier> accessToken = km.getAccessToken(eb); IOStreamPair saslStreams = saslClient.socketSend(sock, unbufOut, unbufIn, km, accessToken, target.getDatanodeInfo()); unbufOut = saslStreams.out; unbufIn = saslStreams.in; out = new DataOutputStream(new BufferedOutputStream(unbufOut, HdfsConstants.IO_FILE_BUFFER_SIZE)); in = new DataInputStream(new BufferedInputStream(unbufIn, HdfsConstants.IO_FILE_BUFFER_SIZE)); //向target发请求,命令其复制block副本 sendRequest(out, eb, accessToken); receiveResponse(in); nnc.getBytesMoved().addAndGet(block.getNumBytes()); LOG.info("Successfully moved " + this); } catch (IOException e) { LOG.warn("Failed to move " + this + ": " + e.getMessage()); target.getDDatanode().setHasFailure(); // Proxy or target may have some issues, delay before using these nodes // further in order to avoid a potential storm of "threads quota // exceeded" warnings when the dispatcher gets out of sync with work // going on in datanodes. //迁移失败,可能是因为proxy、target当前过于繁忙(同时处理blockReplace的操作太多),所以延迟其参与balance proxySource.activateDelay(delayAfterErrors); target.getDDatanode().activateDelay(delayAfterErrors); } finally { IOUtils.closeStream(out); IOUtils.closeStream(in); IOUtils.closeSocket(sock); //不管迁移成功还是失败,都将当前block从队列中删除 proxySource.removePendingBlock(this); target.getDDatanode().removePendingBlock(this); synchronized (this) { reset(); } synchronized (Dispatcher.this) { Dispatcher.this.notifyAll(); } } } private void sendRequest(DataOutputStream out, ExtendedBlock eb, Token<BlockTokenIdentifier> accessToken) throws IOException { new Sender(out).replaceBlock(eb, target.storageType, accessToken, source.getDatanodeInfo().getDatanodeUuid(), proxySource.datanode); //第4个参数是source 的uuid(在name上唯一标示一个datanode),用于通知namenode删source上的副本 }

target复制完成后的日志:

2019-07-03 10:15:41,319 INFO org.apache.hadoop.hdfs.server.datanode.DataNode: Moved BP-691805646-10.116.100.3-1498296722300:blk_2306375028_1234536481 from /10.116.100.149:36907, delHint=7a29645c-cc12-44ce-b9c3-6642a82317c4

常见异常:

target异常: WARN balancer.Dispatcher: Failed to move blk_10818540065_9759022625 with size=12178 from 10.116.100.126:50010:DISK to 10.116.100.143:50010:DISK through 10.116.102.93:50010: Got error, status message Not able to receive block 10818540065 from /10.116.100.149:57891 because threads quota is exceeded., block move is failed proxy异常: WARN balancer.Dispatcher: Failed to move blk_13904085291_12851796248 with size=412 from 10.116.101.126:50010:DISK to 10.116.101.227:50010:DISK through 10.116.100.51:50010: Got error, status message opReplaceBlock BP-691805646-10.116.100.3-1498296722300:blk_13904085291_12851796248 received exception java.io.IOException: Got error, status message Not able to copy block 13904085291 to /10.116.101.227:42066 because threads quota is exceeded., copy block BP-691805646-10.116.100.3-1498296722300:blk_13904085291_12851796248 from /10.116.100.51:50010, block move is failed

threads quota is exceeded 说明datanode上(target、proxy)上当前正在参与balance进行blockReplace(target)和blockCopy(proxy)的线程数量超过阀值(dfs.datanode.balance.max.concurrent.moves),所以block迁移失败。 

如果大量出现此类异常,那么balance的速度会很慢:

    直接原因:block迁移失败;

    间接原因:target和proxy会进入禁用期,导致可选proxy减少,进而src_block中的block不能及时被消费,也不能拉取新的blcok

balance慢的解决办法:
1、将datanode的迁移线程数dfs.datanode.balance.max.concurrent.moves增大。增大之后threads quota is exceeded 的问题会缓解,balance速度会加快。修改此参数需重启datanode。
2、将balancer的dfs.datanode.balance.max.concurrent.moves 增大。balancer上此值应该比datanode上的值稍微小一点,因为两个股进程存在状态不同步的可能。
3、将-threshold 增大。增大之后可以优先迁移使用率最高的datanode,待使用率将下来之后,再将threshold降低。
4、如果是多namespace的情况,则可以将-policy 由默认datanode改为Pool,在迁移时应根据namespace对应blockpool的使用率来评估datanode是否要balance。

 

原文链接:https://my.oschina.net/u/3987818/blog/3086282
关注公众号

低调大师中文资讯倾力打造互联网数据资讯、行业资源、电子商务、移动互联网、网络营销平台。

持续更新报道IT业界、互联网、市场资讯、驱动更新,是最及时权威的产业资讯及硬件资讯报道平台。

转载内容版权归作者及来源网站所有,本站原创内容转载请注明来源。

文章评论

共有0条评论来说两句吧...

文章二维码

扫描即可查看该文章

点击排行

推荐阅读

最新文章