[雪峰磁针石博客]pyspark工具机器学习(自然语言处理和推荐系统)2数据处理2
用户定义函数(UDF:User-Defined Functions)
UDF广泛用于数据处理,以转换数据帧。 PySpark中有两种类型的UDF:常规UDF和Pandas UDF。 Pandas UDF在速度和处理时间方面更加强大。
- 传统的Python函数
>>> from pyspark.sql.functions import udf >>> def price_range(brand): ... prices = {"Samsung":'High Price', "Apple":'High Price', "MI":'Mid Price'} ... return prices.get('test',"Low Price") ... >>> brand_udf=udf(price_range,StringType()) >>> df.withColumn('price_range',brand_udf(df['mobile'])).show(10,False) +-------+---+----------+------+-------+-----------+ |ratings|age|experience|family|mobile |price_range| +-------+---+----------+------+-------+-----------+ |3 |32 |9.0 |3 |Vivo |Low Price | |3 |27 |13.0 |3 |Apple |Low Price | |4 |22 |2.5 |0 |Samsung|Low Price | |4 |37 |16.5 |4 |Apple |Low Price | |5 |27 |9.0 |1 |MI |Low Price | |4 |27 |9.0 |0 |Oppo |Low Price | |5 |37 |23.0 |5 |Vivo |Low Price | |5 |37 |23.0 |5 |Samsung|Low Price | |3 |22 |2.5 |0 |Apple |Low Price | |3 |27 |6.0 |0 |MI |Low Price | +-------+---+----------+------+-------+-----------+ only showing top 10 rows >>>
- Lambda函数
>>> age_udf = udf(lambda age: "young" if age <= 30 else "senior", StringType()) >>> df.withColumn("age_group", age_udf(df.age)).show(10,False) +-------+---+----------+------+-------+---------+ |ratings|age|experience|family|mobile |age_group| +-------+---+----------+------+-------+---------+ |3 |32 |9.0 |3 |Vivo |senior | |3 |27 |13.0 |3 |Apple |young | |4 |22 |2.5 |0 |Samsung|young | |4 |37 |16.5 |4 |Apple |senior | |5 |27 |9.0 |1 |MI |young | |4 |27 |9.0 |0 |Oppo |young | |5 |37 |23.0 |5 |Vivo |senior | |5 |37 |23.0 |5 |Samsung|senior | |3 |22 |2.5 |0 |Apple |young | |3 |27 |6.0 |0 |MI |young | +-------+---+----------+------+-------+---------+ only showing top 10 rows
- PandasUDF(矢量化UDF)
有两种类型的Pandas UDF:Scalar和GroupedMap。
Pandas UDF与使用基本UDf非常相似。我们必须首先从PySpark导入pandas_udf并将其应用于要转换的任何特定列。
>>> from pyspark.sql.functions import pandas_udf >>> def remaining_yrs(age): ... return (100-age) ... >>> from pyspark.sql.types import IntegerType >>> length_udf = pandas_udf(remaining_yrs, IntegerType()) >>> df.withColumn("yrs_left", length_udf(df['age'])).show(10,False) /opt/anaconda3/lib/python3.6/site-packages/pyarrow/__init__.py:159: UserWarning: pyarrow.open_stream is deprecated, please use pyarrow.ipc.open_stream warnings.warn("pyarrow.open_stream is deprecated, please use " +-------+---+----------+------+-------+--------+ |ratings|age|experience|family|mobile |yrs_left| +-------+---+----------+------+-------+--------+ |3 |32 |9.0 |3 |Vivo |68 | |3 |27 |13.0 |3 |Apple |73 | |4 |22 |2.5 |0 |Samsung|78 | |4 |37 |16.5 |4 |Apple |63 | |5 |27 |9.0 |1 |MI |73 | |4 |27 |9.0 |0 |Oppo |73 | |5 |37 |23.0 |5 |Vivo |63 | |5 |37 |23.0 |5 |Samsung|63 | |3 |22 |2.5 |0 |Apple |78 | |3 |27 |6.0 |0 |MI |73 | +-------+---+----------+------+-------+--------+ only showing top 10 rows
- PandasUDF(多列)
>>> def prod(rating,exp): ... return rating*exp ... >>> prod_udf = pandas_udf(prod, DoubleType()) >>> df.withColumn("product",prod_udf(df['ratings'], df['experience'])).show(10,False) /opt/anaconda3/lib/python3.6/site-packages/pyarrow/__init__.py:159: UserWarning: pyarrow.open_stream is deprecated, please use pyarrow.ipc.open_stream warnings.warn("pyarrow.open_stream is deprecated, please use " +-------+---+----------+------+-------+-------+ |ratings|age|experience|family|mobile |product| +-------+---+----------+------+-------+-------+ |3 |32 |9.0 |3 |Vivo |27.0 | |3 |27 |13.0 |3 |Apple |39.0 | |4 |22 |2.5 |0 |Samsung|10.0 | |4 |37 |16.5 |4 |Apple |66.0 | |5 |27 |9.0 |1 |MI |45.0 | |4 |27 |9.0 |0 |Oppo |36.0 | |5 |37 |23.0 |5 |Vivo |115.0 | |5 |37 |23.0 |5 |Samsung|115.0 | |3 |22 |2.5 |0 |Apple |7.5 | |3 |27 |6.0 |0 |MI |18.0 | +-------+---+----------+------+-------+-------+ only showing top 10 rows
删除重复值
>>> df.count() 33 >>> df=df.dropDuplicates() >>> df.count() 26
删除列
>>> df_new=df.drop('mobile') >>> df_new.show() +-------+---+----------+------+ |ratings|age|experience|family| +-------+---+----------+------+ | 3| 32| 9.0| 3| | 4| 22| 2.5| 0| | 5| 27| 6.0| 0| | 4| 22| 6.0| 1| | 3| 27| 6.0| 0| | 2| 32| 16.5| 2| | 4| 27| 9.0| 0| | 2| 27| 9.0| 2| | 3| 37| 16.5| 5| | 4| 27| 6.0| 1| | 5| 37| 23.0| 5| | 2| 27| 6.0| 2| | 4| 37| 6.0| 0| | 5| 37| 23.0| 5| | 4| 37| 9.0| 2| | 5| 37| 13.0| 1| | 5| 27| 2.5| 0| | 3| 42| 23.0| 5| | 5| 22| 2.5| 0| | 1| 37| 23.0| 5| +-------+---+----------+------+ only showing top 20 rows
参考资料
- python测试开发项目实战-目录
- python工具书籍下载-持续更新
- python 3.7极速入门教程 - 目录
- 原文地址
- 本文涉及的python测试开发库 谢谢点赞!
- [本文相关海量书籍下载](https://github.com/china-testing/python-api-tesing/blob/master/books.md
- http://spark.apache.org/docs/2.1.0/api/python/pyspark.sql.html
写数据
- CSV
如果我们想以原始csv格式将其保存为单个文件,我们可以在spark中使用coalesce函数。
>>> write_uri = '/home/andrew/test.csv' >>> df.coalesce(1).write.format("csv").option("header","true").save(write_uri)
- Parquet
如果数据集很大且涉及很多列,我们可以选择对其进行压缩并将其转换为Parquet文件格式。它减少了数据的整体大小并在处理数据时优化了性能,因为它可以处理所需列的子集而不是整个数据。
我们可以轻松地将数据帧转换并保存为Parquet格式。
注意完整的数据集以及代码可以在本书的GitHub存储库中进行参考,并在onSpark 2.3及更高版本上执行最佳。
低调大师中文资讯倾力打造互联网数据资讯、行业资源、电子商务、移动互联网、网络营销平台。
持续更新报道IT业界、互联网、市场资讯、驱动更新,是最及时权威的产业资讯及硬件资讯报道平台。
转载内容版权归作者及来源网站所有,本站原创内容转载请注明来源。
- 上一篇
[雪峰磁针石博客]2019-Python最佳数据科学工具库
说明 以下库都可以在python测试开发库中找到,github地址:https://github.com/china-testing/python-api-tesing 相关书籍:https://china-testing.github.io/python_books.html 核心库 NumPy Numerical Python的缩写,专为数学运算而设计。 支持用于复杂算术运算的多维数组和向量。还具有丰富的函数集,可以对支持的数据类型执行代数运算。 能与其他编程语言(如C / C ++,FORTRAN和数据库管理系统)的互操作。 而且,由于提供的函数是预编译的,效率高。 SciPy的 基于NumPy,提供集成,回归和概率等高级操作。 子模块组织有层次结构,手册很好。 Pandas Python Data Analysis Library可根据需要帮助组织各种参数的数据。 各种内置数据类型(如serie,frame和panels)使Pandas成为数据科学家中最受欢迎的库。 帧表格格式允许对数据进行类似数据库的添加/删除操作,分组很容易。 此外,Pandas提供了三维面板数据结构,有助...
- 下一篇
可应用于实际的14个NLP突破性研究成果(三)
8.用于语义角色标注的语言学信息自我注意力方法,作者:EMMA STRUBELL,PATRICK VERGA,DANIEL ANDOR,DAVID WEISS,ANDREW MCCALLUM 论文摘要 当前最先进的语义角色标记(SRL)使用深度神经网络,但没有明确的语言特征。之前的工作表明,抽象语法树可以显著改善SRL,从而提高模型准确性。在这项研究中,我们提出了语言学的自我关注(LISA):该神经网络模型将 multi-head self-attention 与多任务学习相结合,包括依赖解析、词性标注、谓词检测和语义角色标记。与先前需要大量预处理来准备语言特征的模型不同,LISA 可以仅使用原始的 token 对序列进行一次编码,来同时执行多个预测任务。此外,如果已经有高质量的语法分析,则可以在测试时加入,而无需重新训练我们的SRL
相关文章
文章评论
共有0条评论来说两句吧...
文章二维码
点击排行
推荐阅读
最新文章
- CentOS8编译安装MySQL8.0.19
- CentOS8,CentOS7,CentOS6编译安装Redis5.0.7
- SpringBoot2整合MyBatis,连接MySql数据库做增删改查操作
- SpringBoot2整合Redis,开启缓存,提高访问速度
- SpringBoot2配置默认Tomcat设置,开启更多高级功能
- Hadoop3单机部署,实现最简伪集群
- CentOS7,CentOS8安装Elasticsearch6.8.6
- CentOS6,7,8上安装Nginx,支持https2.0的开启
- Docker使用Oracle官方镜像安装(12C,18C,19C)
- SpringBoot2编写第一个Controller,响应你的http请求并返回结果