版权声明:本文为博主原创文章,未经博主允许不得转载。 https://blog.csdn.net/qq1010885678/article/details/45957991
本文测试的Spark版本是1.3.1
Text文本文件测试
一个简单的person.txt文件内容为:
JChubby,13
Looky,14
LL,15
分别是Name和Age
在Idea中新建Object,原始代码如下:
object TextFile{
def main(args:Array[String]){
}
}
SparkSQL编程模型:
第一步:
需要一个SQLContext对象,该对象是SparkSQL操作的入口
而构建一个SQLContext对象需要一个SparkContext
第二步:
构建好入口对象之后,要引入隐式转换的方法,作用是将读取到的各种文件转换成DataFrame,DataFrame是SparkSQL上进行统一操作的数据类型
第三步:
根据数据的格式,构建一个样例类。作用是提供将读取到的各种各样的数据类型隐式转换成一个统一的数据格式,方便编程
第四步:
使用SQLContext对象读取文件,并将其转换成DataFrame
第五步:
对数据进行相关操作。
1.DataFrame自带的操作方式。DataFrame提供了很多操作数据的方法,如where,select等
2.DSL方式。DSL其实使用的也是DataFrame提供的方法,但是在操作属性时可以方便的使用’ + 属性名的方式进行操作
3.将数据注册成表,通过SQL语句操作
object TextFile{
def main(args:Array[String]){
val sc = new SparkContext()
val sqlContext = new SQLContext(sc)
import sqlContext.implicits._
case Person(name:String,age:Int)
val people = sc.textFile("文件路径").map(_.split(",")).map{case (name,age) => Person(name,age.toInt)}.toDF()
println("------------------------DataFrame------------------------------------")
people.where(people("age") > 10).select(people("name")).show()
println("---------------------------DSL---------------------------------")
people.where('age > 10).select('name).show()
println("-----------------------------SQL-------------------------------")
people.registerTempTable("people")
sqlContext.sql("select name from people where age > 10").collect.foreach(println)
people.saveAsParquet("保存的路径")
}
}
parquet格式文件测试:
val sc = new SparkContext()
val sql = new SQLContext(sc)
import sql.implicits._
val parquet = sql.parquetFile(args(0))
println("------------------------DataFrame------------------------------------")
println(parquet.where(parquet("age") > 10).select(parquet("name")).show())
println("---------------------------DSL---------------------------------")
println(parquet.where('age > 10).select('name).show())
println("-----------------------------SQL-------------------------------")
parquet.registerTempTable("parquet")
sql.sql("select name from parquet where age > 10").map(p => "name:" + p(0)).collect().foreach(println)
Json格式测试:
val sc = new SparkContext()
val sql = new SQLContext(sc)
import sql.implicits._
val json = sql.jsonFile(args(0))
println("------------------------DataFrame------------------------------------")
println(json.where(json("age") > 10).select(json("name")).show())
println("---------------------------DSL---------------------------------")
println(json.where('age > 10).select('name).show())
println("-----------------------------SQL-------------------------------")
json.registerTempTable("json")
sql.sql("select name from json where age > 10").map(p => "name:" + p(0)).collect().foreach(println)
可以看到上面的代码几乎和读取文本文件的一模一样,只不顾sc在读取文件的时候使用了parquetFile/jsonFile方法,而之后的操作是一摸一样的
由于parquet和json数据读取进来就是一个可操作的格式并且会自动转换成DataFrame,所以省去了case class的定义步骤和toDF的操作
以上为SparkSQL API的简单使用