[雪峰磁针石博客] Machine Learning with PySpark (Natural Language Processing and Recommender Systems) 2: Data Processing, Part 1
This chapter covers data processing, a critical step in any machine learning workflow: we need to clean, filter, merge, and transform the data into the form we need.
Quick start
- Reading data
>>> from pyspark.sql import SparkSession
>>> spark=SparkSession.builder.appName('data_processing').getOrCreate()
>>> df=spark.read.csv('sample_data.csv',inferSchema=True, header=True)
>>> df.columns
['ratings', 'age', 'experience', 'family', 'mobile']
>>> len(df.columns)
5
>>> df.count()
33
>>> df.printSchema()
root
|-- ratings: integer (nullable = true)
|-- age: integer (nullable = true)
|-- experience: double (nullable = true)
|-- family: integer (nullable = true)
|-- mobile: string (nullable = true)
>>> df.show(3)
+-------+---+----------+------+-------+
|ratings|age|experience|family| mobile|
+-------+---+----------+------+-------+
| 3| 32| 9.0| 3| Vivo|
| 3| 27| 13.0| 3| Apple|
| 4| 22| 2.5| 0|Samsung|
+-------+---+----------+------+-------+
only showing top 3 rows
>>> df.select('age','mobile').show(5)
+---+-------+
|age| mobile|
+---+-------+
| 32| Vivo|
| 27| Apple|
| 22|Samsung|
| 37| Apple|
| 27| MI|
+---+-------+
only showing top 5 rows
>>> df.describe().show()
+-------+------------------+------------------+------------------+------------------+------+
|summary| ratings| age| experience| family|mobile|
+-------+------------------+------------------+------------------+------------------+------+
| count| 33| 33| 33| 33| 33|
| mean|3.5757575757575757|30.484848484848484|10.303030303030303|1.8181818181818181| null|
| stddev|1.1188806636071336| 6.18527087180309| 6.770731351213326|1.8448330794164254| null|
| min| 1| 22| 2.5| 0| Apple|
| max| 5| 42| 23.0| 5| Vivo|
+-------+------------------+------------------+------------------+------------------+------+
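With inferSchema=True, Spark makes an extra pass over the file to guess column types. For larger files you can skip that pass by declaring the schema up front; a minimal sketch, mirroring the column types printed by printSchema() above:
>>> from pyspark.sql.types import StructType,StructField,IntegerType,DoubleType,StringType
>>> schema=StructType([
...     StructField('ratings',IntegerType(),True),
...     StructField('age',IntegerType(),True),
...     StructField('experience',DoubleType(),True),
...     StructField('family',IntegerType(),True),
...     StructField('mobile',StringType(),True)])
>>> df=spark.read.csv('sample_data.csv',schema=schema,header=True)  # no inference pass needed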
- Adding columns
>>> df.withColumn("age_after_10_yrs",(df["age"]+10)).show(10,False)
+-------+---+----------+------+-------+----------------+
|ratings|age|experience|family|mobile |age_after_10_yrs|
+-------+---+----------+------+-------+----------------+
|3 |32 |9.0 |3 |Vivo |42 |
|3 |27 |13.0 |3 |Apple |37 |
|4 |22 |2.5 |0 |Samsung|32 |
|4 |37 |16.5 |4 |Apple |47 |
|5 |27 |9.0 |1 |MI |37 |
|4 |27 |9.0 |0 |Oppo |37 |
|5 |37 |23.0 |5 |Vivo |47 |
|5 |37 |23.0 |5 |Samsung|47 |
|3 |22 |2.5 |0 |Apple |32 |
|3 |27 |6.0 |0 |MI |37 |
+-------+---+----------+------+-------+----------------+
only showing top 10 rows
>>> from pyspark.sql.types import StringType,DoubleType
>>> df.withColumn('age_double',df['age'].cast(DoubleType())).show(10,False)
+-------+---+----------+------+-------+----------+
|ratings|age|experience|family|mobile |age_double|
+-------+---+----------+------+-------+----------+
|3 |32 |9.0 |3 |Vivo |32.0 |
|3 |27 |13.0 |3 |Apple |27.0 |
|4 |22 |2.5 |0 |Samsung|22.0 |
|4 |37 |16.5 |4 |Apple |37.0 |
|5 |27 |9.0 |1 |MI |27.0 |
|4 |27 |9.0 |0 |Oppo |27.0 |
|5 |37 |23.0 |5 |Vivo |37.0 |
|5 |37 |23.0 |5 |Samsung|37.0 |
|3 |22 |2.5 |0 |Apple |22.0 |
|3 |27 |6.0 |0 |MI |27.0 |
+-------+---+----------+------+-------+----------+
only showing top 10 rows
The False argument above tells show() not to truncate values longer than 20 characters. Note that withColumn returns a new DataFrame rather than modifying df in place; assign the result if you want to keep the new column.
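withColumn also accepts conditional expressions, not just arithmetic and casts. A small sketch using pyspark.sql.functions.when (the age_group column name and the 30-year cutoff are made up for illustration):
>>> from pyspark.sql.functions import when
>>> df.withColumn('age_group',when(df['age']>30,'senior').otherwise('junior')).show(5,False)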
- Filtering data
>>> df.filter(df['mobile']=='Vivo').show()
+-------+---+----------+------+------+
|ratings|age|experience|family|mobile|
+-------+---+----------+------+------+
| 3| 32| 9.0| 3| Vivo|
| 5| 37| 23.0| 5| Vivo|
| 4| 37| 6.0| 0| Vivo|
| 5| 37| 13.0| 1| Vivo|
| 4| 37| 6.0| 0| Vivo|
+-------+---+----------+------+------+
>>> df.filter(df['mobile']=='Vivo').select('age','ratings', 'mobile').show()
+---+-------+------+
|age|ratings|mobile|
+---+-------+------+
| 32| 3| Vivo|
| 37| 5| Vivo|
| 37| 4| Vivo|
| 37| 5| Vivo|
| 37| 4| Vivo|
+---+-------+------+
>>> df.filter(df['mobile']=='Vivo').filter(df['experience']>10).show()
+-------+---+----------+------+------+
|ratings|age|experience|family|mobile|
+-------+---+----------+------+------+
| 5| 37| 23.0| 5| Vivo|
| 5| 37| 13.0| 1| Vivo|
+-------+---+----------+------+------+
>>> df.filter((df['mobile']=='Vivo')&(df['experience'] >10)).show()
+-------+---+----------+------+------+
|ratings|age|experience|family|mobile|
+-------+---+----------+------+------+
| 5| 37| 23.0| 5| Vivo|
| 5| 37| 13.0| 1| Vivo|
+-------+---+----------+------+------+
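Filters can also match against a set of values with isin, or be negated with ~. A sketch; the row counts in the comments follow from the per-brand counts in the grouping section below:
>>> df.filter(df['mobile'].isin('Vivo','Apple')).count()   # 12 rows (5 Vivo + 7 Apple)
>>> df.filter(~(df['mobile']=='Vivo')).count()             # 28 rows (33 total - 5 Vivo)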
- Distinct values
>>> df.select('mobile').distinct().show()
+-------+
| mobile|
+-------+
| MI|
| Oppo|
|Samsung|
| Vivo|
| Apple|
+-------+
>>> df.select('mobile').distinct().count()
5
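A close relative of distinct() is dropDuplicates, which optionally takes a column subset to deduplicate on; a quick sketch:
>>> df.dropDuplicates().count()            # drop rows identical in every column
>>> df.dropDuplicates(['mobile']).count()  # keep one row per brand: 5, matching the distinct count above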
- Grouping and sorting
>>> df.groupBy('mobile').count().show(5,False)
+-------+-----+
|mobile |count|
+-------+-----+
|MI |8 |
|Oppo |7 |
|Samsung|6 |
|Vivo |5 |
|Apple |7 |
+-------+-----+
>>> df.groupBy('mobile').count().orderBy('count',ascending=False).show(5,False)
+-------+-----+
|mobile |count|
+-------+-----+
|MI |8 |
|Oppo |7 |
|Apple |7 |
|Samsung|6 |
|Vivo |5 |
+-------+-----+
>>> df.groupBy('mobile').mean().show(5,False)
+-------+------------------+------------------+------------------+------------------+
|mobile |avg(ratings) |avg(age) |avg(experience) |avg(family) |
+-------+------------------+------------------+------------------+------------------+
|MI |3.5 |30.125 |10.1875 |1.375 |
|Oppo |2.857142857142857 |28.428571428571427|10.357142857142858|1.4285714285714286|
|Samsung|4.166666666666667 |28.666666666666668|8.666666666666666 |1.8333333333333333|
|Vivo |4.2 |36.0 |11.4 |1.8 |
|Apple |3.4285714285714284|30.571428571428573|11.0 |2.7142857142857144|
+-------+------------------+------------------+------------------+------------------+
>>> df.groupBy('mobile').sum().show(5,False)
+-------+------------+--------+---------------+-----------+
|mobile |sum(ratings)|sum(age)|sum(experience)|sum(family)|
+-------+------------+--------+---------------+-----------+
|MI |28 |241 |81.5 |11 |
|Oppo |20 |199 |72.5 |10 |
|Samsung|25 |172 |52.0 |11 |
|Vivo |21 |180 |57.0 |9 |
|Apple |24 |214 |77.0 |19 |
+-------+------------+--------+---------------+-----------+
>>> df.groupBy('mobile').max().show(5,False)
+-------+------------+--------+---------------+-----------+
|mobile |max(ratings)|max(age)|max(experience)|max(family)|
+-------+------------+--------+---------------+-----------+
|MI |5 |42 |23.0 |5 |
|Oppo |4 |42 |23.0 |2 |
|Samsung|5 |37 |23.0 |5 |
|Vivo |5 |37 |23.0 |5 |
|Apple |4 |37 |16.5 |5 |
+-------+------------+--------+---------------+-----------+
>>> df.groupBy('mobile').min().show(5,False)
+-------+------------+--------+---------------+-----------+
|mobile |min(ratings)|min(age)|min(experience)|min(family)|
+-------+------------+--------+---------------+-----------+
|MI |1 |27 |2.5 |0 |
|Oppo |2 |22 |6.0 |0 |
|Samsung|2 |22 |2.5 |0 |
|Vivo |3 |32 |6.0 |0 |
|Apple |3 |22 |2.5 |0 |
+-------+------------+--------+---------------+-----------+
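These shorthand aggregations (count, mean, sum, max, min) chain naturally with the filters shown earlier. For example, a sketch of average experience per brand restricted to users older than 30 (the cutoff is arbitrary):
>>> df.filter(df['age']>30).groupBy('mobile').mean('experience').show(5,False)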
- Aggregation
>>> df.groupBy('mobile').agg({'experience':'sum'}).show(5,False)
+-------+---------------+
|mobile |sum(experience)|
+-------+---------------+
|MI |81.5 |
|Oppo |72.5 |
|Samsung|52.0 |
|Vivo |57.0 |
|Apple |77.0 |
+-------+---------------+
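The dict form of agg allows only one function per column. To compute several statistics in one pass, pass expressions from pyspark.sql.functions and alias the output columns; a sketch (the alias names are arbitrary):
>>> from pyspark.sql import functions as F
>>> df.groupBy('mobile').agg(F.sum('experience').alias('total_exp'),
...                          F.avg('ratings').alias('avg_rating'),
...                          F.count('*').alias('n')).show(5,False)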