[雪峰磁针石博客] PySpark for Machine Learning (NLP and Recommender Systems) 2: Data Processing, Part 1

This chapter covers data processing, a key step before machine learning: we need to clean, filter, merge, and transform data into the form we need.

Quick start

Reading data

>>> from pyspark.sql import SparkSession
>>> spark=SparkSession.builder.appName('data_processing').getOrCreate()
>>> df=spark.read.csv('sample_data.csv',inferSchema=True, header=True)
>>> df.columns
['ratings', 'age', 'experience', 'family', 'mobile']
>>> len(df.columns)
5
>>> df.count()
33
>>> df.printSchema()
root
 |-- ratings: integer (nullable = true)
 |-- age: integer (nullable = true)
 |-- experience: double (nullable = true)
 |-- family: integer (nullable = true)
 |-- mobile: string (nullable = true)
>>> df.show(3)
+-------+---+----------+------+-------+
|ratings|age|experience|family| mobile|
+-------+---+----------+------+-------+
|      3| 32|       9.0|     3|   Vivo|
|      3| 27|      13.0|     3|  Apple|
|      4| 22|       2.5|     0|Samsung|
+-------+---+----------+------+-------+
only showing top 3 rows

>>> df.select('age','mobile').show(5)
+---+-------+
|age| mobile|
+---+-------+
| 32|   Vivo|
| 27|  Apple|
| 22|Samsung|
| 37|  Apple|
| 27|     MI|
+---+-------+
only showing top 5 rows

>>> df.describe().show()
+-------+------------------+------------------+------------------+------------------+------+
|summary|           ratings|               age|        experience|            family|mobile|
+-------+------------------+------------------+------------------+------------------+------+
|  count|                33|                33|                33|                33|    33|
|   mean|3.5757575757575757|30.484848484848484|10.303030303030303|1.8181818181818181|  null|
| stddev|1.1188806636071336|  6.18527087180309| 6.770731351213326|1.8448330794164254|  null|
|    min|                 1|                22|               2.5|                 0| Apple|
|    max|                 5|                42|              23.0|                 5|  Vivo|
+-------+------------------+------------------+------------------+------------------+------+

Adding columns

>>> df.withColumn("age_after_10_yrs",(df["age"]+10)).show(10,False)
+-------+---+----------+------+-------+----------------+
|ratings|age|experience|family|mobile |age_after_10_yrs|
+-------+---+----------+------+-------+----------------+
|3      |32 |9.0       |3     |Vivo   |42              |
|3      |27 |13.0      |3     |Apple  |37              |
|4      |22 |2.5       |0     |Samsung|32              |
|4      |37 |16.5      |4     |Apple  |47              |
|5      |27 |9.0       |1     |MI     |37              |
|4      |27 |9.0       |0     |Oppo   |37              |
|5      |37 |23.0      |5     |Vivo   |47              |
|5      |37 |23.0      |5     |Samsung|47              |
|3      |22 |2.5       |0     |Apple  |32              |
|3      |27 |6.0       |0     |MI     |37              |
+-------+---+----------+------+-------+----------------+
only showing top 10 rows

>>> from pyspark.sql.types import StringType,DoubleType
>>> df.withColumn('age_double',df['age'].cast(DoubleType())).show(10,False)
+-------+---+----------+------+-------+----------+
|ratings|age|experience|family|mobile |age_double|
+-------+---+----------+------+-------+----------+
|3      |32 |9.0       |3     |Vivo   |32.0      |
|3      |27 |13.0      |3     |Apple  |27.0      |
|4      |22 |2.5       |0     |Samsung|22.0      |
|4      |37 |16.5      |4     |Apple  |37.0      |
|5      |27 |9.0       |1     |MI     |27.0      |
|4      |27 |9.0       |0     |Oppo   |27.0      |
|5      |37 |23.0      |5     |Vivo   |37.0      |
|5      |37 |23.0      |5     |Samsung|37.0      |
|3      |22 |2.5       |0     |Apple  |22.0      |
|3      |27 |6.0       |0     |MI     |27.0      |
+-------+---+----------+------+-------+----------+
only showing top 10 rows

The False passed to show() disables truncation, so values longer than 20 characters are displayed in full.

Filtering data

>>> df.filter(df['mobile']=='Vivo').show()
+-------+---+----------+------+------+
|ratings|age|experience|family|mobile|
+-------+---+----------+------+------+
|      3| 32|       9.0|     3|  Vivo|
|      5| 37|      23.0|     5|  Vivo|
|      4| 37|       6.0|     0|  Vivo|
|      5| 37|      13.0|     1|  Vivo|
|      4| 37|       6.0|     0|  Vivo|
+-------+---+----------+------+------+

>>> df.filter(df['mobile']=='Vivo').select('age','ratings','mobile').show()
+---+-------+------+
|age|ratings|mobile|
+---+-------+------+
| 32|      3|  Vivo|
| 37|      5|  Vivo|
| 37|      4|  Vivo|
| 37|      5|  Vivo|
| 37|      4|  Vivo|
+---+-------+------+

>>> df.filter(df['mobile']=='Vivo').filter(df['experience']>10).show()
+-------+---+----------+------+------+
|ratings|age|experience|family|mobile|
+-------+---+----------+------+------+
|      5| 37|      23.0|     5|  Vivo|
|      5| 37|      13.0|     1|  Vivo|
+-------+---+----------+------+------+

>>> df.filter((df['mobile']=='Vivo')&(df['experience']>10)).show()
+-------+---+----------+------+------+
|ratings|age|experience|family|mobile|
+-------+---+----------+------+------+
|      5| 37|      23.0|     5|  Vivo|
|      5| 37|      13.0|     1|  Vivo|
+-------+---+----------+------+------+

Distinct values

>>> df.select('mobile').distinct().show()
+-------+
| mobile|
+-------+
|     MI|
|   Oppo|
|Samsung|
|   Vivo|
|  Apple|
+-------+

>>> df.select('mobile').distinct().count()
5

Grouping and sorting

>>> df.groupBy('mobile').count().show(5,False)
+-------+-----+
|mobile |count|
+-------+-----+
|MI     |8    |
|Oppo   |7    |
|Samsung|6    |
|Vivo   |5    |
|Apple  |7    |
+-------+-----+

>>> df.groupBy('mobile').count().orderBy('count',ascending=False).show(5,False)
+-------+-----+
|mobile |count|
+-------+-----+
|MI     |8    |
|Oppo   |7    |
|Apple  |7    |
|Samsung|6    |
|Vivo   |5    |
+-------+-----+

>>> df.groupBy('mobile').mean().show(5,False)
+-------+------------------+------------------+------------------+------------------+
|mobile |avg(ratings)      |avg(age)          |avg(experience)   |avg(family)       |
+-------+------------------+------------------+------------------+------------------+
|MI     |3.5               |30.125            |10.1875           |1.375             |
|Oppo   |2.857142857142857 |28.428571428571427|10.357142857142858|1.4285714285714286|
|Samsung|4.166666666666667 |28.666666666666668|8.666666666666666 |1.8333333333333333|
|Vivo   |4.2               |36.0              |11.4              |1.8               |
|Apple  |3.4285714285714284|30.571428571428573|11.0              |2.7142857142857144|
+-------+------------------+------------------+------------------+------------------+

>>> df.groupBy('mobile').sum().show(5,False)
+-------+------------+--------+---------------+-----------+
|mobile |sum(ratings)|sum(age)|sum(experience)|sum(family)|
+-------+------------+--------+---------------+-----------+
|MI     |28          |241     |81.5           |11         |
|Oppo   |20          |199     |72.5           |10         |
|Samsung|25          |172     |52.0           |11         |
|Vivo   |21          |180     |57.0           |9          |
|Apple  |24          |214     |77.0           |19         |
+-------+------------+--------+---------------+-----------+

>>> df.groupBy('mobile').max().show(5,False)
+-------+------------+--------+---------------+-----------+
|mobile |max(ratings)|max(age)|max(experience)|max(family)|
+-------+------------+--------+---------------+-----------+
|MI     |5           |42      |23.0           |5          |
|Oppo   |4           |42      |23.0           |2          |
|Samsung|5           |37      |23.0           |5          |
|Vivo   |5           |37      |23.0           |5          |
|Apple  |4           |37      |16.5           |5          |
+-------+------------+--------+---------------+-----------+

>>> df.groupBy('mobile').max().show(3,False)
+-------+------------+--------+---------------+-----------+
|mobile |max(ratings)|max(age)|max(experience)|max(family)|
+-------+------------+--------+---------------+-----------+
|MI     |5           |42      |23.0           |5          |
|Oppo   |4           |42      |23.0           |2          |
|Samsung|5           |37      |23.0           |5          |
+-------+------------+--------+---------------+-----------+
only showing top 3 rows

>>> df.groupBy('mobile').min().show(5,False)
+-------+------------+--------+---------------+-----------+
|mobile |min(ratings)|min(age)|min(experience)|min(family)|
+-------+------------+--------+---------------+-----------+
|MI     |1           |27      |2.5            |0          |
|Oppo   |2           |22      |6.0            |0          |
|Samsung|2           |22      |2.5            |0          |
|Vivo   |3           |32      |6.0            |0          |
|Apple  |3           |22      |2.5            |0          |
+-------+------------+--------+---------------+-----------+

Aggregation

>>> df.groupBy('mobile').agg({'experience':'sum'}).show(5,False)
+-------+---------------+
|mobile |sum(experience)|
+-------+---------------+
|MI     |81.5           |
|Oppo   |72.5           |
|Samsung|52.0           |
|Vivo   |57.0           |
|Apple  |77.0           |
+-------+---------------+

References

- [本文相关海量书籍下载](https://github.com/china-testing/python-api-tesing/blob/master/books.md)
- Spark SQL Python API: http://spark.apache.org/docs/2.1.0/api/python/pyspark.sql.html
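To make the grouped-aggregation semantics concrete, the sketch below (an illustration, not part of the original Spark session) mirrors what df.groupBy('mobile').agg({'experience':'sum'}) computes, applied in plain Python to a handful of rows copied from the sample output above:

```python
from collections import defaultdict

# A few (mobile, experience) pairs copied from the rows shown above.
rows = [
    ("Vivo", 9.0), ("Apple", 13.0), ("Samsung", 2.5),
    ("Apple", 16.5), ("MI", 9.0), ("Oppo", 9.0),
]

# Plain-Python equivalent of df.groupBy('mobile').agg({'experience': 'sum'}),
# restricted to these six rows: sum the experience values per brand.
totals = defaultdict(float)
for mobile, experience in rows:
    totals[mobile] += experience

print(dict(totals))
# {'Vivo': 9.0, 'Apple': 29.5, 'Samsung': 2.5, 'MI': 9.0, 'Oppo': 9.0}
```

On the full 33-row DataFrame, Spark performs the same per-group accumulation in a distributed fashion, which is why the agg() call takes a column-to-function mapping.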


[雪峰磁针石博客] PySpark for Machine Learning (NLP and Recommender Systems) 2: Data Processing, Part 2

User-Defined Functions (UDFs)

UDFs are widely used in data processing to transform DataFrames. PySpark has two kinds of UDF: conventional UDFs and Pandas UDFs. Pandas UDFs are considerably faster in terms of speed and processing time.

Conventional Python function

>>> from pyspark.sql.functions import udf
>>> from pyspark.sql.types import StringType,DoubleType
>>> def price_range(brand):
...     prices = {"Samsung":'High Price', "Apple":'High Price', "MI":'Mid Price'}
...     return prices.get(brand,"Low Price")
...
>>> brand_udf=udf(price_range,StringType())
>>> df.withColumn('price_range',brand_udf(df['mobile'])).show(10,False)
+-------+---+----------+------+-------+-----------+
|ratings|age|experience|family|mobile |price_range|
+-------+---+----------+------+-------+-----------+
|3      |32 |9.0       |3     |Vivo   |Low Price  |
|3      |27 |13.0      |3     |Apple  |High Price |
|4      |22 |2.5       |0     |Samsung|High Price |
|4      |37 |16.5      |4     |Apple  |High Price |
|5      |27 |9.0       |1     |MI     |Mid Price  |
|4      |27 |9.0       |0     |Oppo   |Low Price  |
|5      |37 |23.0      |5     |Vivo   |Low Price  |
|5      |37 |23.0      |5     |Samsung|High Price |
|3      |22 |2.5       |0     |Apple  |High Price |
|3      |27 |6.0       |0     |MI     |Mid Price  |
+-------+---+----------+------+-------+-----------+
only showing top 10 rows

Lambda function

>>> age_udf = udf(lambda age: "young" if age <= 30 else "senior", StringType())
>>> df.withColumn("age_group", age_udf(df.age)).show(10,False)
+-------+---+----------+------+-------+---------+
|ratings|age|experience|family|mobile |age_group|
+-------+---+----------+------+-------+---------+
|3      |32 |9.0       |3     |Vivo   |senior   |
|3      |27 |13.0      |3     |Apple  |young    |
|4      |22 |2.5       |0     |Samsung|young    |
|4      |37 |16.5      |4     |Apple  |senior   |
|5      |27 |9.0       |1     |MI     |young    |
|4      |27 |9.0       |0     |Oppo   |young    |
|5      |37 |23.0      |5     |Vivo   |senior   |
|5      |37 |23.0      |5     |Samsung|senior   |
|3      |22 |2.5       |0     |Apple  |young    |
|3      |27 |6.0       |0     |MI     |young    |
+-------+---+----------+------+-------+---------+
only showing top 10 rows

Pandas UDF (vectorized UDF)

There are two types of Pandas UDF: Scalar and GroupedMap. Using a Pandas UDF is very similar to using a basic UDF: we first import pandas_udf from PySpark and apply it to the column to be transformed.

>>> from pyspark.sql.functions import pandas_udf
>>> def remaining_yrs(age):
...     return (100-age)
...
>>> from pyspark.sql.types import IntegerType
>>> length_udf = pandas_udf(remaining_yrs, IntegerType())
>>> df.withColumn("yrs_left", length_udf(df['age'])).show(10,False)
/opt/anaconda3/lib/python3.6/site-packages/pyarrow/__init__.py:159: UserWarning: pyarrow.open_stream is deprecated, please use pyarrow.ipc.open_stream
  warnings.warn("pyarrow.open_stream is deprecated, please use "
+-------+---+----------+------+-------+--------+
|ratings|age|experience|family|mobile |yrs_left|
+-------+---+----------+------+-------+--------+
|3      |32 |9.0       |3     |Vivo   |68      |
|3      |27 |13.0      |3     |Apple  |73      |
|4      |22 |2.5       |0     |Samsung|78      |
|4      |37 |16.5      |4     |Apple  |63      |
|5      |27 |9.0       |1     |MI     |73      |
|4      |27 |9.0       |0     |Oppo   |73      |
|5      |37 |23.0      |5     |Vivo   |63      |
|5      |37 |23.0      |5     |Samsung|63      |
|3      |22 |2.5       |0     |Apple  |78      |
|3      |27 |6.0       |0     |MI     |73      |
+-------+---+----------+------+-------+--------+
only showing top 10 rows

Pandas UDF (multiple columns)

>>> def prod(rating,exp):
...     return rating*exp
...
>>> prod_udf = pandas_udf(prod, DoubleType())
>>> df.withColumn("product",prod_udf(df['ratings'],df['experience'])).show(10,False)
/opt/anaconda3/lib/python3.6/site-packages/pyarrow/__init__.py:159: UserWarning: pyarrow.open_stream is deprecated, please use pyarrow.ipc.open_stream
  warnings.warn("pyarrow.open_stream is deprecated, please use "
+-------+---+----------+------+-------+-------+
|ratings|age|experience|family|mobile |product|
+-------+---+----------+------+-------+-------+
|3      |32 |9.0       |3     |Vivo   |27.0   |
|3      |27 |13.0      |3     |Apple  |39.0   |
|4      |22 |2.5       |0     |Samsung|10.0   |
|4      |37 |16.5      |4     |Apple  |66.0   |
|5      |27 |9.0       |1     |MI     |45.0   |
|4      |27 |9.0       |0     |Oppo   |36.0   |
|5      |37 |23.0      |5     |Vivo   |115.0  |
|5      |37 |23.0      |5     |Samsung|115.0  |
|3      |22 |2.5       |0     |Apple  |7.5    |
|3      |27 |6.0       |0     |MI     |18.0   |
+-------+---+----------+------+-------+-------+
only showing top 10 rows

Dropping duplicate rows

>>> df.count()
33
>>> df=df.dropDuplicates()
>>> df.count()
26

Dropping a column

>>> df_new=df.drop('mobile')
>>> df_new.show()
+-------+---+----------+------+
|ratings|age|experience|family|
+-------+---+----------+------+
|      3| 32|       9.0|     3|
|      4| 22|       2.5|     0|
|      5| 27|       6.0|     0|
|      4| 22|       6.0|     1|
|      3| 27|       6.0|     0|
|      2| 32|      16.5|     2|
|      4| 27|       9.0|     0|
|      2| 27|       9.0|     2|
|      3| 37|      16.5|     5|
|      4| 27|       6.0|     1|
|      5| 37|      23.0|     5|
|      2| 27|       6.0|     2|
|      4| 37|       6.0|     0|
|      5| 37|      23.0|     5|
|      4| 37|       9.0|     2|
|      5| 37|      13.0|     1|
|      5| 27|       2.5|     0|
|      3| 42|      23.0|     5|
|      5| 22|       2.5|     0|
|      1| 37|      23.0|     5|
+-------+---+----------+------+
only showing top 20 rows

Writing data

CSV

If we want to save the data as a single file in the original csv format, we can use Spark's coalesce function.

>>> write_uri = '/home/andrew/test.csv'
>>> df.coalesce(1).write.format("csv").option("header","true").save(write_uri)

Parquet

If the dataset is large and has many columns, we can choose to compress it and convert it to the Parquet file format. Parquet reduces the overall size of the data and optimizes performance when processing it, because only the required subset of columns is read rather than the entire data. A DataFrame can easily be converted and saved in Parquet format.

Note: the complete dataset and code are available for reference in the book's GitHub repository and run best on Spark 2.3 and above.
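One practical note on conventional UDFs such as price_range above: the function body is ordinary Python, so its mapping logic can be unit-tested without starting a Spark session. A minimal sketch (an illustration, not part of the original article; it assumes the same brand-to-price table as the example):

```python
def price_range(brand):
    # Map known brands to a price tier; any other brand falls
    # back to the default "Low Price".
    prices = {"Samsung": "High Price", "Apple": "High Price", "MI": "Mid Price"}
    return prices.get(brand, "Low Price")

print(price_range("Samsung"))  # High Price
print(price_range("MI"))       # Mid Price
print(price_range("Vivo"))     # Low Price
```

Once the plain function behaves as expected, wrapping it with udf(price_range, StringType()) gives exactly the row-by-row behavior seen in the df.withColumn('price_range', ...) output.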
