Spark(九) -- SparkSQL API编程
本文测试的Spark版本是1.3.1
Text文本文件测试
一个简单的person.txt文件内容为:
JChubby,13 Looky,14 LL,15
分别是Name和Age
在Idea中新建Object,原始代码如下:
object TextFile{ def main(args:Array[String]){ } }
SparkSQL编程模型:
第一步:
需要一个SQLContext对象,该对象是SparkSQL操作的入口
而构建一个SQLContext对象需要一个SparkContext
第二步:
构建好入口对象之后,要引入隐式转换的方法,作用是将读取到的各种文件转换成DataFrame,DataFrame是SparkSQL上进行统一操作的数据类型
第三步:
根据数据的格式,构建一个样例类。作用是提供将读取到的各种各样的数据类型隐式转换成一个统一的数据格式,方便编程
第四步:
使用SQLContext对象读取文件,并将其转换成DataFrame
第五步:
对数据进行相关操作。
1.DataFrame自带的操作方式。DataFrame提供了很多操作数据的方法,如where,select等
2.DSL方式。DSL其实使用的也是DataFrame提供的方法,但是在操作属性时可以方便的使用’ + 属性名的方式进行操作
3.将数据注册成表,通过SQL语句操作
object TextFile{ def main(args:Array[String]){ //第一步 //构建SparkContext对象,主要要使用new调用构造方法,否则就变成使用样例类的Apply方法了 val sc = new SparkContext() //构建SQLContext对象 val sqlContext = new SQLContext(sc) //第二步 import sqlContext.implicits._ //第三步 case Person(name:String,age:Int) //第四步,textFile从指定路径读取文件如果是集群模式要写hdfs文件地址;通过两个map操作将读取到的文件转换成Person类的对象,每一行对应一个Person对象;toDF将其转换成DataFrame val people = sc.textFile("文件路径").map(_.split(",")).map{case (name,age) => Person(name,age.toInt)}.toDF() //第五步 //DataFrame方法 println("------------------------DataFrame------------------------------------") //赛选出age>10的记录,然后只选择name属性,show方法将其输出 people.where(people("age") > 10).select(people("name")).show() //DSL println("---------------------------DSL---------------------------------") people.where('age > 10).select('name).show() //SQL println("-----------------------------SQL-------------------------------") //将people注册成people表 people.registerTempTable("people") //使用sqlContext的sql方法来写SQL语句 //查询返回的是RDD,所以对其进行collect操作,之后循环打印 sqlContext.sql("select name from people where age > 10").collect.foreach(println) //保存为parquet文件,之后的parquet演示会用到 people.saveAsParquet("保存的路径") } }
parquet格式文件测试:
val sc = new SparkContext() val sql = new SQLContext(sc) import sql.implicits._ val parquet = sql.parquetFile(args(0)) println("------------------------DataFrame------------------------------------") println(parquet.where(parquet("age") > 10).select(parquet("name")).show()) println("---------------------------DSL---------------------------------") println(parquet.where('age > 10).select('name).show()) println("-----------------------------SQL-------------------------------") parquet.registerTempTable("parquet") sql.sql("select name from parquet where age > 10").map(p => "name:" + p(0)).collect().foreach(println)
Json格式测试:
val sc = new SparkContext() val sql = new SQLContext(sc) import sql.implicits._ val json = sql.jsonFile(args(0)) println("------------------------DataFrame------------------------------------") println(json.where(json("age") > 10).select(json("name")).show()) println("---------------------------DSL---------------------------------") println(json.where('age > 10).select('name).show()) println("-----------------------------SQL-------------------------------") json.registerTempTable("json") sql.sql("select name from json where age > 10").map(p => "name:" + p(0)).collect().foreach(println)
可以看到上面的代码几乎和读取文本文件的一模一样,只不顾sc在读取文件的时候使用了parquetFile/jsonFile方法,而之后的操作是一摸一样的
由于parquet和json数据读取进来就是一个可操作的格式并且会自动转换成DataFrame,所以省去了case class的定义步骤和toDF的操作
以上为SparkSQL API的简单使用

低调大师中文资讯倾力打造互联网数据资讯、行业资源、电子商务、移动互联网、网络营销平台。
持续更新报道IT业界、互联网、市场资讯、驱动更新,是最及时权威的产业资讯及硬件资讯报道平台。
转载内容版权归作者及来源网站所有,本站原创内容转载请注明来源。
- 上一篇
Spark(八) -- 使用Intellij Idea搭建Spark开发环境
版权声明:本文为博主原创文章,未经博主允许不得转载。 https://blog.csdn.net/qq1010885678/article/details/45954731 Intellij Idea下载地址: 官方下载 选择右下角的Community Edition版本下载安装即可 本文中使用的是windows系统 环境为: jdk1.6.0_45 scala2.10.5 在网上下载jdk和scala的安装包双击运行安装即可 注意:如果之后要将scala文件打包成jar包并在spark集群上运行的话,请确保spark集群和打包操作所在机器 环境保持一致!不然运行jar包会出现很多异常 要使用idea开发spark程序首先要安装scala插件 进入idea主界面 在线安装: 选择Plugins 输入scala 选择安装即可 离线安装: scala离线插件包 下载完scala插件包之后,在idea主界面的Plugins中选择从本地安装 选择下载好的scala插件安装即可 本文使用的是Idea14.1.3 对应的scala插件版本为1.5 不同版本的Idea对应的scala插件版本可能不同...
- 下一篇
Spark(十) -- Spark Streaming API编程
版权声明:本文为博主原创文章,未经博主允许不得转载。 https://blog.csdn.net/qq1010885678/article/details/45973451 本文测试的Spark版本是1.3.1 Spark Streaming编程模型: 第一步: 需要一个StreamingContext对象,该对象是Spark Streaming操作的入口 ,而构建一个StreamingContext对象需要两个参数: 1、SparkConf对象:该对象是配置Spark 程序设置的,例如集群的Master节点,程序名等信息 2、Seconds对象:该对象设置了StreamingContext多久读取一次数据流 第二步: 构建好入口对象之后,直接调用该入口的方法读取各种不同方式传输过来的数据流,如:Socket,HDFS等方式。并会将数据转换成DStream对象进行统一操作 第三步: DStream本身是一种RDD序列,Streaming接受数据流之后会进行切片,每个片都是一个RDD,而这些RDD最后都会包装到一个DStream对象中统一操作。在这个步骤中,进行对数据的业务处理 第四步:...
相关文章
文章评论
共有0条评论来说两句吧...
文章二维码
点击排行
推荐阅读
最新文章
- CentOS8,CentOS7,CentOS6编译安装Redis5.0.7
- CentOS7,8上快速安装Gitea,搭建Git服务器
- CentOS关闭SELinux安全模块
- CentOS8编译安装MySQL8.0.19
- CentOS8安装Docker,最新的服务器搭配容器使用
- SpringBoot2整合MyBatis,连接MySql数据库做增删改查操作
- Docker安装Oracle12C,快速搭建Oracle学习环境
- SpringBoot2配置默认Tomcat设置,开启更多高级功能
- Red5直播服务器,属于Java语言的直播服务器
- Windows10,CentOS7,CentOS8安装Nodejs环境