在Spark数据框上实现pythonic统计功能

问题描述

我在spark数据框中有非常大的数据集,这些数据集分布在各个节点上。 我可以使用火花库mean进行简单的统计,例如stdevskewnesskurtosispyspark.sql.functions等。

如果我想使用Jarque-Bera(JB)或Shapiro-Wilk(SW)等高级统计测试,我将使用scipy之类的python库,因为标准的apache pyspark库没有它们。但是为了做到这一点,我必须将spark数据帧转换为pandas,这意味着像这样将数据强制进入主节点:

import scipy.stats as stats
pandas_df=spark_df.toPandas()
JBtest=stats.jarque_bera(pandas_df)
SWtest=stats.shapiro(pandas_df)

我有多个功能,每个功能ID对应一个我要在其上执行测试统计信息的数据集。

我的问题是:

当数据仍分布在节点之间时,是否可以将这些pythonic函数应用于spark数据框,还是需要在spark中创建自己的JB / SW测试统计函数

谢谢您的宝贵见解

解决方法

您应该能够定义一个包装了Pandas函数(https://databricks.com/blog/2017/10/30/introducing-vectorized-udfs-for-pyspark.html)的矢量化用户定义函数,如下所示:

from pyspark.sql.functions import pandas_udf,PandasUDFType
import scipy.stats as stats

@pandas_udf('double',PandasUDFType.SCALAR)

def vector_jarque_bera(x):
    return stats.jarque_bera(x)

# test:
spark_df.withColumn('y',vector_jarque_bera(df['x']))

请注意,向量化函数列将一列作为其参数并返回一列。

(Nb。@pandas_udf装饰器将其下面定义的Pandas函数转换为向量化函数。返回向量的每个元素本身都是标量,这就是为什么PandasUDFType.SCALAR自变量是通过。