Pyspark,如何使用udf计算泊松分布?

问题描述

我有一个看起来像这样的数据框:

df_schema = StructType([StructField("date",StringType(),True),\
                              StructField("col1",FloatType(),\
                             StructField("col2",True)])
df_data = [('2020-08-01',0.09,0.8),\
                 ('2020-08-02',0.0483,0.8)]
rdd = sc.parallelize(df_data)
df = sqlContext.createDataFrame(df_data,df_schema)
df = df.withColumn("date",to_date("date",'yyyy-MM-dd'))
df.show() 

+----------+------+----+
|      date|  col1|col2|
+----------+------+----+
|2020-08-01|  0.09| 0.8|
|2020-08-02|0.0483| 0.8|
+----------+------+----+

我想使用col1和col2计算泊松CDF。

我们可以轻松地从scipy.stats中使用熊猫数据框中的poisson导入,但是我不知道如何处理pyspark。

prob = poisson.cdf(x,mu),其中x = col1,在本例中为mu = col2。

ATTEMPT 1:

from scipy.stats import poisson
from pyspark.sql.functions import udf,col
def poisson_calc(a,b):
    return poisson.cdf(a,b,axis=1)

poisson_calc = udf(poisson_calc,FloatType())

df_new = df.select(
  poisson_calc(col('col1'),col('col2')).alias("want") )

df_new.show()

给我一​​个错误:TypeError:_parse_args()得到了意外的关键字参数'axis'

解决方法

我发现您的尝试存在一些问题。

  • 您将udf命名为基础函数。令人惊讶的是,这实际上并不是问题,但我会避免。
  • axis没有scipy.stats.poisson.cdf关键字参数
  • 您必须将输出显式转换为float,否则您将遇到this error

解决所有问题,以下方法应该起作用:

from scipy.stats import poisson
from pyspark.sql.functions import udf,col

def poisson_calc(a,b):
    return float(poisson.cdf(a,b))

poisson_calc_udf = udf(poisson_calc,FloatType())

df_new = df.select(
  poisson_calc_udf(col('col1'),col('col2')).alias("want") 
)

df_new.show()
#+----------+
#|      want|
#+----------+
#|0.44932896|
#|0.44932896|
#+----------+