
Locating and fixing a massive backlog of replication WAL znodes in HBase 2.0

Date: 2020-04-06

Symptoms

There are two production clusters, A and B, configured with bidirectional replication in an active-standby arrangement: at any given moment the application layer only accesses one of them.
Recently cluster A's regionserver logs have been reporting a large number of exceptions, although the monitoring pages look normal and no functionality is affected.

The HBase version is 2.0.0.

2019-09-04 02:44:58,115 WARN org.apache.zookeeper.ClientCnxn: Session 0x36abf38cec5531d for server host-15/ip-15:2181, unexpected error, closing socket connection and attempting reconnect
java.io.IOException: Broken pipe
        at sun.nio.ch.FileDispatcherImpl.write0(Native Method)
        at sun.nio.ch.SocketDispatcher.write(SocketDispatcher.java:47)
        at sun.nio.ch.IOUtil.writeFromNativeBuffer(IOUtil.java:93)
        at sun.nio.ch.IOUtil.write(IOUtil.java:65)
        at sun.nio.ch.SocketChannelImpl.write(SocketChannelImpl.java:471)
        at org.apache.zookeeper.ClientCnxnSocketNIO.doIO(ClientCnxnSocketNIO.java:117)
        at org.apache.zookeeper.ClientCnxnSocketNIO.doTransport(ClientCnxnSocketNIO.java:355)
        at org.apache.zookeeper.ClientCnxn$SendThread.run(ClientCnxn.java:1073)

Diagnosis

1. The log lines before and after the exception show that it is related to replication

2019-09-04 02:44:56,960 INFO org.apache.hadoop.hbase.replication.ReplicationQueuesZKImpl: Atomically moving host-17,16020,1561539485645/1's WALs to my queue
2019-09-04 02:44:58,115 WARN org.apache.zookeeper.ClientCnxn: Session 0x36abf38cec5531d for server host-15/ip-15:2181, unexpected error, closing socket connection and attempting reconnect
java.io.IOException: Broken pipe
        at sun.nio.ch.FileDispatcherImpl.write0(Native Method)
        at sun.nio.ch.SocketDispatcher.write(SocketDispatcher.java:47)
        at sun.nio.ch.IOUtil.writeFromNativeBuffer(IOUtil.java:93)
        at sun.nio.ch.IOUtil.write(IOUtil.java:65)
        at sun.nio.ch.SocketChannelImpl.write(SocketChannelImpl.java:471)
        at org.apache.zookeeper.ClientCnxnSocketNIO.doIO(ClientCnxnSocketNIO.java:117)
        at org.apache.zookeeper.ClientCnxnSocketNIO.doTransport(ClientCnxnSocketNIO.java:355)
        at org.apache.zookeeper.ClientCnxn$SendThread.run(ClientCnxn.java:1073)
... many more Broken pipe exceptions in between ...
2019-09-04 02:45:45,544 ERROR org.apache.hadoop.hbase.zookeeper.RecoverableZooKeeper: ZooKeeper multi failed after 4 attempts
2019-09-04 02:45:45,544 WARN org.apache.hadoop.hbase.replication.ReplicationQueuesZKImpl: Got exception in copyQueuesFromRSUsingMulti:
org.apache.zookeeper.KeeperException$ConnectionLossException: KeeperErrorCode = ConnectionLoss
        at org.apache.zookeeper.KeeperException.create(KeeperException.java:102)
        at org.apache.zookeeper.ZooKeeper.multiInternal(ZooKeeper.java:944)
        at org.apache.zookeeper.ZooKeeper.multi(ZooKeeper.java:924)
        at org.apache.hadoop.hbase.zookeeper.RecoverableZooKeeper.multi(RecoverableZooKeeper.java:663)
        at org.apache.hadoop.hbase.zookeeper.ZKUtil.multiOrSequential(ZKUtil.java:1670)
        at org.apache.hadoop.hbase.replication.ReplicationQueuesZKImpl.moveQueueUsingMulti(ReplicationQueuesZKImpl.java:318)
        at org.apache.hadoop.hbase.replication.ReplicationQueuesZKImpl.claimQueue(ReplicationQueuesZKImpl.java:210)
        at org.apache.hadoop.hbase.replication.regionserver.ReplicationSourceManager$NodeFailoverWorker.run(ReplicationSourceManager.java:686)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
        at java.lang.Thread.run(Thread.java:748)

2. Why is this operation happening

HBase replication works by having each regionserver handle the WAL files it produces itself; if a regionserver exits, other regionservers claim its unfinished WAL replication tasks. The log above comes from that claiming step.

3. Why did a regionserver exit

The command history in CDH shows that the most recent restart was on August 16, presumably for a cluster configuration change;
the earliest occurrence of the exception in the logs is also August 16.

4. Why did the claim fail

The ZooKeeper logs contain a large number of warnings like the following:

2019-09-04 02:45:44,270 WARN org.apache.zookeeper.server.NIOServerCnxn: Exception causing close of session 0x36abf38cec5531a due to java.io.IOException: Len error 4448969

This error is a ZooKeeper bug, triggered when a single request involves too much data. It is fixed in 3.4.7, 3.5.2 and 3.6.0, while our production version is 3.4.5. The related issue:
https://issues.apache.org/jira/browse/ZOOKEEPER-706
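Incidentally, the "Len error" message itself comes from ZooKeeper's jute.maxbuffer limit: a single request or response larger than the default 0xfffff (1,048,575) bytes is rejected, and the multi call above was 4,448,969 bytes. A minimal sketch of raising the limit follows; the 8 MB value is purely illustrative, and in real deployments the property is passed as -Djute.maxbuffer on both server and client JVM command lines, because ZooKeeper reads it in a static initializer before any code like this could run:

```java
public class JuteMaxBufferSketch {
    public static void main(String[] args) {
        // Illustrative only: the property must be visible before ZooKeeper's
        // classes load, so real deployments pass -Djute.maxbuffer=8388608 instead.
        System.setProperty("jute.maxbuffer", String.valueOf(8 * 1024 * 1024));
        System.out.println(System.getProperty("jute.maxbuffer")); // prints 8388608
    }
}
```

As the next paragraph notes, raising the limit only buys headroom; it does not address why the request grew so large in the first place.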

But this is only the immediate cause: even with the bug fixed, the problem would merely be masked. The root cause needs further analysis.

5. Why does the claim involve so much data

When claiming a queue, the regionserver builds one ZK Op object per WAL file, collects them into a List, and finally executes them all in a single multi call:

private Pair<String, SortedSet<String>> moveQueueUsingMulti(String znode, String peerId) {
  try {
    // hbase/replication/rs/deadrs
    String deadRSZnodePath = ZNodePaths.joinZNode(this.queuesZNode, znode);
    List<ZKUtilOp> listOfOps = new ArrayList<>();
    ReplicationQueueInfo replicationQueueInfo = new ReplicationQueueInfo(peerId);
    String newPeerId = peerId + "-" + znode;
    String newPeerZnode = ZNodePaths.joinZNode(this.myQueuesZnode, newPeerId);
    // check the logs queue for the old peer cluster
    String oldClusterZnode = ZNodePaths.joinZNode(deadRSZnodePath, peerId);
    List<String> wals = ZKUtil.listChildrenNoWatch(this.zookeeper, oldClusterZnode);

    if (!peerExists(replicationQueueInfo.getPeerId())) {
      LOG.warn("Peer " + replicationQueueInfo.getPeerId() +
          " didn't exist, will move its queue to avoid the failure of multi op");
      for (String wal : wals) {
        String oldWalZnode = ZNodePaths.joinZNode(oldClusterZnode, wal);
        listOfOps.add(ZKUtilOp.deleteNodeFailSilent(oldWalZnode));
      }
      listOfOps.add(ZKUtilOp.deleteNodeFailSilent(oldClusterZnode));
      ZKUtil.multiOrSequential(this.zookeeper, listOfOps, false);
      return null;
    }

    SortedSet<String> logQueue = new TreeSet<>();
    if (wals == null || wals.isEmpty()) {
      listOfOps.add(ZKUtilOp.deleteNodeFailSilent(oldClusterZnode));
    } else {
      // create the new cluster znode
      ZKUtilOp op = ZKUtilOp.createAndFailSilent(newPeerZnode, HConstants.EMPTY_BYTE_ARRAY);
      listOfOps.add(op);
      // get the offset of the logs and set it to new znodes
      for (String wal : wals) {
        String oldWalZnode = ZNodePaths.joinZNode(oldClusterZnode, wal);
        byte[] logOffset = ZKUtil.getData(this.zookeeper, oldWalZnode);
        LOG.debug("Creating " + wal + " with data " + Bytes.toString(logOffset));
        String newLogZnode = ZNodePaths.joinZNode(newPeerZnode, wal);
        listOfOps.add(ZKUtilOp.createAndFailSilent(newLogZnode, logOffset));
        listOfOps.add(ZKUtilOp.deleteNodeFailSilent(oldWalZnode));
        logQueue.add(wal);
      }
      // add delete op for peer
      listOfOps.add(ZKUtilOp.deleteNodeFailSilent(oldClusterZnode));
      if (LOG.isTraceEnabled())
        LOG.trace(" The multi list size is: " + listOfOps.size());
    }
    ZKUtil.multiOrSequential(this.zookeeper, listOfOps, false);
    LOG.info("Atomically moved " + znode + "/" + peerId + "'s WALs to my queue");
    return new Pair<>(newPeerId, logQueue);
  } catch (KeeperException e) {
    // Multi call failed; it looks like some other regionserver took away the logs.
    LOG.warn("Got exception in copyQueuesFromRSUsingMulti: ", e);
  } catch (InterruptedException e) {
    LOG.warn("Got exception in copyQueuesFromRSUsingMulti: ", e);
    Thread.currentThread().interrupt();
  }
  return null;
}

This means the dead regionserver left behind a huge number of pending WAL replication tasks. Listing that regionserver's task queue on ZooKeeper (under /hbase/replication/rs/#rs instance#/#peerId#) printed a seemingly endless stream of child nodes that still hadn't finished after ten seconds.
The exact count is hard to pin down, but CDH shows this cluster's ZooKeeper holding over 300,000 znodes; for reference, the Majuqiao cluster's ZooKeeper has only about 12,000.

6. Why are there so many znodes

Reading the replication module's source code, the core flow is as follows:
[Figure: replication flow]

walReader and walShipper are implemented by ReplicationSourceWALReader and ReplicationSourceShipper respectively; each runs in its own thread, and they communicate producer-consumer style through a queue.
Deleting WAL znodes is the shipper's responsibility. The actual logic lives in the shipEdits method and runs after a batch of data has been shipped, which guarantees no data is lost; when a WAL's znode is deleted, the znodes of all older WALs are deleted along with it.
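The "delete this WAL's znode and everything older" rule can be sketched in a few lines (hypothetical names and WAL file names, not HBase code): WAL names end in a creation timestamp, so lexicographic order matches age, and a sorted set models the per-peer queue.

```java
import java.util.Arrays;
import java.util.TreeSet;

public class OlderWalCleanupSketch {
    public static void main(String[] args) {
        // Toy per-peer queue of WAL znodes, oldest first by timestamp suffix.
        TreeSet<String> walQueue = new TreeSet<>(Arrays.asList(
                "host%2C16020.1561539485645",
                "host%2C16020.1561539490000",
                "host%2C16020.1561539500000"));
        // When the shipper finishes the middle WAL, it and everything older go away.
        walQueue.headSet("host%2C16020.1561539490000", true).clear();
        System.out.println(walQueue); // prints [host%2C16020.1561539500000]
    }
}
```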
Given the large backlog of znodes, the suspicion was that shipEdits had not been called for a long time. Here are the stacks of the two threads:

"main-EventThread.replicationSource,2.replicationSource.host-17%2C16020%2C1567586932902.host-17%2C16020%2C1567586932902.regiongroup-0,2.replicationSource.wal-reader.host-17%2C16020%2C1567586932902.host-17%2C16020%2C1567586932902.regiongroup-0,2" #157238 daemon prio=5 os_prio=0 tid=0x00007f7634be8800 nid=0x377ef waiting on condition [0x00007f6114c0e000]
   java.lang.Thread.State: TIMED_WAITING (sleeping)
        at java.lang.Thread.sleep(Native Method)
        at org.apache.hadoop.hbase.replication.regionserver.ReplicationSourceWALReader.handleEmptyWALEntryBatch(ReplicationSourceWALReader.java:192)
        at org.apache.hadoop.hbase.replication.regionserver.ReplicationSourceWALReader.run(ReplicationSourceWALReader.java:142)

"main-EventThread.replicationSource,2.replicationSource.host-17%2C16020%2C1567586932902.host-17%2C16020%2C1567586932902.regiongroup-0,2" #157237 daemon prio=5 os_prio=0 tid=0x00007f76350b0000 nid=0x377ee waiting on condition [0x00007f6108173000]
   java.lang.Thread.State: WAITING (parking)
        at sun.misc.Unsafe.park(Native Method)
        - parking to wait for  <0x00007f6f99bb6718> (a java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject)
        at java.util.concurrent.locks.LockSupport.park(LockSupport.java:175)
        at java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.await(AbstractQueuedSynchronizer.java:2039)
        at java.util.concurrent.LinkedBlockingQueue.take(LinkedBlockingQueue.java:442)
        at org.apache.hadoop.hbase.replication.regionserver.ReplicationSourceWALReader.take(ReplicationSourceWALReader.java:248)
        at org.apache.hadoop.hbase.replication.regionserver.ReplicationSourceShipper.run(ReplicationSourceShipper.java:108)

The stack traces confirm that the shipper is indeed blocked, parked on the queue's take().
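That blocked take() is just the consumer half of an ordinary producer-consumer queue. A tiny self-contained sketch (hypothetical names, not HBase code) of why the shipper stays parked when the reader never enqueues a batch:

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

public class StarvedShipperSketch {
    public static void main(String[] args) throws InterruptedException {
        // Hypothetical stand-in for the entryBatchQueue shared by the reader
        // (producer) and shipper (consumer) threads described above.
        BlockingQueue<String> entryBatchQueue = new LinkedBlockingQueue<>();

        // The real shipper calls take(), which parks indefinitely while the
        // queue stays empty -- exactly the WAITING (parking) state in the stack
        // trace. A timed poll makes the starvation observable in a short demo:
        String batch = entryBatchQueue.poll(200, TimeUnit.MILLISECONDS);
        System.out.println(batch == null ? "shipper starved" : "shipped " + batch); // prints "shipper starved"
    }
}
```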

In the walReader code, the part that writes to the queue looks like this:

WALEntryBatch batch = readWALEntries(entryStream);
if (batch != null && batch.getNbEntries() > 0) {
  if (LOG.isTraceEnabled()) {
    LOG.trace(String.format("Read %s WAL entries eligible for replication", batch.getNbEntries()));
  }
  entryBatchQueue.put(batch);
  sleepMultiplier = 1;
} else {
  // got no entries and didn't advance position in WAL
  handleEmptyWALEntryBatch(batch, entryStream.getCurrentPath());
}

Presumably execution kept falling into the else branch. handleEmptyWALEntryBatch is:

protected void handleEmptyWALEntryBatch(WALEntryBatch batch, Path currentPath) throws InterruptedException {
  LOG.trace("Didn't read any new entries from WAL");
  Thread.sleep(sleepForRetries);
}

With trace-level logging enabled for this class, the following line is printed over and over:

2019-09-10 17:09:34,093 TRACE org.apache.hadoop.hbase.replication.regionserver.ReplicationSourceWALReader: Didn't read any new entries from WAL
2019-09-10 17:09:35,096 TRACE org.apache.hadoop.hbase.replication.regionserver.ReplicationSourceWALReader: Didn't read any new entries from WAL
2019-09-10 17:09:36,099 TRACE org.apache.hadoop.hbase.replication.regionserver.ReplicationSourceWALReader: Didn't read any new entries from WAL
2019-09-10 17:09:37,102 TRACE org.apache.hadoop.hbase.replication.regionserver.ReplicationSourceWALReader: Didn't read any new entries from WAL
2019-09-10 17:09:38,105 TRACE org.apache.hadoop.hbase.replication.regionserver.ReplicationSourceWALReader: Didn't read any new entries from WAL
2019-09-10 17:09:39,108 TRACE org.apache.hadoop.hbase.replication.regionserver.ReplicationSourceWALReader: Didn't read any new entries from WAL

Meanwhile, no "WAL entries eligible for replication" messages appear at all.
This confirms that the walReader keeps returning an empty batch object.
The WAL does contain data; the batch comes back empty because every entry is filtered out. readWALEntries is:

private WALEntryBatch readWALEntries(WALEntryStream entryStream) throws IOException {
  WALEntryBatch batch = null;
  while (entryStream.hasNext()) {
    if (batch == null) {
      batch = new WALEntryBatch(replicationBatchCountCapacity, entryStream.getCurrentPath());
    }
    Entry entry = entryStream.next();
    entry = filterEntry(entry);
    if (entry != null) {
      WALEdit edit = entry.getEdit();
      if (edit != null && !edit.isEmpty()) {
        long entrySize = getEntrySize(entry);
        batch.addEntry(entry);
        updateBatchStats(batch, entry, entryStream.getPosition(), entrySize);
        boolean totalBufferTooLarge = acquireBufferQuota(entrySize);
        // Stop if too many entries or too big
        if (totalBufferTooLarge || batch.getHeapSize() >= replicationBatchSizeCapacity
            || batch.getNbEntries() >= replicationBatchCountCapacity) {
          break;
        }
      }
    }
  }
  return batch;
}

The filter doing the work here is ClusterMarkingEntryFilter. To prevent replication loops in a master-master setup, HBase stamps each replicated walEntry with the source cluster's clusterId; if the entry being processed already carries a clusterId equal to the target cluster's, it was originally replicated from that cluster and must not be sent back. The code:

public Entry filter(Entry entry) {
  // don't replicate if the log entries have already been consumed by the cluster
  if (replicationEndpoint.canReplicateToSameCluster()
      || !entry.getKey().getClusterIds().contains(peerClusterId)) {
    WALEdit edit = entry.getEdit();
    WALKeyImpl logKey = (WALKeyImpl) entry.getKey();
    if (edit != null && !edit.isEmpty()) {
      // Mark that the current cluster has the change
      logKey.addClusterId(clusterId);
      // We need to set the CC to null else it will be compressed when sent to the sink
      entry.setCompressionContext(null);
      return entry;
    }
  }
  return null;
}

At this point the cause is basically clear: the two production HBase clusters are configured for master-master bidirectional replication, but only one is active and takes writes. The backup cluster generates WALs while receiving replicated data, yet since it never takes writes of its own, nothing ever survives the filter, the shipper's deletion logic is never triggered, and the WAL znodes pile up.
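The filtering rule can be condensed into a runnable toy (hypothetical names, not the HBase API): ship an entry to a peer only if the peer's clusterId is not already stamped on it, and stamp the local clusterId before shipping.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.UUID;

public class ClusterMarkingSketch {
    // Returns the stamped clusterId list if the entry should ship, or null if
    // the peer has already seen it and shipping would create a loop.
    static List<UUID> shipOrNull(List<UUID> entryClusterIds, UUID localId, UUID peerId) {
        if (entryClusterIds.contains(peerId)) {
            return null; // the edit originated on (or already passed through) the peer
        }
        List<UUID> marked = new ArrayList<>(entryClusterIds);
        marked.add(localId); // mark that the local cluster has the change
        return marked;
    }

    public static void main(String[] args) {
        UUID a = UUID.randomUUID(), b = UUID.randomUUID();
        // A fresh edit written on active cluster A, shipping A -> B: allowed, stamped with A's id.
        System.out.println(shipOrNull(new ArrayList<>(), a, b) != null); // prints true
        // The replicated copy in B's WAL (already stamped with A), shipping B -> A: filtered.
        System.out.println(shipOrNull(List.of(a), b, a) == null); // prints true
    }
}
```

On the idle backup cluster, every WAL entry is of the second kind, which is why its reader produces nothing but empty batches.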

Fix

When the batch read comes back empty, still update its lastWalPosition and put it onto the queue, so that the shipper wakes up and runs its cleanup logic.
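A minimal runnable simulation of that idea, with hypothetical names (the actual patch is the one attached to HBASE-23008, not this code): even an empty read publishes a batch that carries the current WAL position, so the consumer side can advance the log position and trigger znode cleanup.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

public class EmptyBatchFixSketch {
    // Stand-in for WALEntryBatch: an entry count plus the WAL position reached.
    static final class Batch {
        final int entries;
        final long walPosition;
        Batch(int entries, long walPosition) { this.entries = entries; this.walPosition = walPosition; }
    }

    public static void main(String[] args) throws InterruptedException {
        BlockingQueue<Batch> entryBatchQueue = new LinkedBlockingQueue<>();
        // Reader side: all entries were filtered out, but the read position
        // still advanced (to 4096 here); publish an empty batch anyway.
        entryBatchQueue.put(new Batch(0, 4096L));
        // Shipper side: wakes up, sees no entries, but still records the new
        // position, which is what drives the WAL znode cleanup.
        Batch b = entryBatchQueue.take();
        System.out.println("advanced to " + b.walPosition + " with " + b.entries + " entries"); // prints "advanced to 4096 with 0 entries"
    }
}
```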

This bug exists on the 2.0 branch; in 2.1 and later this code was reworked as part of serial replication, and the problem no longer exists.

For the 2.0 branch, an issue (https://issues.apache.org/jira/browse/HBASE-23008) was filed with the community, but by the time the PR was submitted the branch had reached end of maintenance, so it was never merged.

Original article: https://yq.aliyun.com/articles/753840