您现在的位置是:首页 > 文章详情

使用SparkSql 读取ES数据

日期:2020-03-06点击:989
  1. 对于大批量数据,查询es时,需要带条件去查询,否则一下查出所有数据数据量会很大
  2. es查询需要编写json格式的DSL查询语句,对于复杂查询,DSL编写起来也分很复杂,所以我们这里使用sparksql,通过编写sql语句,spark将sql语句自动转化为DSL语句来查询es。
    关于es对spark的支持可参见文档,官网 https://www.elastic.co/guide/en/elasticsearch/hadoop/current/spark.html

废话不多说,上代码
case class people(phone:String)
val sparkConf = new SparkConf().setAppName("application-name").setMaster("local[4]")
val sc = new SparkContext(sparkConf)
val sql = new SQLContext(sc)
import sql.implicits._
//配置
val options = Map(

 "pushdown" -> "true", "es.nodes" -> "192.45.15.15", //ip "es.port" -> "9200") //端口

val esDF = sql.read.format("org.elasticsearch.spark.sql")

 .options(options) .load("spark_202003/alldata") //索引 // esDF.printSchema() //可打印schema信息

val table = esDF.createOrReplaceTempView("spark_202003") //创建临时表用于查询

val sqlString=“select * from spark_202003 where code=‘1’ ”

val resultDF = sql.sql(sqlString)
var resultDS = resultDF.map(x => people(x(0).toString))) //
这里是转换成了dataset,之后可对dataset 做其他操作
打印下查询的内容
resultDS.collect().toList.toString()

原文链接:https://yq.aliyun.com/articles/748402
关注公众号

低调大师中文资讯倾力打造互联网数据资讯、行业资源、电子商务、移动互联网、网络营销平台。

持续更新报道IT业界、互联网、市场资讯、驱动更新,是最及时权威的产业资讯及硬件资讯报道平台。

转载内容版权归作者及来源网站所有,本站原创内容转载请注明来源。

文章评论

共有0条评论来说两句吧...

文章二维码

扫描即可查看该文章

点击排行

推荐阅读

最新文章