SparkML机器学习之特征工程(二)特征转化(Binarizer、StandardScaler、MaxAbsScaler、Normaliz...
特征转化
为什么要转化数据呢,就是要让它成为有效的特征,因为原始数据是很多脏数据无用数据的。常用的方法是标准化,归一化,特征的离散化等等。比如我输入的数据是句子,我得把它切分为一个个单词进行分析,这就是一种转化。
连续型数据处理之二值化:Binarizer
假设淘宝现在有个需求,我得根据年龄来进行物品推荐,把50以上的人分为老年,50以下分为非老年人,那么我们根据二值化可以很简单的把50以上的定为1,50以下的定为0。这样就方便我们后续的推荐了。Binarizer就是根据阈值进行二值化,大于阈值的为1.0,小于等于阈值的为0.0
package ml.test import org.apache.spark.ml.feature.Binarizer import org.apache.spark.sql.SparkSession /** * Created by liuyanling on 2018/3/19 */ object BinarizerDemo { def main(args: Array[String]): Unit = { var spark = SparkSession.builder().appName("BinarizerDemo").master("local[2]").getOrCreate(); val array = Array((1,34.0),(2,56.0),(3,58.0),(4,23.0)) //将数组转为DataFrame val df = spark.createDataFrame(array).toDF("id","age") //初始化Binarizer对象并进行设定:setThreshold是设置我们的阈值,InputCol是设置需要进行二值化的输入列,setOutputCol设置输出列 val binarizer = new Binarizer().setThreshold(50.0).setInputCol("age").setOutputCol("binarized_feature") //transform方法将DataFrame二值化。 val binarizerdf = binarizer.transform(df) //show是用于展示结果 binarizerdf.show } }
输出结果:
+---+----+-----------------+ | id| age|binarized_feature| +---+----+-----------------+ | 1|34.0| 0.0| | 2|56.0| 1.0| | 3|58.0| 1.0| | 4|23.0| 0.0| +---+----+-----------------+
连续型数据处理之给定边界离散化:Bucketizer
现在淘宝的需求变了,他们觉得把人分为50以上和50以下太不精准了,应该分为20岁以下,20-30岁,30-40岁,36-50岁,50以上,那么就得用到数值离散化的处理方法了。离散化就是把特征进行适当的离散处理,比如上面所说的年龄是个连续的特征,但是我把它分为不同的年龄阶段就是把它离散化了,这样更利于我们分析用户行为进行精准推荐。Bucketizer能方便的将一堆数据分成不同的区间。
object BucketizerDemo { def main(args: Array[String]): Unit = { var spark = SparkSession.builder().appName("BucketizerDemo").master("local[2]").getOrCreate(); val array = Array((1,13.0),(2,16.0),(3,23.0),(4,35.0),(5,56.0),(6,44.0)) //将数组转为DataFrame val df = spark.createDataFrame(array).toDF("id","age") // 设定边界,分为5个年龄组:[0,20),[20,30),[30,40),[40,50),[50,正无穷) // 注:人的年龄当然不可能正无穷,我只是为了给大家演示正无穷PositiveInfinity的用法,负无穷是NegativeInfinity。 val splits = Array(0, 20, 30, 40, 50, Double.PositiveInfinity) //初始化Bucketizer对象并进行设定:setSplits是设置我们的划分依据 val bucketizer = new Bucketizer().setSplits(splits).setInputCol("age").setOutputCol("bucketizer_feature") //transform方法将DataFrame二值化。 val bucketizerdf = bucketizer.transform(df) //show是用于展示结果 bucketizerdf.show } }
输出结果:
+---+----+------------------+ | id| age|bucketizer_feature| +---+----+------------------+ | 1|13.0| 0.0| | 2|16.0| 0.0| | 3|23.0| 1.0| | 4|35.0| 2.0| | 5|56.0| 4.0| | 6|44.0| 3.0| +---+----+------------------+
连续型数据处理之给定分位数离散化:QuantileDiscretizer
有时候我们不想给定分类标准,可以让spark自动给我们分箱。
object QuantileDiscretizerDemo { def main(args: Array[String]): Unit = { var spark = SparkSession.builder().appName("QuantileDiscretizerDemo").master("local[2]").getOrCreate(); val array = Array((1,13.0),(2,14.0),(3,22.0),(4,35.0),(5,44.0),(6,56.0),(7,21.0)) val df = spark.createDataFrame(array).toDF("id","age") //和Bucketizer类似:将连续数值特征转换离散化。但这里不再自己定义splits(分类标准),而是定义分几箱就可以了。 val quantile = new QuantileDiscretizer().setNumBuckets(5).setInputCol("age").setOutputCol("quantile_feature") //因为事先不知道分桶依据,所以要先fit,相当于对数据进行扫描一遍,取出分位数来,再transform进行转化。 val quantiledf = quantile.fit(df).transform(df) //show是用于展示结果 quantiledf.show } }
标准化
对于同一个特征,不同的样本中的取值可能会相差非常大,一些异常小或异常大的数据会误导模型的正确训练;另外,如果数据的分布很分散也会影响训练结果。以上两种方式都体现在方差会非常大。此时,我们可以将特征中的值进行标准差标准化,即转换为均值为0,方差为1的正态分布。如果特征非常稀疏,并且有大量的0(现实应用中很多特征都具有这个特点),Z-score 标准化的过程几乎就是一个除0的过程,结果不可预料。所以在训练模型之前,一定要对特征的数据分布进行探索,并考虑是否有必要将数据进行标准化。基于特征值的均值(mean)和标准差(standard deviation)进行数据的标准化。它的计算公式为:标准化数据=(原数据-均值)/标准差。标准化后的变量值围绕0上下波动,大于0说明高于平均水平,小于0说明低于平均水平。
StandardScaler
object StandardScalerDemo { def main(args: Array[String]): Unit = { val spark = SparkSession.builder().master("local[2]").getOrCreate() val df = spark.read.format("libsvm").load("libsvm_data.txt") //setWithMean是否减均值。setWithStd是否将数据除以标准差。这里就是没有减均值但有除以标准差 df.printSchema() val scaler =new StandardScaler().setInputCol("features").setOutputCol("scaledFeatures").setWithMean(false).setWithStd(true) // 计算均值方差等参数 val scalermodel = scaler.fit(df) // 标准化 scalermodel.transform(df).show() } }
libsvm_data.txt label 标签,就是属于哪一类,即你要分类的种类,通常是一些整数。 index 是有顺序的索引,通常是连续的整数。就是指特征编号,必须按照升序排列 value 就是特征值,训练,通常是一堆实数组成。 格式:标签 第一维特征编号:第一维特征值 第二维特征编号:第二维特征值 …例: 0 1:1 2:2 1 1:2 2:4
归一化
为什么数据需要归一化?以房价预测为案例,房价(y)通常与离市中心距离(x1)、面积(x2)、楼层(x3)有关,设y=ax1+bx2+cx3,那么abc就是我们需要重点解决的参数。但是有个问题,面积一般数值是比较大的,100平甚至更多,而距离一般都是几公里而已,b参数只要一点变化都能对房价产生巨大影响,而a的变化对房价的影响相对就小很多了。显然这会影响最终的准确性,毕竟距离可是个非常大的影响因素啊。 所以, 需要使用特征的归一化, 取值跨度大的特征数据, 我们浓缩一下, 跨度小的括展一下, 使得他们的跨度尽量统一。
归一化就是将所有特征值都等比地缩小到0-1或者-1到1之间的区间内。其目的是为了使特征都在相同的规模中。
绝对值最大标准化:MaxAbsScaler
import org.apache.spark.ml.feature.MaxAbsScaler import org.apache.spark.ml.linalg.Vectors import org.apache.spark.sql.SparkSession /** * Created by LYL on 2018/3/20. */ object MaxAbsScalerDemo { def main(args: Array[String]): Unit = { val spark = SparkSession.builder().appName("MaxAbsScalerDemo").master("local[2]").getOrCreate() val dataFrame = spark.createDataFrame(Seq( (0, Vectors.dense(10.0, 0.1, -8.0)), (1, Vectors.dense(100.0, 1.0, -4.0)), (2, Vectors.dense(1000.0, 10.0, 8.0)) )).toDF("id", "features") val maxabs = new MaxAbsScaler().setInputCol("features").setOutputCol("maxabs_features") // fit作用是把所有值都扫描一遍,计算出最大最小值,比如1000的话那么absMax=1000。最后返回MaxAbsScalerModel val scalerModel = maxabs.fit(dataFrame) // 使用每个特征的最大值的绝对值将输入向量的特征值都缩放到[-1,1]范围内 val scalerdf = scalerModel.transform(dataFrame) scalerdf.show } }
运行结果:
+---+-----------------+----------------+ | id| features| maxabs_features| +---+-----------------+----------------+ | 0| [10.0,0.1,-8.0]|[0.01,0.01,-1.0]| | 1| [100.0,1.0,-4.0]| [0.1,0.1,-0.5]| | 2|[1000.0,10.0,8.0]| [1.0,1.0,1.0]| +---+-----------------+----------------+
归一化之最小最大值标准化MinMaxScaler
import org.apache.spark.ml.feature.MinMaxScaler import org.apache.spark.ml.linalg.Vectors import org.apache.spark.sql.SparkSession /** * Created by LYL on 2018/3/20. */ object MinMaxScalerDemo { def main(args: Array[String]): Unit = { val spark = SparkSession.builder().appName("MinMaxScalerDemo").master("local[2]").getOrCreate() val dataFrame = spark.createDataFrame(Seq( (0, Vectors.dense(2.0, 0.9, -2.0)), (1, Vectors.dense(3.0, -2.0, -3.0)), (2, Vectors.dense(4.0, -2.0, 2.0)) )).toDF("id", "features") val maxabs = new MinMaxScaler().setInputCol("features").setOutputCol("minmax_features") val scalerModel = maxabs.fit(dataFrame) // 将所有值都缩放到[0,1]范围内 val scalerdf = scalerModel.transform(dataFrame) scalerdf.show } }
运行结果为:
+---+---------------+---------------+ | id| features|minmax_features| +---+---------------+---------------+ | 0| [2.0,0.9,-2.0]| [0.0,1.0,0.2]| | 1|[3.0,-2.0,-3.0]| [0.5,0.0,0.0]| | 2| [4.0,-2.0,2.0]| [1.0,0.0,1.0]| +---+---------------+---------------+
正则化Normalizer
为什么要有正则化?就是为了防止过拟合。来看一下正则化是怎么计算的:
def main(args: Array[String]): Unit = { val spark = SparkSession.builder().master("local[2]").getOrCreate() val dataFrame = spark.createDataFrame(Seq( (0, Vectors.dense(1.0, 0.5, -1.0)), (1, Vectors.dense(2.0, 1.0, 1.0)), (2, Vectors.dense(4.0, 10.0, 2.0)) )).toDF("id", "features") //setP是指L1正则化还是L2正则化,比如1.0就是上面说到的L1正则化,计算如下:1/(1+0.5+1) val normalizer = new Normalizer().setInputCol("features").setOutputCol("normalizer_features").setP(1.0) normalizer.transform(dataFrame).show(truncate = false) }
输出结果为:
+---+--------------+-------------------+ |id |features |normalizer_features| +---+--------------+-------------------+ |0 |[1.0,0.5,-1.0]|[0.4,0.2,-0.4] | |1 |[2.0,1.0,1.0] |[0.5,0.25,0.25] | |2 |[4.0,10.0,2.0]|[0.25,0.625,0.125] | +---+--------------+-------------------+
N-gram
N-Gram认为语言中每个单词只与其前面长度 N-1 的上下文有关。主要分为bigram和trigram,bigram假设下一个词的出现依赖它前面的一个词,trigram假设下一个词的出现依赖它前面的两个词。在SparkML中用NGram类实现,setN(2)为bigram,setN(3)为trigram。
def main(args: Array[String]): Unit = { val spark = SparkSession.builder().master("local[2]").getOrCreate() val wordDataFrame = spark.createDataFrame(Seq( (0, Array("Hi", "I", "heard", "about", "Spark")), (1, Array("I", "wish", "Java", "could", "use", "case", "classes")), (2, Array("Logistic", "regression", "models", "are", "neat")) )).toDF("id", "words") val ngram2 = new NGram().setN(2).setInputCol("words").setOutputCol("ngrams") val ngram3 = new NGram().setN(3).setInputCol("words").setOutputCol("ngrams") ngram2.transform(wordDataFrame).show(false) ngram3.transform(wordDataFrame).show(false) }
输出结果为:
+---+------------------------------------------+------------------------------------------------------------------+ |id |words |ngrams | +---+------------------------------------------+------------------------------------------------------------------+ |0 |[Hi, I, heard, about, Spark] |[Hi I, I heard, heard about, about Spark] | |1 |[I, wish, Java, could, use, case, classes]|[I wish, wish Java, Java could, could use, use case, case classes]| |2 |[Logistic, regression, models, are, neat] |[Logistic regression, regression models, models are, are neat] | +---+------------------------------------------+------------------------------------------------------------------+ 18/03/24 11:23:32 INFO CodeGenerator: Code generated in 14.198881 ms +---+------------------------------------------+--------------------------------------------------------------------------------+ |id |words |ngrams | +---+------------------------------------------+--------------------------------------------------------------------------------+ |0 |[Hi, I, heard, about, Spark] |[Hi I heard, I heard about, heard about Spark] | |1 |[I, wish, Java, could, use, case, classes]|[I wish Java, wish Java could, Java could use, could use case, use case classes]| |2 |[Logistic, regression, models, are, neat] |[Logistic regression models, regression models are, models are neat] | +---+------------------------------------------+--------------------------------------------------------------------------------+
多项式转化PolynomialExpansion
有时候要对特征值进行一些多项式的转化,比如平方啊,三次方啊等等,那就用到了PolynomialExpansion。
object PolynomialExpansionDemo { def main(args: Array[String]): Unit = { val spark = SparkSession.builder().master("local[2]").getOrCreate() val dataFrame = spark.createDataFrame(Seq( (0, Vectors.dense(1.0, 5.0)), (1, Vectors.dense(2.0, 1.0)), (2, Vectors.dense(4.0, 8.0)) )).toDF("id", "features") //setDegree表示多项式最高次幂 比如1.0,5.0可以是 三次:1.0^3 5.0^3 1.0+5.0^2 二次:1.0^2+5.0 1.0^2 5.0^2 1.0+5.0 一次:1.0 5.0 val po = new PolynomialExpansion().setDegree(3).setInputCol("features").setOutputCol("Polynomial_features") po.transform(dataFrame).show(truncate = false) } }
输出结果为:
+---+---------+-----------------------------------------------+ |id |features |Polynomial_features | +---+---------+-----------------------------------------------+ |0 |[1.0,5.0]|[1.0,1.0,1.0,5.0,5.0,5.0,25.0,25.0,125.0] | |1 |[2.0,1.0]|[2.0,4.0,8.0,1.0,2.0,4.0,1.0,2.0,1.0] | |2 |[4.0,8.0]|[4.0,16.0,64.0,8.0,32.0,128.0,64.0,256.0,512.0]| +---+---------+-----------------------------------------------+
Tokenizer分词器
当我们的输入数据为文本(句子)的时候,我们会想把他们切分为单词再进行数据处理,这时候就要用到Tokenizer类了。
object TokenizerDemo { def main(args: Array[String]): Unit = { val spark = SparkSession.builder().master("local[2]").appName("TokenizerDemo").getOrCreate() val dataFrame = spark.createDataFrame(Seq((0, "Hello I am LYL"), (1, "I Love Bigdata"), (2, "1 2 3 4"))).toDF("id", "sentence") // Tokenization(分词器)是一个接受文本(通常是句子)输入,然后切分成单词。 val tokenizer = new Tokenizer().setInputCol("sentence").setOutputCol("words") tokenizer.transform(dataFrame).show() // RegexTokenizer(正则化分词器)基于正则表达式匹配提供了更多高级的分词功能。将gaps参数设置为false,表明使用正则表达式匹配标记,而不是使用分隔符。 // 此处\\d表示匹配数字。 val regextokenizer = new RegexTokenizer().setInputCol("sentence").setOutputCol("words").setGaps(false).setPattern("\\d") regextokenizer.transform(dataFrame).show() //使用udf函数 统计单词个数 val wordcount = udf { (word: Seq[String]) => word.length } tokenizer.transform(dataFrame).select("words").withColumn("wordcount", wordcount(col("words"))).show() }
输出结果为:
+---+--------------+-------------------+ | id| sentence| words| +---+--------------+-------------------+ | 0|Hello I am LYL|[hello, i, am, lyl]| | 1|I Love Bigdata| [i, love, bigdata]| | 2| 1 2 3 4| [1, 2, 3, 4]| +---+--------------+-------------------+ +---+--------------+------------+ | id| sentence| words| +---+--------------+------------+ | 0|Hello I am LYL| []| | 1|I Love Bigdata| []| | 2| 1 2 3 4|[1, 2, 3, 4]| +---+--------------+------------+ +-------------------+---------+ | words|wordcount| +-------------------+---------+ |[hello, i, am, lyl]| 4| | [i, love, bigdata]| 3| | [1, 2, 3, 4]| 4| +-------------------+---------+
SQLTransformer
我们都很喜欢sql语句,简单好用又熟悉,那么Spark ML很人性化的为我们提供了SQLTransformer类,使得我们能用我们熟悉的SQL来做特征转化。它支持SparkSql中的所有select选择语句,sum(),count(),group by,order by等等都可以用!形如"SELECT ...FROM __THIS__"。'__THIS__'代表输入数据的基础表。
def main(args: Array[String]): Unit = { val spark = SparkSession.builder().master("local[2]").getOrCreate() val df = spark.createDataFrame(Seq((1, "lyl", 90), (2, "ddd", 88), (3, "ccc", 95))) .toDF("id", "name", "score") //__THIS__代表输入数据的基础表 在这里就是指df val sqltransformer1 = new SQLTransformer().setStatement("select id from __THIS__") val sqltransformer2 = new SQLTransformer().setStatement("select sum(score)/3 as average from __THIS__") sqltransformer1.transform(df).show() sqltransformer2.transform(df).show() }
输出结果为:
+---+ | id| +---+ | 1| | 2| | 3| +---+ |average| +-------+ | 91.0| +-------+

低调大师中文资讯倾力打造互联网数据资讯、行业资源、电子商务、移动互联网、网络营销平台。
持续更新报道IT业界、互联网、市场资讯、驱动更新,是最及时权威的产业资讯及硬件资讯报道平台。
转载内容版权归作者及来源网站所有,本站原创内容转载请注明来源。
- 上一篇
SparkML机器学习之特征工程(一)特征提取(TF-IDF、Word2Vec、CountVectorizer)
特征工程 我们都知道特征工程在机器学习中是很重要的,然而特征工程到底是什么?怎么样通俗的理解它呢?打个比方,即使你有再好的渔具,如果给你一片没有鱼的池塘,那也是白费力气的。而特征工程就是找有鱼的那片水域。所以我们可以这么理解,特征是数据中抽取出来的对结果预测有用的信息(水域),而特征工程就是使用专业知识来处理数据,筛选出具有价值的特征(从100个水域中挑选出鱼最多最好的水域)。所以有句话是这么说的:算法再牛逼,其上限也是由特征工程决定的,就像你渔具再好,捕鱼多少也是由水域这个特征决定的。在SparkML中、对于特征工程的操作主要分为特征提取,特征转化、特征选择。 特征提取 从原始数据中提取特征 TF-IDF (Term frequency-inverse document frequency) TF-IDF称为词频-逆文件频率,先搞清楚它有什么作用吧!很经典的一个问题,如何得到一篇文章的关键词??大家都能想到,看看这篇文章什么词出现最多!思路是没问题,但是,一篇文章,出现最多的,应该都是诸如“的”之类的停用词吧?这就没意义了啊!那就把这些停用词过滤掉呗,这样还是会出问题。比如一篇文章,...
- 下一篇
Hadoop集群nodes unhealthy解决方法
欢迎关注大数据和人工智能技术文章发布的微信公众号:清研学堂,在这里你可以学到夜白(作者笔名)精心整理的笔记,让我们每天进步一点点,让优秀成为一种习惯! 在搭建好Hadoop集群之后,所有服务均可正常启动,但是在运行MapReduce程序的时候,发现任务卡在7/09/07 22:28:14 INFO mapreduce.Job: Running job: job_1504781778966_0003,不再往下执行了,经过检查,发现所有的nodes节点都处于unhealthy的状态,使用命令查看node 的状态 bin/yarn node -list -all 查看日志发现 2015-07-16 15:28:58,643 WARN org.apache.hadoop.yarn.server.nodemanager.DirectoryCollection: Directory /opt/beh/data/yarn/nmlocal error, used space above threshold of 90.0%, removing from list of valid directorie...
相关文章
文章评论
共有0条评论来说两句吧...