您现在的位置是:首页 > 文章详情

Apache Carbondata接入Kafka实时流数据

日期:2019-08-01点击:629

1.导入carbondata依赖的jar包

apache-carbondata-1.5.3-bin-spark2.3.2-hadoop2.7.2.jar导入$SPARKHOME/jars;或将apache-carbondata-1.5.3-bin-spark2.3.2-hadoop2.7.2.jar导入在$SPARKHOME创建的carbondlib目录

2.导入kafka依赖的jar包

接入kafka数据需要依赖kafka的jars,将以下jars导入$SPARKHOME/jars

kafka-clients-0.10.0.1.jar
spark-sql-kafka-0-10_2.11-2.3.2.jar

3.spark-shell启动服务

./bin/spark-shell --master spark://hostname:7077 --jars apache-carbondata-1.5.3-bin-spark2.3.2-hadoop2.7.2.jar

a).导入依赖

import org.apache.spark.sql.SparkSession import org.apache.spark.sql.CarbonSession._

b).创建session

启动第一个目录是数据存储目录,第二个目录是元数据目录;都可以是hdfs目录

val carbon = SparkSession.builder().config(sc.getConf).getOrCreateCarbonSession("/home/bigdata/carbondata/data","/home/bigdata/carbondata/carbon.metastore")

c).创建source表

carbon.sql( s""" | CREATE TABLE IF NOT EXISTS kafka_json_source( | id STRING, | name STRING, | age INT, | brithday TIMESTAMP) | STORED AS carbondata | TBLPROPERTIES( | 'streaming'='source', | 'format'='kafka', | 'kafka.bootstrap.servers'='hostname:9092', | 'subscribe'='kafka_json', | 'record_format'='json', | 'comment'='get kafka data') """.stripMargin).show()

d).创建sink表

carbon.sql( s""" | CREATE TABLE IF NOT EXISTS kafka_json_sink( | id STRING, | name STRING, | age INT, | brithday TIMESTAMP) | STORED AS carbondata | TBLPROPERTIES( | 'streaming'='sink') """.stripMargin).show()

e).创建job任务

carbon.sql( s""" | CREATE STREAM kafka_json_job ON TABLE kafka_json_sink( | STMPROPERTIES( | 'trigger'='ProcessingTime', | 'interval'='10 seconds') | AS SELECT * FROM kafka_json_source """.stripMargin).show()

f).创建DATAMAP

carbon.sql( s""" | CREATE DATAMAP agg_kafka_json_sink | ON TABLE kafka_json_sink( | USING "preaggregate" | AS | SELECT id,name,sum(age),max(age),min(age),avg(age) | FROM kafka_json_sink | GRPUP BY id,name """.stripMargin).show()

4.常用SQL命令

a).导入本地数据

carbon.sql("LOAD DATA INPATH '/home/bigdata/carbondata/sample.csv' INTO TABLE kafka_json_source").show()

b).查看表结构

carbon.sql("DESC kafka_json_source").show()

c).查看表数据

carbon.sql("SELECT * FROM kafka_json_source WHERE id=1").show()

d).清理表数据

carbon.sql("TRUNCATE TABLE kafka_json_sink").show()

e).删除表

carbon.sql("DROP TABLE IF EXISTS kafka_json_source").show()

f).查看job任务状态

carbon.sql("SHOW STREAMS ON TABLE kafka_json_sink").show()

g).删除job任务

carbon.sql("DROP STREAM kafka_json_job").show()

h).查询DATAMAP表信息

carbon.sql("DESC agg_kafka_json_sink_kafka_json_sink").show()

i).查询表Segments信息

carbon.sql("SHOW SEGMENTS FOR TABLE kafka_json_sink").show()

j).条件查询

carbon.sql("SELECT * FROM kafka_json_sink WHERE agent_id=499 AND signature=''").show()

k).聚合查询

carbon.sql("SELECT agent_id,signature,method_type,sum(elapse_time),max(elapse_time),min(elapse_time) FROM kafka_json_sink GROUP BY agent_id,signature,method_type").show()

5.注意事项

a).kafka使用配置

由于Carbondata的kafka-consumer反序列化配置如下,所以在kafka-producer应该使用对于配置,否则无法解析数据

key.deserializer = org.apache.kafka.common.serialization.ByteArrayDeserializer value.deserializer = org.apache.kafka.common.serialization.ByteArrayDeserializer
原文链接:https://yq.aliyun.com/articles/712233
关注公众号

低调大师中文资讯倾力打造互联网数据资讯、行业资源、电子商务、移动互联网、网络营销平台。

持续更新报道IT业界、互联网、市场资讯、驱动更新,是最及时权威的产业资讯及硬件资讯报道平台。

转载内容版权归作者及来源网站所有,本站原创内容转载请注明来源。

文章评论

共有0条评论来说两句吧...

文章二维码

扫描即可查看该文章

点击排行

推荐阅读

最新文章