Spark Sql 连接mysql
我的原创地址:https://dongkelun.com/2018/03/21/sparkMysql/
1、基本概念和用法(摘自spark官方文档中文版)
Spark SQL 还有一个能够使用 JDBC 从其他数据库读取数据的数据源。当使用 JDBC 访问其它数据库时,应该首选 JdbcRDD。这是因为结果是以数据框(DataFrame)返回的,且这样 Spark SQL操作轻松或便于连接其它数据源。因为这种 JDBC 数据源不需要用户提供 ClassTag,所以它也更适合使用 Java 或 Python 操作。(注意,这与允许其它应用使用 Spark SQL 执行查询操作的 Spark SQL JDBC 服务器是不同的)。
使用 JDBC 访问特定数据库时,需要在 spark classpath 上添加对应的 JDBC 驱动配置。例如,为了从 Spark Shell 连接 postgres,你需要运行如下命令 :
bin/spark-shell --driver-class-path postgresql-9.4.1207.jar --jars postgresql-9.4.1207.jar
通过调用数据源API,远程数据库的表可以被加载为DataFrame 或Spark SQL临时表。支持的参数有 :
属性名 | 含义 |
---|---|
url | 要连接的 JDBC URL。 |
dbtable | 要读取的 JDBC 表。 注意,一个 SQL 查询的 From 分语句中的任何有效表都能被使用。例如,既可以是完整表名,也可以是括号括起来的子查询语句。 |
driver | 用于连接 URL 的 JDBC 驱动的类名。 |
partitionColumn, lowerBound, upperBound, numPartitions | 这几个选项,若有一个被配置,则必须全部配置。它们描述了当从多个 worker 中并行的读取表时,如何对它分区。partitionColumn 必须时所查询表的一个数值字段。注意,lowerBound 和 upperBound 都只是用于决定分区跨度的,而不是过滤表中的行。因此,表中的所有行将被分区并返回。 |
fetchSize | JDBC fetch size,决定每次读取多少行数据。 默认将它设为较小值(如,Oracle上设为 10)有助于 JDBC 驱动上的性能优化。 |
2、scala代码实现连接mysql
2.1 添加mysql 依赖
在sbt 配置文件里添加:
"mysql" % "mysql-connector-java" % "6.0.6"
然后执行:
sbt eclipse
2.2 建表并初始化数据
DROP TABLE IF EXISTS `USER_T`; CREATE TABLE `USER_T` ( `ID` INT(11) NOT NULL, `USER_NAME` VARCHAR(40) NOT NULL, PRIMARY KEY (`ID`) ) ENGINE=INNODB DEFAULT CHARSET=UTF8;
INSERT INTO `USER_T`(`ID`,`USER_NAME`) VALUES (1,'测试1'); INSERT INTO `USER_T`(`ID`,`USER_NAME`) VALUES (2,'测试2');
2.3 代码
2.3.1 查询
package com.dkl.leanring.spark.sql import org.apache.spark.sql.SparkSession /** * spark查询mysql测试 */ object MysqlQueryDemo { def main(args: Array[String]): Unit = { val spark = SparkSession.builder().appName("MysqlQueryDemo").master("local").getOrCreate() val jdbcDF = spark.read .format("jdbc") .option("url", "jdbc:mysql://192.168.44.128:3306/hive?useUnicode=true&characterEncoding=utf-8") .option("dbtable", "USER_T") .option("user", "root") .option("password", "Root-123456") .load() jdbcDF.show() } }
2.3.2 插入数据
新建USER_T.csv,造几条数据如图:
(需将csv的编码格式转为utf-8,否则spark读取中文乱码,转码方法见:https://jingyan.baidu.com/article/fea4511a092e53f7bb912528.html)
package com.dkl.leanring.spark.sql import org.apache.spark.sql.SparkSession import org.apache.spark.sql.SaveMode import java.util.Properties /** * 从USER_T.csv读取数据并插入的mysql表中 */ object MysqlInsertDemo { def main(args: Array[String]): Unit = { val spark = SparkSession.builder().appName("MysqlInsertDemo").master("local").getOrCreate() val df = spark.read.option("header", "true").csv("src/main/resources/scala/USER_T.csv") df.show() val url = "jdbc:mysql://192.168.44.128:3306/hive?useUnicode=true&characterEncoding=utf-8" val prop = new Properties() prop.put("user", "root") prop.put("password", "Root-123456") df.write.mode(SaveMode.Append).jdbc(url, "USER_T", prop) } }
再查询一次,就会发现表里多了几条数据
3、注意
上面的代码在本地eclipse运行是没有问题的,如果放在服务器上用spark-submit提交的话,可能会报异常
java.sql.SQLException:No suitable driver
解决方法是在代码里添加
mysql:
.option("driver", "com.mysql.jdbc.Driver")
oracle:
.option("driver", "oracle.jdbc.driver.OracleDriver")
具体可参考我的另一篇博客:spark-submit报错:Exception in thread “main” java.sql.SQLException:No suitable driver

低调大师中文资讯倾力打造互联网数据资讯、行业资源、电子商务、移动互联网、网络营销平台。
持续更新报道IT业界、互联网、市场资讯、驱动更新,是最及时权威的产业资讯及硬件资讯报道平台。
转载内容版权归作者及来源网站所有,本站原创内容转载请注明来源。
- 上一篇
spark基本概念(便于自己随时查阅--摘自Spark快速大数据分析)
版权声明:本文由董可伦首发于https://dongkelun.com,非商业转载请注明作者及原创出处。商业转载请联系作者本人。 https://blog.csdn.net/dkl12/article/details/80245998 我的原创地址:https://dongkelun.com/2018/01/23/sparkBasicConcept/ 1、 RDD 在Spark 中,我们通过对分布式数据集的操作来表达我们的计算意图,这些计算会自动地在集群上并行进行。这样的数据集被称为弹性分布式数据集(resilient distributed dataset),简称RDD。RDD 是Spark 对分布式数据和计算的基本抽象。 RDD 支持两种操作:转化操作和行动操作。RDD 的转化操作是返回一个新的RDD 的操作,比如map() 和filter(),而行动操作则是向驱动器程序返回结果或把结果写入外部系统的操作,会触发实际的计算,比如count() 和first()。Spark 对待转化操作和行动操作的方式很不一样,因此理解你正在进行的操作的类型是很重要的。如果对于一个特定的函数...
- 下一篇
centOS安装hadoop、编写wordCount小程序
#为了方便后续管理,添加hadoop用户,并设置密码 adduser hadoop passwd hadoop #hadoop用户赋权(加sudo可以执行root的操作) #给sudoers文件读写权 chmod u+w /etc/sudoers #赋权 vim /etc/sudoers #在root ALL = (ALL) ALL 下添加 hadoop ALL=(ALL) ALL 并保存 #为了安全撤销写权限 chmod u-w /etc/sudoers 用户赋权 hadoop依赖1.7+的jdk 如果是3.0+版本的hadoop依赖1.8+的idk,安装jdk #安装jdk 这边我通过ftp把jdk1.8 tar包传到服务器 #进入相应目录解压 cd /usr/local/tools tar -zxvf jdk-8u171-linux-x64.tar.gz #配置环境变量 sudo vim /etc/profile #文件中添加 JAVA_HOME=/usr/local/tools/jdk1.8.0_171 PATH=$JAVA_HOME/bin:$PATH CLASSPATH=$...
相关文章
文章评论
共有0条评论来说两句吧...
文章二维码
点击排行
推荐阅读
最新文章
- CentOS8编译安装MySQL8.0.19
- SpringBoot2整合MyBatis,连接MySql数据库做增删改查操作
- Mario游戏-低调大师作品
- CentOS7编译安装Gcc9.2.0,解决mysql等软件编译问题
- CentOS7编译安装Cmake3.16.3,解决mysql等软件编译问题
- SpringBoot2更换Tomcat为Jetty,小型站点的福音
- SpringBoot2整合Thymeleaf,官方推荐html解决方案
- Linux系统CentOS6、CentOS7手动修改IP地址
- CentOS8安装Docker,最新的服务器搭配容器使用
- CentOS7安装Docker,走上虚拟化容器引擎之路