Pulling Kafka stream data with Spark and inserting it into HBase
Sample pom.xml:
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
  <modelVersion>4.0.0</modelVersion>
  <groupId>com.yys.spark</groupId>
  <artifactId>spark</artifactId>
  <version>1.0</version>
  <inceptionYear>2008</inceptionYear>
  <properties>
    <scala.version>2.11.12</scala.version>
    <kafka.version>0.9.0.1</kafka.version>
    <spark.version>2.2.0</spark.version>
    <hadoop.version>2.7.5</hadoop.version>
    <hbase.version>1.4.0</hbase.version>
  </properties>
  <dependencies>
    <dependency>
      <groupId>org.scala-lang</groupId>
      <artifactId>scala-library</artifactId>
      <version>${scala.version}</version>
    </dependency>
    <!-- Kafka dependency -->
    <dependency>
      <groupId>org.apache.kafka</groupId>
      <artifactId>kafka_2.11</artifactId>
      <version>${kafka.version}</version>
    </dependency>
    <!-- Hadoop dependency -->
    <dependency>
      <groupId>org.apache.hadoop</groupId>
      <artifactId>hadoop-client</artifactId>
      <version>${hadoop.version}</version>
    </dependency>
    <!-- HBase dependencies -->
    <dependency>
      <groupId>org.apache.hbase</groupId>
      <artifactId>hbase-client</artifactId>
      <version>${hbase.version}</version>
    </dependency>
    <dependency>
      <groupId>org.apache.hbase</groupId>
      <artifactId>hbase-server</artifactId>
      <version>${hbase.version}</version>
    </dependency>
    <!-- Spark Streaming dependency -->
    <dependency>
      <groupId>org.apache.spark</groupId>
      <artifactId>spark-streaming_2.11</artifactId>
      <version>${spark.version}</version>
    </dependency>
    <!-- Spark Streaming + Flume integration dependencies -->
    <dependency>
      <groupId>org.apache.spark</groupId>
      <artifactId>spark-streaming-flume_2.11</artifactId>
      <version>${spark.version}</version>
    </dependency>
    <dependency>
      <groupId>org.apache.spark</groupId>
      <artifactId>spark-streaming-flume-sink_2.11</artifactId>
      <version>${spark.version}</version>
    </dependency>
    <!-- Spark Streaming + Kafka integration dependency -->
    <dependency>
      <groupId>org.apache.spark</groupId>
      <artifactId>spark-streaming-kafka-0-8_2.11</artifactId>
      <version>${spark.version}</version>
    </dependency>
    <dependency>
      <groupId>org.apache.commons</groupId>
      <artifactId>commons-lang3</artifactId>
      <version>3.5</version>
    </dependency>
    <!-- Spark SQL dependency -->
    <dependency>
      <groupId>org.apache.spark</groupId>
      <artifactId>spark-sql_2.11</artifactId>
      <version>${spark.version}</version>
    </dependency>
    <dependency>
      <groupId>com.fasterxml.jackson.module</groupId>
      <artifactId>jackson-module-scala_2.11</artifactId>
      <version>2.6.5</version>
    </dependency>
    <dependency>
      <groupId>net.jpountz.lz4</groupId>
      <artifactId>lz4</artifactId>
      <version>1.3.0</version>
    </dependency>
    <dependency>
      <groupId>mysql</groupId>
      <artifactId>mysql-connector-java</artifactId>
      <version>5.1.44</version>
    </dependency>
    <dependency>
      <groupId>org.apache.flume.flume-ng-clients</groupId>
      <artifactId>flume-ng-log4jappender</artifactId>
      <version>1.8.0</version>
    </dependency>
  </dependencies>
  <build>
    <!--
    <sourceDirectory>src/main/scala</sourceDirectory>
    <testSourceDirectory>src/test/scala</testSourceDirectory>
    -->
    <plugins>
      <plugin>
        <groupId>org.scala-tools</groupId>
        <artifactId>maven-scala-plugin</artifactId>
        <executions>
          <execution>
            <goals>
              <goal>compile</goal>
              <goal>testCompile</goal>
            </goals>
          </execution>
        </executions>
        <configuration>
          <scalaVersion>${scala.version}</scalaVersion>
          <args>
            <arg>-target:jvm-1.5</arg>
          </args>
        </configuration>
      </plugin>
      <plugin>
        <groupId>org.apache.maven.plugins</groupId>
        <artifactId>maven-eclipse-plugin</artifactId>
        <configuration>
          <downloadSources>true</downloadSources>
          <buildcommands>
            <buildcommand>ch.epfl.lamp.sdt.core.scalabuilder</buildcommand>
          </buildcommands>
          <additionalProjectnatures>
            <projectnature>ch.epfl.lamp.sdt.core.scalanature</projectnature>
          </additionalProjectnatures>
          <classpathContainers>
            <classpathContainer>org.eclipse.jdt.launching.JRE_CONTAINER</classpathContainer>
            <classpathContainer>ch.epfl.lamp.sdt.launching.SCALA_CONTAINER</classpathContainer>
          </classpathContainers>
        </configuration>
      </plugin>
    </plugins>
  </build>
  <reporting>
    <plugins>
      <plugin>
        <groupId>org.scala-tools</groupId>
        <artifactId>maven-scala-plugin</artifactId>
        <configuration>
          <scalaVersion>${scala.version}</scalaVersion>
        </configuration>
      </plugin>
    </plugins>
  </reporting>
</project>
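Note that the job below writes through the mapred TableOutputFormat, which does not create the target table for you. Assuming the table has not been created yet, it can be set up in the HBase shell with, for example, create 'create_table_at_first', 'myfamily' (the column family name must match the one used in the Put calls).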
Scala code:
import org.apache.hadoop.hbase.HBaseConfiguration
import org.apache.hadoop.hbase.client.Put
import org.apache.hadoop.hbase.io.ImmutableBytesWritable
import org.apache.hadoop.hbase.mapred.TableOutputFormat
import org.apache.hadoop.hbase.mapreduce.TableInputFormat
import org.apache.hadoop.hbase.util.Bytes
import org.apache.hadoop.mapred.JobConf
import org.apache.spark.{SparkConf, SparkContext}

// Pull Kafka stream data and insert it into HBase.
// This demo writes a hard-coded RDD through the HBase output format;
// the streaming sketch after this block shows how to drive the same
// write path from an actual Kafka DStream.
object Kafka2Hbase {
  def main(args: Array[String]): Unit = {
    val sparkConf = new SparkConf().setMaster("local").setAppName("HBaseTest")
      .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
    val sc = new SparkContext(sparkConf)

    // Name of the target table in HBase
    val table_name = "create_table_at_first"

    val conf = HBaseConfiguration.create()
    // The HBase connection settings can be taken from
    // /home/hadoop/HBase/hbase/conf/hbase-site.xml
    conf.set("hbase.rootdir", "hdfs://master:9000/hbase_db")
    conf.set("hbase.zookeeper.quorum", "master,Slave1,Slave2")
    conf.set("hbase.zookeeper.property.clientPort", "2181")
    conf.set("hbase.master", "master:60000") // host:port of the HMaster
    conf.set(TableInputFormat.INPUT_TABLE, table_name)

    // Initialize the JobConf; TableOutputFormat must be the one from the
    // org.apache.hadoop.hbase.mapred package!
    val jobConf = new JobConf(conf)
    jobConf.setOutputFormat(classOf[TableOutputFormat])
    jobConf.set(TableOutputFormat.OUTPUT_TABLE, table_name)

    val indataRDD = sc.makeRDD(Array("1,jack15,15", "2,mike16,16"))
    val rdd = indataRDD.map(_.split(',')).map { arr =>
      /* One Put object is one row; the row key is passed to the constructor.
       * Every value written must be converted with
       * org.apache.hadoop.hbase.util.Bytes.toBytes.
       * Put.addColumn takes three arguments: column family, qualifier, value
       * ("myfamily" is the column family name).
       */
      val put = new Put(Bytes.toBytes(arr(0).toInt))
      put.addColumn(Bytes.toBytes("myfamily"), Bytes.toBytes("name"), Bytes.toBytes(arr(1)))
      put.addColumn(Bytes.toBytes("myfamily"), Bytes.toBytes("age"), Bytes.toBytes(arr(2).toInt))
      // The RDD must have type RDD[(ImmutableBytesWritable, Put)]
      // before saveAsHadoopDataset can be called on it
      (new ImmutableBytesWritable, put)
    }
    rdd.saveAsHadoopDataset(jobConf)
    sc.stop()
    // Afterwards the rows can be checked in the HBase shell with
    // scan 'create_table_at_first'
    // (the row keys are 4-byte integers, so a plain get by name will not match)
  }
}
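The title promises a Kafka-to-HBase pipeline, but the example above feeds the output format from a hard-coded RDD. Below is a minimal sketch, not from the original post, of how the same JobConf-based write could be driven by a receiver-based Kafka DStream using the spark-streaming-kafka-0-8 artifact already declared in the pom. The topic name "user_topic", the consumer group "kafka2hbase", and the ZooKeeper address "master:2181" are placeholder assumptions.

import org.apache.hadoop.hbase.HBaseConfiguration
import org.apache.hadoop.hbase.client.Put
import org.apache.hadoop.hbase.io.ImmutableBytesWritable
import org.apache.hadoop.hbase.mapred.TableOutputFormat
import org.apache.hadoop.hbase.util.Bytes
import org.apache.hadoop.mapred.JobConf
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.kafka.KafkaUtils

object KafkaStream2Hbase {
  def main(args: Array[String]): Unit = {
    // "local[2]": a receiver-based stream needs at least one core for the receiver
    val sparkConf = new SparkConf().setMaster("local[2]").setAppName("KafkaStream2Hbase")
    val ssc = new StreamingContext(sparkConf, Seconds(5))

    // Same HBase/JobConf setup as the batch example above
    val conf = HBaseConfiguration.create()
    conf.set("hbase.zookeeper.quorum", "master,Slave1,Slave2")
    conf.set("hbase.zookeeper.property.clientPort", "2181")
    val jobConf = new JobConf(conf)
    jobConf.setOutputFormat(classOf[TableOutputFormat])
    jobConf.set(TableOutputFormat.OUTPUT_TABLE, "create_table_at_first")

    // Receiver-based Kafka stream; "user_topic" and "kafka2hbase" are
    // placeholder assumptions, 1 is the number of receiver threads
    val messages = KafkaUtils.createStream(ssc, "master:2181", "kafka2hbase", Map("user_topic" -> 1))

    // Each Kafka message value is expected in the same "id,name,age" CSV form
    messages.map(_._2).foreachRDD { rdd =>
      rdd.map(_.split(','))
        .map { arr =>
          val put = new Put(Bytes.toBytes(arr(0).toInt))
          put.addColumn(Bytes.toBytes("myfamily"), Bytes.toBytes("name"), Bytes.toBytes(arr(1)))
          put.addColumn(Bytes.toBytes("myfamily"), Bytes.toBytes("age"), Bytes.toBytes(arr(2).toInt))
          (new ImmutableBytesWritable, put)
        }
        .saveAsHadoopDataset(jobConf)
    }

    ssc.start()
    ssc.awaitTermination()
  }
}

Every batch interval, the received CSV messages are converted to Puts and written with the same saveAsHadoopDataset call as in the batch example.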
