HBase实操:Spark-Read-HBase-Snapshot-Demo 分享
前言:之前给大家分享了Spark通过接口直接读取HBase的一个小demo:HBase-Spark-Read-Demo,但如果在数据量非常大的情况下,Spark直接扫描HBase表必然会对HBase集群造成不小的压力。基于此,今天再给大家分享一下Spark通过Snapshot直接读取HBase HFile文件的方式。
首先我们先创建一个HBase表:test,并插入几条数据,如下:
hbase(main):003:0> scan 'test' ROW COLUMN+CELL r1 column=f:name, timestamp=1583318512414, value=zpb r2 column=f:name, timestamp=1583318517079, value=lisi r3 column=f:name, timestamp=1583318520839, value=wang
接着,我们创建该HBase表的快照,其在HDFS上路径如下:
hbase(main):005:0> snapshot 'test', 'test-snapshot' 0 row(s) in 0.3690 seconds $ hdfs dfs -ls /apps/hbase/data/.hbase-snapshot Found 1 items drwxr-xr-x - hbase hdfs 0 2020-03-21 21:24 /apps/hbase/data/.hbase-snapshot/test-snapshot
代码如下:
import org.apache.hadoop.fs.Path import org.apache.hadoop.conf.Configuration import org.apache.hadoop.hbase._ import org.apache.hadoop.mapreduce.Job import org.apache.hadoop.hbase.client.Scan import org.apache.hadoop.hbase.mapreduce.{TableInputFormat, TableSnapshotInputFormat} import org.apache.hadoop.hbase.protobuf.ProtobufUtil import org.apache.hadoop.hbase.util.{Base64, Bytes} import org.apache.spark.{SparkConf, SparkContext} object SparkReadHBaseSnapshotDemo { // 主函数 def main(args: Array[String]) { // 设置spark访问入口 val conf = new SparkConf().setAppName("SparkReadHBaseSnapshotDemo") .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer") .setMaster("local")//调试 val sc = new SparkContext(conf) // 获取HbaseRDD val job = Job.getInstance(getHbaseConf()) TableSnapshotInputFormat.setInput(job, "test-snapshot", new Path("/user/tmp")) val hbaseRDD = sc.newAPIHadoopRDD(job.getConfiguration, classOf[TableSnapshotInputFormat], classOf[org.apache.hadoop.hbase.io.ImmutableBytesWritable], classOf[org.apache.hadoop.hbase.client.Result]) hbaseRDD.map(_._2).map(getRes(_)).count() } def getRes(result: org.apache.hadoop.hbase.client.Result): String = { val rowkey = Bytes.toString(result.getRow()) val name = Bytes.toString(result.getValue("f".getBytes, "name".getBytes)) println(rowkey+"---"+name) name } // 构造 Hbase 配置信息 def getHbaseConf(): Configuration = { val conf: Configuration = HBaseConfiguration.create() conf.set(TableInputFormat.SCAN, getScanStr()) conf } // 获取扫描器 def getScanStr(): String = { val scan = new Scan() // scan.set.... 各种过滤 val proto = ProtobufUtil.toScan(scan) Base64.encodeBytes(proto.toByteArray()) } }
注:上述代码需将core-site.xml&hdfs-site.xml&hbase-site.xml文件放在资源目录resources下。否则,应在代码中进行配置,代码如下:
package com.xcar.etl import org.apache.hadoop.fs.Path import org.apache.hadoop.conf.Configuration import org.apache.hadoop.hbase._ import org.apache.hadoop.mapreduce.Job import org.apache.hadoop.hbase.client.Scan import org.apache.hadoop.hbase.mapreduce.{TableInputFormat, TableSnapshotInputFormat} import org.apache.hadoop.hbase.protobuf.ProtobufUtil import org.apache.hadoop.hbase.util.{Base64, Bytes} import org.apache.spark.{SparkConf, SparkContext} object SparkReadHBaseSnapshotDemo2 { val HBASE_ZOOKEEPER_QUORUM = "xxxx.com.cn" // 主函数 def main(args: Array[String]) { // 设置spark访问入口 val conf = new SparkConf().setAppName("SparkReadHBaseSnapshotDemo2") .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer") .setMaster("local")//调试 val sc = new SparkContext(conf) // 获取HbaseRDD val job = Job.getInstance(getHbaseConf()) TableSnapshotInputFormat.setInput(job, "test-snapshot", new Path("/user/tmp")) val hbaseRDD = sc.newAPIHadoopRDD(job.getConfiguration, classOf[TableSnapshotInputFormat], classOf[org.apache.hadoop.hbase.io.ImmutableBytesWritable], classOf[org.apache.hadoop.hbase.client.Result]) hbaseRDD.map(_._2).map(getRes(_)).count() } def getRes(result: org.apache.hadoop.hbase.client.Result): String = { val rowkey = Bytes.toString(result.getRow()) val name = Bytes.toString(result.getValue("f".getBytes, "name".getBytes)) println(rowkey+"---"+name) name } // 构造 Hbase 配置信息 def getHbaseConf(): Configuration = { val conf: Configuration = HBaseConfiguration.create() conf.set("hbase.zookeeper.property.clientPort", "2181") conf.set("zookeeper.znode.parent", "/hbase") conf.set("hbase.zookeeper.quorum", HBASE_ZOOKEEPER_QUORUM) conf.set("hbase.rootdir", "/apps/hbase") // 设置查询的表名 conf.set(TableInputFormat.INPUT_TABLE, "test") conf.set("fs.defaultFS","hdfs://xxxxxx:8020") conf.set(TableInputFormat.SCAN, getScanStr()) conf } // 获取扫描器 def getScanStr(): String = { val scan = new Scan() // scan.set.... 各种过滤 val proto = ProtobufUtil.toScan(scan) Base64.encodeBytes(proto.toByteArray()) } }
TableSnapshotInputFormat.setInput 方法参数解析:
public static void setInput(org.apache.hadoop.mapreduce.Job job, String snapshotName, org.apache.hadoop.fs.Path restoreDir) throws IOException 参数解析: job - the job to configure snapshotName - the name of the snapshot to read from restoreDir - a temporary directory to restore the snapshot into. Current user should have write permissions to this directory, and this should not be a subdirectory of rootdir. After the job is finished, restoreDir can be deleted.
项目用到的 pom.xml 文件:
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> <modelVersion>4.0.0</modelVersion> <groupId>com.zpb.test</groupId> <artifactId>spark-read-hbase-snapshot-demo</artifactId> <version>1.0-SNAPSHOT</version> <packaging>jar</packaging> <name>spark-read-hbase-snapshot-demo</name> <url>http://maven.apache.org</url> <repositories> <repository> <id>cloudera</id> <url>https://repository.cloudera.com/artifactory/cloudera-repos/</url> </repository> </repositories> <properties> <cdh.hbase.version>1.2.0-cdh5.7.0</cdh.hbase.version> <cdh.spark.version>1.6.0-cdh5.7.0</cdh.spark.version> <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding> </properties> <dependencies> <dependency> <groupId>junit</groupId> <artifactId>junit</artifactId> <version>3.8.1</version> <scope>test</scope> </dependency> <dependency> <groupId>com.alibaba</groupId> <artifactId>fastjson</artifactId> <version>1.2.62</version> </dependency> <dependency> <groupId>org.apache.spark</groupId> <artifactId>spark-core_2.10</artifactId> <version>${cdh.spark.version}</version> <!--<scope>provided</scope>--> </dependency> <dependency> <groupId>org.apache.hbase</groupId> <artifactId>hbase-server</artifactId> <version>${cdh.hbase.version}</version> </dependency> </dependencies> </project>
欢迎关注本人公众号【HBase工作笔记】,解锁更多姿势~
低调大师中文资讯倾力打造互联网数据资讯、行业资源、电子商务、移动互联网、网络营销平台。
持续更新报道IT业界、互联网、市场资讯、驱动更新,是最及时权威的产业资讯及硬件资讯报道平台。
转载内容版权归作者及来源网站所有,本站原创内容转载请注明来源。
- 上一篇
全方位认识HBase:一个值得拥有的NoSQL数据库(一)
前言:说起HBase这门技术,在认知上对于稍微接触或使用过它的人来讲,可能只是百千数据库中一个很普通的库,大概就像我对Redis的认知一样:缓存嘛!可对于HBase,我确实是带着某些感情在的。今日突然萌生了一个生趣的想法,想抛开技术的视角,从情感的角度,像写小说一样,写写这位老朋友,这可能会有点滑稽吧,不过我觉得很放松。《全方位认识HBase:一个值得拥有的NoSQL数据库》:从今天起,我们就暂且认为这是一本小说的名字吧!哈哈~ 其实我特别想做的一件事情,就是想让更多的人来认识并使用HBase这门地地道道的大数据栈技术,当然不为别的,主要原因还是HBase真的很棒很热,自己用着感觉真的好,不好的产品我怎么会推荐给你呢?毕竟HBase这家伙不会给我一分钱的广告费~ 那首先,我想给大家分享的内容就是:在我刚接触HBase这位老朋友的时候根本不想去看的一些觉得没用的东西。什么呢?其实就是特别无聊又深奥的好像还不得不问的灵魂三问:我是谁?我从哪里来?我要到哪里去? 为什么想写写这个呢?真的好无聊啊~ 当然肯定不是我太无聊了,说实话,是因为对它真的有感情了,所以就想把它的前世今生全都介绍给你,可...
- 下一篇
基于HBase构建千亿级文本数据相似度计算与快速去重系统
前言 随着大数据时代的到来,数据信息在给我们生活带来便利的同时,同样也给我们带来了一系列的考验与挑战。本文主要介绍了基于 Apache HBase 与 Google SimHash 等多种算法共同实现的一套支持百亿级文本数据相似度计算与快速去重系统的设计与实现。该方案在公司业务层面彻底解决了多主题海量文本数据所面临的存储与计算慢的问题。 一. 面临的问题 1. 如何选择文本的相似度计算或去重算法? 常见的有余弦夹角算法、欧式距离、Jaccard 相似度、最长公共子串、编辑距离等。这些算法对于待比较的文本数据不多时还比较好用,但在海量数据背景下,如果每天产生的数据以千万计算,我们如何对于这些海量千万级的数据进行高效的合并去重和相似度计算呢? 2. 如何实现快速计算文本相似度或去重呢? 如果我们选好了相似度计算和去重的相关算法,那我们怎么去做呢?如果待比较的文本数据少,我们简单遍历所有文本进行比较即可,那对于巨大的数据集我们该怎么办呢?遍历很明显是不可取的。 3. 海量数据的存储与快速读写 二. SimHash 算法引入 基于问题一,我们引入了 SimHash 算法来实现海量文本的相似度计...
相关文章
文章评论
共有0条评论来说两句吧...
文章二维码
点击排行
推荐阅读
最新文章
- SpringBoot2整合Redis,开启缓存,提高访问速度
- CentOS关闭SELinux安全模块
- CentOS7编译安装Cmake3.16.3,解决mysql等软件编译问题
- Docker快速安装Oracle11G,搭建oracle11g学习环境
- Jdk安装(Linux,MacOS,Windows),包含三大操作系统的最全安装
- 设置Eclipse缩进为4个空格,增强代码规范
- MySQL8.0.19开启GTID主从同步CentOS8
- SpringBoot2整合MyBatis,连接MySql数据库做增删改查操作
- Docker安装Oracle12C,快速搭建Oracle学习环境
- CentOS7安装Docker,走上虚拟化容器引擎之路