Apache Carbondata接入Kafka实时流数据
1.导入carbondata依赖的jar包
将apache-carbondata-1.5.3-bin-spark2.3.2-hadoop2.7.2.jar导入$SPARKHOME/jars;或将apache-carbondata-1.5.3-bin-spark2.3.2-hadoop2.7.2.jar导入在$SPARKHOME创建的carbondlib目录
2.导入kafka依赖的jar包
接入kafka数据需要依赖kafka的jars,将以下jars导入$SPARKHOME/jars
kafka-clients-0.10.0.1.jar
spark-sql-kafka-0-10_2.11-2.3.2.jar
3.spark-shell启动服务
./bin/spark-shell --master spark://hostname:7077 --jars apache-carbondata-1.5.3-bin-spark2.3.2-hadoop2.7.2.jar
a).导入依赖
import org.apache.spark.sql.SparkSession import org.apache.spark.sql.CarbonSession._
b).创建session
启动第一个目录是数据存储目录,第二个目录是元数据目录;都可以是hdfs目录
val carbon = SparkSession.builder().config(sc.getConf).getOrCreateCarbonSession("/home/bigdata/carbondata/data","/home/bigdata/carbondata/carbon.metastore")
c).创建source表
carbon.sql( s""" | CREATE TABLE IF NOT EXISTS kafka_json_source( | id STRING, | name STRING, | age INT, | brithday TIMESTAMP) | STORED AS carbondata | TBLPROPERTIES( | 'streaming'='source', | 'format'='kafka', | 'kafka.bootstrap.servers'='hostname:9092', | 'subscribe'='kafka_json', | 'record_format'='json', | 'comment'='get kafka data') """.stripMargin).show()
d).创建sink表
carbon.sql( s""" | CREATE TABLE IF NOT EXISTS kafka_json_sink( | id STRING, | name STRING, | age INT, | brithday TIMESTAMP) | STORED AS carbondata | TBLPROPERTIES( | 'streaming'='sink') """.stripMargin).show()
e).创建job任务
carbon.sql( s""" | CREATE STREAM kafka_json_job ON TABLE kafka_json_sink( | STMPROPERTIES( | 'trigger'='ProcessingTime', | 'interval'='10 seconds') | AS SELECT * FROM kafka_json_source """.stripMargin).show()
f).创建DATAMAP
carbon.sql( s""" | CREATE DATAMAP agg_kafka_json_sink | ON TABLE kafka_json_sink( | USING "preaggregate" | AS | SELECT id,name,sum(age),max(age),min(age),avg(age) | FROM kafka_json_sink | GRPUP BY id,name """.stripMargin).show()
4.常用SQL命令
a).导入本地数据
carbon.sql("LOAD DATA INPATH '/home/bigdata/carbondata/sample.csv' INTO TABLE kafka_json_source").show()
b).查看表结构
carbon.sql("DESC kafka_json_source").show()
c).查看表数据
carbon.sql("SELECT * FROM kafka_json_source WHERE id=1").show()
d).清理表数据
carbon.sql("TRUNCATE TABLE kafka_json_sink").show()
e).删除表
carbon.sql("DROP TABLE IF EXISTS kafka_json_source").show()
f).查看job任务状态
carbon.sql("SHOW STREAMS ON TABLE kafka_json_sink").show()
g).删除job任务
carbon.sql("DROP STREAM kafka_json_job").show()
h).查询DATAMAP表信息
carbon.sql("DESC agg_kafka_json_sink_kafka_json_sink").show()
i).查询表Segments信息
carbon.sql("SHOW SEGMENTS FOR TABLE kafka_json_sink").show()
j).条件查询
carbon.sql("SELECT * FROM kafka_json_sink WHERE agent_id=499 AND signature=''").show()
k).聚合查询
carbon.sql("SELECT agent_id,signature,method_type,sum(elapse_time),max(elapse_time),min(elapse_time) FROM kafka_json_sink GROUP BY agent_id,signature,method_type").show()
5.注意事项
a).kafka使用配置
由于Carbondata的kafka-consumer反序列化配置如下,所以在kafka-producer应该使用对于配置,否则无法解析数据
key.deserializer = org.apache.kafka.common.serialization.ByteArrayDeserializer value.deserializer = org.apache.kafka.common.serialization.ByteArrayDeserializer
低调大师中文资讯倾力打造互联网数据资讯、行业资源、电子商务、移动互联网、网络营销平台。
持续更新报道IT业界、互联网、市场资讯、驱动更新,是最及时权威的产业资讯及硬件资讯报道平台。
转载内容版权归作者及来源网站所有,本站原创内容转载请注明来源。
- 上一篇
基于Knox登录Yarn UI查看SparkStreaming作业兼容性问题说明
问题背景 1.登录EMR集群节点,运行SparkStreaming示例,如下所示(不同版本EMR集群spark-examples_xxx.jar的路径略有差异): [root@emr-header-1 ~]# spark-submit --class org.apache.spark.examples.streaming.NetworkWordCount --master yarn --deploy-mode cluster /opt/apps/ecm/service/spark/2.3.2-1.0.2/package/spark-2.3.2-1.0.2-bin-hadoop2.7/examples/jars/spark-examples_2.11-2.3.2.jar 192.168.0.211 9999 参数说明:192.168.0.211: 监听流数据源地址9999: 监听流数据源端口 2.提交作业后,通过EMR集群提供的Yarn UI来查看运行在yarn上面作业。 3.点击“ApplicationMaster”,跳转spark界面查看作业详情,切换到“Streaming”可以看到...
- 下一篇
Spark on Kubernetes 的现状与挑战
云原生时代,Kubernetes 的重要性日益凸显,这篇文章以 Spark 为例来看一下大数据生态 on Kubernetes 生态的现状与挑战。 1. Standalone 模式 Spark 运行在 Kubernetes 集群上的第一种可行方式是将 Spark 以 Standalone 模式运行,但是很快社区就提出使用 Kubernetes 原生 Scheduler 的运行模式,也就是 Native 的模式。关于 Standalone 模式这里就没有继续讨论的必要了。 2. Kubernetes Native 模式 Native 模式简而言之就是将 Driver 和 Executor Pod 化,用户将之前向 YARN 提交 Spark 作业的方式提交给 Kubernetes 的 apiserver,提交命令如下: $ bin/spark
相关文章
文章评论
共有0条评论来说两句吧...
文章二维码
点击排行
推荐阅读
最新文章
- Docker安装Oracle12C,快速搭建Oracle学习环境
- Springboot2将连接池hikari替换为druid,体验最强大的数据库连接池
- CentOS6,CentOS7官方镜像安装Oracle11G
- Jdk安装(Linux,MacOS,Windows),包含三大操作系统的最全安装
- CentOS关闭SELinux安全模块
- CentOS7编译安装Gcc9.2.0,解决mysql等软件编译问题
- Windows10,CentOS7,CentOS8安装MongoDB4.0.16
- Hadoop3单机部署,实现最简伪集群
- SpringBoot2编写第一个Controller,响应你的http请求并返回结果
- Eclipse初始化配置,告别卡顿、闪退、编译时间过长