
[雪峰磁针石博客] Machine Learning with PySpark (Natural Language Processing and Recommender Systems), Part 2: Data Processing (2)

Date: 2019-01-27


User-Defined Functions (UDFs)

UDFs are widely used in data processing to transform DataFrames. PySpark offers two kinds of UDFs: conventional UDFs and Pandas UDFs. Pandas UDFs are considerably faster in terms of speed and processing time.

  • Traditional Python function
 >>> from pyspark.sql.functions import udf
 >>> from pyspark.sql.types import StringType
 >>> def price_range(brand):
 ...     prices = {"Samsung": 'High Price', "Apple": 'High Price', "MI": 'Mid Price'}
 ...     return prices.get(brand, "Low Price")
 ...
 >>> brand_udf = udf(price_range, StringType())
 >>> df.withColumn('price_range', brand_udf(df['mobile'])).show(10, False)
 +-------+---+----------+------+-------+-----------+
 |ratings|age|experience|family|mobile |price_range|
 +-------+---+----------+------+-------+-----------+
 |3      |32 |9.0       |3     |Vivo   |Low Price  |
 |3      |27 |13.0      |3     |Apple  |High Price |
 |4      |22 |2.5       |0     |Samsung|High Price |
 |4      |37 |16.5      |4     |Apple  |High Price |
 |5      |27 |9.0       |1     |MI     |Mid Price  |
 |4      |27 |9.0       |0     |Oppo   |Low Price  |
 |5      |37 |23.0      |5     |Vivo   |Low Price  |
 |5      |37 |23.0      |5     |Samsung|High Price |
 |3      |22 |2.5       |0     |Apple  |High Price |
 |3      |27 |6.0       |0     |MI     |Mid Price  |
 +-------+---+----------+------+-------+-----------+
 only showing top 10 rows
  • Lambda function
 >>> age_udf = udf(lambda age: "young" if age <= 30 else "senior", StringType())
 >>> df.withColumn("age_group", age_udf(df.age)).show(10, False)
 +-------+---+----------+------+-------+---------+
 |ratings|age|experience|family|mobile |age_group|
 +-------+---+----------+------+-------+---------+
 |3      |32 |9.0       |3     |Vivo   |senior   |
 |3      |27 |13.0      |3     |Apple  |young    |
 |4      |22 |2.5       |0     |Samsung|young    |
 |4      |37 |16.5      |4     |Apple  |senior   |
 |5      |27 |9.0       |1     |MI     |young    |
 |4      |27 |9.0       |0     |Oppo   |young    |
 |5      |37 |23.0      |5     |Vivo   |senior   |
 |5      |37 |23.0      |5     |Samsung|senior   |
 |3      |22 |2.5       |0     |Apple  |young    |
 |3      |27 |6.0       |0     |MI     |young    |
 +-------+---+----------+------+-------+---------+
 only showing top 10 rows


  • Pandas UDF (Vectorized UDF)

There are two types of Pandas UDFs: Scalar and GroupedMap. The examples in this section use the Scalar type; a brief GroupedMap sketch follows them below.

Using a Pandas UDF is very similar to using a basic UDF. We first import pandas_udf from PySpark and then apply it to the specific column to be transformed.

 >>> from pyspark.sql.functions import pandas_udf
 >>> from pyspark.sql.types import IntegerType
 >>> def remaining_yrs(age):
 ...     return (100 - age)
 ...
 >>> length_udf = pandas_udf(remaining_yrs, IntegerType())
 >>> df.withColumn("yrs_left", length_udf(df['age'])).show(10, False)
 /opt/anaconda3/lib/python3.6/site-packages/pyarrow/__init__.py:159: UserWarning: pyarrow.open_stream is deprecated, please use pyarrow.ipc.open_stream
   warnings.warn("pyarrow.open_stream is deprecated, please use "
 +-------+---+----------+------+-------+--------+
 |ratings|age|experience|family|mobile |yrs_left|
 +-------+---+----------+------+-------+--------+
 |3      |32 |9.0       |3     |Vivo   |68      |
 |3      |27 |13.0      |3     |Apple  |73      |
 |4      |22 |2.5       |0     |Samsung|78      |
 |4      |37 |16.5      |4     |Apple  |63      |
 |5      |27 |9.0       |1     |MI     |73      |
 |4      |27 |9.0       |0     |Oppo   |73      |
 |5      |37 |23.0      |5     |Vivo   |63      |
 |5      |37 |23.0      |5     |Samsung|63      |
 |3      |22 |2.5       |0     |Apple  |78      |
 |3      |27 |6.0       |0     |MI     |73      |
 +-------+---+----------+------+-------+--------+
 only showing top 10 rows
  • Pandas UDF (multiple columns)
 >>> from pyspark.sql.types import DoubleType
 >>> def prod(rating, exp):
 ...     return rating * exp
 ...
 >>> prod_udf = pandas_udf(prod, DoubleType())
 >>> df.withColumn("product", prod_udf(df['ratings'], df['experience'])).show(10, False)
 /opt/anaconda3/lib/python3.6/site-packages/pyarrow/__init__.py:159: UserWarning: pyarrow.open_stream is deprecated, please use pyarrow.ipc.open_stream
   warnings.warn("pyarrow.open_stream is deprecated, please use "
 +-------+---+----------+------+-------+-------+
 |ratings|age|experience|family|mobile |product|
 +-------+---+----------+------+-------+-------+
 |3      |32 |9.0       |3     |Vivo   |27.0   |
 |3      |27 |13.0      |3     |Apple  |39.0   |
 |4      |22 |2.5       |0     |Samsung|10.0   |
 |4      |37 |16.5      |4     |Apple  |66.0   |
 |5      |27 |9.0       |1     |MI     |45.0   |
 |4      |27 |9.0       |0     |Oppo   |36.0   |
 |5      |37 |23.0      |5     |Vivo   |115.0  |
 |5      |37 |23.0      |5     |Samsung|115.0  |
 |3      |22 |2.5       |0     |Apple  |7.5    |
 |3      |27 |6.0       |0     |MI     |18.0   |
 +-------+---+----------+------+-------+-------+
 only showing top 10 rows
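The examples above all use the Scalar type. As a rough illustration of the GroupedMap type mentioned earlier (not part of the original article), here is a minimal sketch: the function name mean_exp_per_rating and the output column mean_experience are hypothetical, and the groupBy(...).apply(...) pattern is the Spark 2.3/2.4 style API.

 >>> from pyspark.sql.functions import pandas_udf, PandasUDFType
 >>> # A GroupedMap UDF receives every row of one group as a pandas DataFrame
 >>> # and must return a pandas DataFrame matching the declared schema.
 >>> @pandas_udf("ratings long, mean_experience double", PandasUDFType.GROUPED_MAP)
 ... def mean_exp_per_rating(pdf):
 ...     # pdf holds all rows sharing one ratings value
 ...     pdf = pdf.assign(mean_experience=pdf['experience'].mean())
 ...     return pdf[['ratings', 'mean_experience']]
 ...
 >>> df.groupBy('ratings').apply(mean_exp_per_rating).show(5)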

Dropping Duplicate Values

 >>> df.count()
 33
 >>> df = df.dropDuplicates()
 >>> df.count()
 26
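By default dropDuplicates() compares entire rows. Duplicates can also be dropped based on a subset of columns; the sketch below is not part of the original example (the resulting count depends on the data) and keeps a single row per (age, mobile) combination.

 >>> # keep one row for each distinct (age, mobile) pair
 >>> df.dropDuplicates(subset=['age', 'mobile']).count()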

Dropping Columns

 >>> df_new = df.drop('mobile')
 >>> df_new.show()
 +-------+---+----------+------+
 |ratings|age|experience|family|
 +-------+---+----------+------+
 |      3| 32|       9.0|     3|
 |      4| 22|       2.5|     0|
 |      5| 27|       6.0|     0|
 |      4| 22|       6.0|     1|
 |      3| 27|       6.0|     0|
 |      2| 32|      16.5|     2|
 |      4| 27|       9.0|     0|
 |      2| 27|       9.0|     2|
 |      3| 37|      16.5|     5|
 |      4| 27|       6.0|     1|
 |      5| 37|      23.0|     5|
 |      2| 27|       6.0|     2|
 |      4| 37|       6.0|     0|
 |      5| 37|      23.0|     5|
 |      4| 37|       9.0|     2|
 |      5| 37|      13.0|     1|
 |      5| 27|       2.5|     0|
 |      3| 42|      23.0|     5|
 |      5| 22|       2.5|     0|
 |      1| 37|      23.0|     5|
 +-------+---+----------+------+
 only showing top 20 rows


Writing Data

  • CSV

If we want to save the DataFrame as a single file in its original CSV format, we can use the coalesce function in Spark.

 >>> write_uri = '/home/andrew/test.csv'
 >>> df.coalesce(1).write.format("csv").option("header", "true").save(write_uri)
  • Parquet

If the dataset is large and involves many columns, we can choose to compress it and convert it into the Parquet file format. Parquet reduces the overall size of the data and optimizes performance during processing, because queries can read just the required subset of columns instead of the entire dataset.
We can easily convert and save the DataFrame in Parquet format, as sketched below.
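A minimal sketch of such a write, assuming a local output path analogous to the CSV example above (the parquet_uri value is illustrative):

 >>> parquet_uri = '/home/andrew/df_parquet'
 >>> # Parquet stores data column-wise and compresses it (snappy by default),
 >>> # so later reads can scan only the columns they need.
 >>> df.write.format("parquet").save(parquet_uri)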

Note: the complete dataset and code are available for reference in the book's GitHub repository and run best on Spark 2.3 and above.


Original article: https://yq.aliyun.com/articles/689105