Spark工程开发常用函数与方法(Scala语言)
import org.apache.spark.{SparkContext, SparkConf}
import org.apache.spark.sql.{SaveMode, DataFrame}
import scala.collection.mutable.ArrayBuffer
import main.asiainfo.coc.tools.Configure
import org.apache.spark.sql.hive.HiveContext
import java.sql.DriverManager
import java.sql.Connection
1 连接前台数据源 查询前台MYSQL中的数据
val DIM_COC_INDEX_INFO_DDL = s""" CREATE TEMPORARY TABLE DIM_COC_INDEX_INFO USING org.apache.spark.sql.jdbc OPTIONS ( url '${mySQLUrl}', dbtable 'DIM_COC_INDEX_INFO' )""".stripMargin sqlContext.sql(DIM_COC_INDEX_INFO_DDL) val DIM_COC_INDEX_INFO = sql("SELECT * FROM DIM_COC_INDEX_INFO").cache()
2 在A表中筛选出 B表中获取的TARGET_TABLE_CODE 然后再按照DATA_SRC_CODE排序,查询出源表的集合
val sources = DIM_COC_INDEX_INFO.filter("TARGET_TABLE_CODE ='"+TARGET_TABLE_CODE+"'") .select("DATA_SRC_CODE").groupBy("DATA_SRC_CODE").agg(DIM_COC_INDEX_INFO("DATA_SRC_CODE")).collect
3 将表进行关联
resultIndexTableDF = resultIndexTableDF.join(SOURCE_TABLE,ALL_USERS.col(ALL_USER_JOIN_COLUMN_NAME) === SOURCE_TABLE.col(SOURCE_TABLE_JOIN_COLUMN_NAME),"left_outer") resultIndexTableDF.dtypes.foreach(println)
4 根据条件筛选
val labels = CI_MDA_SYS_TABLE.join(CI_MDA_SYS_TABLE_COLUMN,CI_MDA_SYS_TABLE("TABLE_ID") === CI_MDA_SYS_TABLE_COLUMN("TABLE_ID"),"inner") .join(CI_LABEL_EXT_INFO,CI_MDA_SYS_TABLE_COLUMN("COLUMN_ID") === CI_LABEL_EXT_INFO("COLUMN_ID"),"inner") .join(CI_LABEL_INFO,CI_LABEL_EXT_INFO("LABEL_ID") === CI_LABEL_INFO("LABEL_ID"),"inner") .join(CI_APPROVE_STATUS,CI_LABEL_INFO("LABEL_ID") === CI_APPROVE_STATUS("RESOURCE_ID"),"inner") .filter(CI_APPROVE_STATUS("CURR_APPROVE_STATUS_ID") === CI_APPROVE_STATUS_SUCCESS_CODE and (CI_LABEL_INFO("DATA_STATUS_ID") === 1 || CI_LABEL_INFO("DATA_STATUS_ID") === 2) and (CI_LABEL_EXT_INFO("COUNT_RULES_CODE") isNotNull //TODO trim.length>0 ) and CI_MDA_SYS_TABLE("UPDATE_CYCLE") === TABLE_DATA_CYCLE ).cache()
5 根据某字段对表进行排序
val labelTargetTables = labels.groupBy("CI_MDA_SYS_TABLE.TABLE_ID","CI_MDA_SYS_TABLE.TABLE_NAME").agg(labels("CI_MDA_SYS_TABLE.TABLE_ID"),labels("CI_MDA_SYS_TABLE.TABLE_NAME")).collect
6 创建parquet格式的表 可使用schema.生成到指定的schema.
sqlContext.sql("create table "+labelTargetTableName+" stored as parquet as select * from default."+labelTargetTableNameJson)
7 保存数据格式,可以指定生成的格式
resultLabelTable.saveAsTable(tableName = labelTargetTableName, source="parquet", mode=SaveMode.Overwrite)
8 根据筛选查询出相应数据,由于cache方法并不属于action操作,接下来的操作需要这一步所执行的数据信息,所以这里使用collect方法,再执行遍历方法
val r0000Labels = labelInThisTargetTable.filter("COUNT_RULES_CODE = 'R_00000'").select("CI_LABEL_INFO.LABEL_ID","COLUMN_NAME").collect for(r0000Label <- r0000Labels){ ........ }
低调大师中文资讯倾力打造互联网数据资讯、行业资源、电子商务、移动互联网、网络营销平台。
持续更新报道IT业界、互联网、市场资讯、驱动更新,是最及时权威的产业资讯及硬件资讯报道平台。
转载内容版权归作者及来源网站所有,本站原创内容转载请注明来源。
- 上一篇
hadoop-spark-hive-hbase配置相关说明
1. zookeeper 配置 cp app/ochadoop-och3.0.0-SNAPSHOT/zookeeper-3.4.5-cdh5.0.0-beta-2-och3.0.0-SNAPSHOT/conf/zoo_sample.cfg app/ochadoop-och3.0.0-SNAPSHOT/zookeeper-3.4.5-cdh5.0.0-beta-2-och3.0.0-SNAPSHOT/conf/zoo.cfg vim app/ochadoop-och3.0.0-SNAPSHOT/zookeeper-3.4.5-cdh5.0.0-beta-2-och3.0.0-SNAPSHOT/conf/zoo.cfg dataDir=/home/cdh5/tmp/zookeeper clientPort=2183 server.1=ocdata09:2888:3888 mkdir -p /home/cdh5/tmp/zookeeper vim /home/cdh5/tmp/zookeeper/myid echo "1" > /home/cdh5/tmp/zookeeper/myi...
- 下一篇
Mahout学习之运行canopy算法错误及解决办法
一:将Text转换成Vector序列文件时 在Hadoop中运行编译打包好的jar程序,可能会报下面的错误: Exceptioninthread"main"java.lang.NoClassDefFoundError: org/apache/mahout/common/AbstractJob 书中和网上给的解决办法都是:把Mahout根目录下的相应的jar包复制到Hadoop根目录下的lib文件夹下,同时重启Hadoop 但是到了小编这里不管怎么尝试,都不能解决,最终放弃了打包成jar运行的念头,就在对源码进行了修改,在eclipse运行了 二:java.lang.Exception: java.lang.ClassCastException: org.apache.hadoop.io.IntWritable cannot be cast to org.apache.hadoop.io.Text 此种错误,是由于map和reduce端函数格式输入输出不一致,导致数据类型不匹配 在次要注意一个特别容易出错的地方:Mapper和Reducer类中的函数必须是map和reduce,名字不能...
相关文章
文章评论
共有0条评论来说两句吧...
文章二维码
点击排行
推荐阅读
最新文章
- CentOS7,8上快速安装Gitea,搭建Git服务器
- SpringBoot2更换Tomcat为Jetty,小型站点的福音
- Docker使用Oracle官方镜像安装(12C,18C,19C)
- CentOS关闭SELinux安全模块
- Docker安装Oracle12C,快速搭建Oracle学习环境
- Hadoop3单机部署,实现最简伪集群
- Jdk安装(Linux,MacOS,Windows),包含三大操作系统的最全安装
- CentOS7编译安装Cmake3.16.3,解决mysql等软件编译问题
- SpringBoot2配置默认Tomcat设置,开启更多高级功能
- CentOS7安装Docker,走上虚拟化容器引擎之路