spark ML算法之线性回归使用
我的原创地址:https://dongkelun.com/2018/04/09/sparkMlLinearRegressionUsing/
前言
本文是讲如何使用spark ml进行线性回归,不涉及线性回归的原理。
1、数据格式
1.1 普通标签格式
1.1.1 格式为:
标签,特征值1 特征值2 特征值3...
1,1.9 2,3.1 3,4 3.5,4.45 4,5.02 9,9.97 -2,-0.98
1.1.2 spark 读取
1、Rdd
旧版(mllib)的线性回归要求传入的参数类型为RDD[LabeledPoint]
import org.apache.spark.SparkConf import org.apache.spark.SparkContext import org.apache.spark.mllib.linalg.Vectors import org.apache.spark.mllib.regression.LabeledPoint val data_path = "files/ml/linear_regression_data1.txt" val data = sc.textFile(data_path) val training = data.map { line => val arr = line.split(',') LabeledPoint(arr(0).toDouble, Vectors.dense(arr(1).split(' ').map(_.toDouble))) }.cache() training.foreach(println)
结果:
(1.0,[1.9]) (2.0,[3.1]) (3.0,[4.0]) (3.5,[4.45]) (4.0,[5.02]) (9.0,[9.97]) (-2.0,[-0.98])
一共有两列,第一列可以通过.label获得(类型为Double),第二列可以通过.features获得(类型为Vector[Double])
2、 DataFrame
新版(ml)的线性回归要求传入的参数类型为Dataset[_]
import org.apache.spark.ml.linalg.Vectors import org.apache.spark.sql.Row import spark.implicits._ val data_path = "files/ml/linear_regression_data1.txt" val data = spark.read.text(data_path) val training = data.map { case Row(line: String) => var arr = line.split(',') (arr(0).toDouble, Vectors.dense(arr(1).split(' ').map(_.toDouble))) }.toDF("label", "features") training.show()
结果:
+-----+--------+ |label|features| +-----+--------+ | 1.0| [1.9]| | 2.0| [3.1]| | 3.0| [4.0]| | 3.5| [4.45]| | 4.0| [5.02]| | 9.0| [9.97]| | -2.0| [-0.98]| +-----+--------+
其中列名”label”, “features”固定,不能改为其他列名。
1.2 LIBSVM格式
1.2.1 格式为:
label index1:value1 index2:value2 ...
其中每一行的index必须为升序
为了便于理解,造几条多维数据:
1 1:1.9 2:2 4:2 100:3 101:6 2 1:3.1 2:2 4:2 100:3 101:6 3 1:4 2:2 4:2 100:3 101:6 3.5 1:4.45 2:2 4:2 100:3 101:6 4 1:5.02 2:2 4:2 100:3 101:6 9 1:9.97 4:2 100:3 101:6 -2 1:-0.98 2:2 4:2 100:3 201:6
1.2.2 spark 读取
1、Rdd
import org.apache.spark.SparkConf import org.apache.spark.SparkContext import org.apache.spark.mllib.util.MLUtils val data_path = "files/ml/linear_regression_data2.txt" val training = MLUtils.loadLibSVMFile(sc, data_path) training.foreach(println)
结果:
(1.0,(201,[0,1,3,99,100],[1.9,2.0,2.0,3.0,6.0])) (2.0,(201,[0,1,3,99,100],[3.1,2.0,2.0,3.0,6.0])) (3.0,(201,[0,1,3,99,100],[4.0,2.0,2.0,3.0,6.0])) (3.5,(201,[0,1,3,99,100],[4.45,2.0,2.0,3.0,6.0])) (4.0,(201,[0,1,3,99,100],[5.02,2.0,2.0,3.0,6.0])) (9.0,(201,[0,3,99,100],[9.97,2.0,3.0,6.0])) (-2.0,(201,[0,1,3,99,200],[-0.98,2.0,2.0,3.0,6.0]))
返回类型为RDD[LabeledPoint],其中第一列为label,第二列vector的第一个值为max(index),第二个index-1组成的数组,第三个为value组成的数组。
2、DataFrame
val data_path = "files/ml/linear_regression_data2.txt" val data = spark.read.text(data_path) val training = spark.read.format("libsvm").load(data_path) training.show(false)
结果:
+-----+--------------------------------------------+ |label|features | +-----+--------------------------------------------+ |1.0 |(201,[0,1,3,99,100],[1.9,2.0,2.0,3.0,6.0]) | |2.0 |(201,[0,1,3,99,100],[3.1,2.0,2.0,3.0,6.0]) | |3.0 |(201,[0,1,3,99,100],[4.0,2.0,2.0,3.0,6.0]) | |3.5 |(201,[0,1,3,99,100],[4.45,2.0,2.0,3.0,6.0]) | |4.0 |(201,[0,1,3,99,100],[5.02,2.0,2.0,3.0,6.0]) | |9.0 |(201,[0,3,99,100],[9.97,2.0,3.0,6.0]) | |-2.0 |(201,[0,1,3,99,200],[-0.98,2.0,2.0,3.0,6.0])| +-----+--------------------------------------------+
2、线性回归代码
2.1 数据
用libsvm格式的数据:
1 1:1.9 2 1:3.1 3 1:4 3.5 1:4.45 4 1:5.02 9 1:9.97 -2 1:-0.98
2.2 旧版代码
package com.dkl.leanring.spark.ml import org.apache.log4j.{ Level, Logger } import org.apache.spark.{ SparkConf, SparkContext } import org.apache.spark.mllib.regression.LinearRegressionWithSGD import org.apache.spark.mllib.util.MLUtils import org.apache.spark.mllib.regression.LabeledPoint import org.apache.spark.mllib.linalg.Vectors import org.apache.spark.mllib.regression.LinearRegressionModel object OldLinearRegression { def main(args: Array[String]) { // 构建Spark对象 val conf = new SparkConf().setAppName("OldLinearRegression").setMaster("local") val sc = new SparkContext(conf) Logger.getRootLogger.setLevel(Level.WARN) //读取样本数据 val data_path = "files/ml/linear_regression_data3.txt" val training = MLUtils.loadLibSVMFile(sc, data_path) val numTraing = training.count() // 新建线性回归模型,并设置训练参数 val numIterations = 10000 val stepSize = 0.5 val miniBatchFraction = 1.0 //书上的代码 intercept 永远为0 //val model = LinearRegressionWithSGD.train(examples, numIterations, stepSize, miniBatchFraction) var lr = new LinearRegressionWithSGD().setIntercept(true) lr.optimizer.setNumIterations(numIterations).setStepSize(stepSize).setMiniBatchFraction(miniBatchFraction) val model = lr.run(training) println(model.weights) println(model.intercept) // 对样本进行测试 val prediction = model.predict(training.map(_.features)) val predictionAndLabel = prediction.zip(training.map(_.label)) val print_predict = predictionAndLabel.take(20) println("prediction" + "\t" + "label") for (i <- 0 to print_predict.length - 1) { println(print_predict(i)._1 + "\t" + print_predict(i)._2) } // 计算测试误差 val loss = predictionAndLabel.map { case (p, l) => val err = p - l err * err }.reduce(_ + _) val rmse = math.sqrt(loss / numTraing) println(s"Test RMSE = $rmse.") } }
其中注释的第30行代码为书上的写法,但这样写intercept一直为0,也就是只适用于y=a*x的形式,不适用于y=ax+b,改为31、32替代即可。
结果:
[0.992894785953067] -0.9446037936869749 prediction label 0.9418962996238525 1.0 2.133370042767533 2.0 3.0269753501252934 3.0 3.473778003804174 3.5 4.039728031797421 4.0 8.954557222265104 9.0 -1.9176406839209805 -2.0 Test RMSE = 0.06866615969192089.
即a=0.992894785953067,b=-0.9446037936869749,y=0.992894785953067*x-0.9446037936869749
2.2 新版代码
package com.dkl.leanring.spark.ml import org.apache.spark.ml.regression.LinearRegression import org.apache.spark.sql.SparkSession object NewLinearRegression { def main(args: Array[String]): Unit = { val spark = SparkSession .builder .appName("NewLinearRegression") .master("local") .getOrCreate() val data_path = "files/ml/linear_regression_data3.txt" import spark.implicits._ import org.apache.spark.ml.linalg.Vectors import org.apache.spark.sql.Row val training = spark.read.format("libsvm").load(data_path) val lr = new LinearRegression() .setMaxIter(10000) .setRegParam(0.3) .setElasticNetParam(0.8) val lrModel = lr.fit(training) println(s"Coefficients: ${lrModel.coefficients} Intercept: ${lrModel.intercept}") val trainingSummary = lrModel.summary println(s"numIterations: ${trainingSummary.totalIterations}") println(s"objectiveHistory: [${trainingSummary.objectiveHistory.mkString(",")}]") trainingSummary.residuals.show() println(s"RMSE: ${trainingSummary.rootMeanSquaredError}") println(s"r2: ${trainingSummary.r2}") trainingSummary.predictions.show() spark.stop() } }
结果:
Coefficients: [0.9072296333951224] Intercept: -0.630360819004294 numIterations: 3 objectiveHistory: [0.5,0.41543560544030766,0.08269406021049913] +--------------------+ | residuals| +--------------------+ | -0.0933754844464385| |-0.18205104452058585| |0.001442285423804...| | 0.09318895039599973| | 0.07606805936077965| | 0.5852813740549223| | -0.4805541402684861| +--------------------+ RMSE: 0.2999573166705823 r2: 0.9906296595124621 +-----+---------------+------------------+ |label| features| prediction| +-----+---------------+------------------+ | 1.0| (1,[0],[1.9])|1.0933754844464385| | 2.0| (1,[0],[3.1])| 2.182051044520586| | 3.0| (1,[0],[4.0])|2.9985577145761955| | 3.5| (1,[0],[4.45])|3.4068110496040003| | 4.0| (1,[0],[5.02])|3.9239319406392204| | 9.0| (1,[0],[9.97])| 8.414718625945078| | -2.0|(1,[0],[-0.98])|-1.519445859731514| +-----+---------------+------------------+
低调大师中文资讯倾力打造互联网数据资讯、行业资源、电子商务、移动互联网、网络营销平台。
持续更新报道IT业界、互联网、市场资讯、驱动更新,是最及时权威的产业资讯及硬件资讯报道平台。
转载内容版权归作者及来源网站所有,本站原创内容转载请注明来源。
- 上一篇
spark 统计每天新增用户数
版权声明:本文由董可伦首发于https://dongkelun.com,非商业转载请注明作者及原创出处。商业转载请联系作者本人。 https://blog.csdn.net/dkl12/article/details/80256688 我的原创地址:https://dongkelun.com/2018/04/11/sparkNewUV/ 前言 本文源自一位群友的一道美团面试题,解题思路(基于倒排索引)和代码都是这位大佬(相对于尚处于小白阶段的我)写的,我只是在基于倒排索引的基础上帮忙想出了最后一步思路,感觉这个解题思路不错,值得记录一下。 1、原始数据 2017-01-01 a 2017-01-01 b 2017-01-01 c 2017-01-02 a 2017-01-02 b 2017-01-02 d 2017-01-03 b 2017-01-03 e 2017-01-03 f 根据数据可以看出我们要求的结果为: 2017-01-01 新增三个用户(a,b,c) 2017-01-02 新增一个用户(d) 2017-01-03 新增两个用户(e,f) 2、解题思路 2.1 对原始数据...
- 下一篇
Hadoop Yarn上的调度器
1. 引言 Yarn在Hadoop的生态系统中担任了资源管理和任务调度的角色。在讨论其构造器之前先简单了解一下Yarn的架构。 上图是Yarn的基本架构,其中 ResourceManager 是整个架构的核心组件,负责集群上的资源管理,包括内存、CPU以及集群上的其他资; ApplicationMaster 负责在生命周期内的应用程序调度; NodeManager 负责本节点上资源的供给和隔离;Container 可以抽象的看成是运行任务的一个容器。本文讨论的调度器是在 ResourceManager 进行调度,接下来在了解一下 FIFO 调度器、Capacity 调度器、Fair 调度器三个调度器。 2. FIFO调度器 上图显示了 FIFO 调度器的实现(执行过程示意图)。FIFO 调度器是先进先出(First In First Out)调度器。FIFO 调度器是 Hadoop 使用最早的一种调度策略,可以简单的将其理解为一个 Java 队列,这就意味着在集群中同时只能有一个作业运行。所有的应用程序按照提交顺序来执行,在上一个 Job 执行完成之后,下一个 Job 按照队列中的顺序...
相关文章
文章评论
共有0条评论来说两句吧...
文章二维码
点击排行
推荐阅读
最新文章
- CentOS关闭SELinux安全模块
- SpringBoot2更换Tomcat为Jetty,小型站点的福音
- CentOS7安装Docker,走上虚拟化容器引擎之路
- Springboot2将连接池hikari替换为druid,体验最强大的数据库连接池
- CentOS7编译安装Cmake3.16.3,解决mysql等软件编译问题
- SpringBoot2整合Thymeleaf,官方推荐html解决方案
- Docker快速安装Oracle11G,搭建oracle11g学习环境
- CentOS8编译安装MySQL8.0.19
- CentOS7,8上快速安装Gitea,搭建Git服务器
- Hadoop3单机部署,实现最简伪集群