您现在的位置是:首页 > 文章详情

Spark RDDRelation

日期:2015-12-12点击:403

package
main.asiainfo.coc.sparksql import org.apache.spark.sql.SQLContext import org.apache.spark.{SparkConf, SparkContext} case class Record(key: Int, value: String) object RDDRelation { def main(args: Array[String]) { val sparkConf = new SparkConf().setAppName("RDDRelation").setMaster("local") val sc = new SparkContext(sparkConf) val sqlContext = new SQLContext(sc)
//引入SQL context 提供所有的SQL 与 隐式转换方法
import sqlContext.implicits._ //生成1到100的数字,并做成key value形式的DateFrame
val df
= sc.parallelize((1 to 100).map(i => Record(i, s"val_$i"))).toDF() //df: "[key:int, value : String]" // 将DF做成以case class的临时表 L df.registerTempTable("records") // 随后便可以调用sqlContext查询这个临时表 println("Result of SELECT *:") sqlContext.sql("SELECT * FROM records").collect().foreach(println) // 聚合查询 val count = sqlContext.sql("SELECT COUNT(*) FROM records").collect().head.getLong(0) //count: 100 println(s"COUNT(*): $count") // 查询的结果是一个普通的RDD,所以可以根据条件筛选你想要的数据哪一列数据 val rddFromSql = sqlContext.sql("SELECT key, value FROM records WHERE key < 10") //rddFromSql:"[key : int, value String]" println("Result of RDD.map:") rddFromSql.map(row => s"Key: ${row(0)}, Value: ${row(1)}").collect().foreach(println) df.where($"key" === 1).orderBy($"value".asc).select($"key").collect().foreach(println) // 将文件存成parquet格式 df.write.parquet("pair.parquet") // 读取parquet格式文件 val parquetFile = sqlContext.read.parquet("pair.parquet") parquetFile.where($"key" === 1).select($"value".as("a")).collect().foreach(println) // parquetFile也可以做成临时表 parquetFile.registerTempTable("parquetFile") sqlContext.sql("SELECT * FROM parquetFile").collect().foreach(println) sc.stop() } }

注意 这里声明的是 sqlContext = new SQLContext(sc)  如果要存成hive 表 需用hivecontext.

 

 

原文链接:https://yq.aliyun.com/articles/609097
关注公众号

低调大师中文资讯倾力打造互联网数据资讯、行业资源、电子商务、移动互联网、网络营销平台。

持续更新报道IT业界、互联网、市场资讯、驱动更新,是最及时权威的产业资讯及硬件资讯报道平台。

转载内容版权归作者及来源网站所有,本站原创内容转载请注明来源。

文章评论

共有0条评论来说两句吧...

文章二维码

扫描即可查看该文章

点击排行

推荐阅读

最新文章