问题描述
df_schema = StructType([StructField("date",StringType(),True),\
StructField("col1",FloatType(),\
StructField("col2",True)])
df_data = [('2020-08-01',0.09,0.8),\
('2020-08-02',0.0483,0.8)]
rdd = sc.parallelize(df_data)
df = sqlContext.createDataFrame(df_data,df_schema)
df = df.withColumn("date",to_date("date",'yyyy-MM-dd'))
df.show()
+----------+------+----+
| date| col1|col2|
+----------+------+----+
|2020-08-01| 0.09| 0.8|
|2020-08-02|0.0483| 0.8|
+----------+------+----+
我想使用col1和col2计算泊松CDF。
我们可以轻松地从scipy.stats中使用熊猫数据框中的poisson导入,但是我不知道如何处理pyspark。
prob = poisson.cdf(x,mu),其中x = col1,在本例中为mu = col2。
ATTEMPT 1:
from scipy.stats import poisson
from pyspark.sql.functions import udf,col
def poisson_calc(a,b):
return poisson.cdf(a,b,axis=1)
poisson_calc = udf(poisson_calc,FloatType())
df_new = df.select(
poisson_calc(col('col1'),col('col2')).alias("want") )
df_new.show()
给我一个错误:TypeError:_parse_args()得到了意外的关键字参数'axis'
解决方法
我发现您的尝试存在一些问题。
- 您将
udf
命名为基础函数。令人惊讶的是,这实际上并不是问题,但我会避免。 -
axis
没有scipy.stats.poisson.cdf
关键字参数 - 您必须将输出显式转换为
float
,否则您将遇到this error
解决所有问题,以下方法应该起作用:
from scipy.stats import poisson
from pyspark.sql.functions import udf,col
def poisson_calc(a,b):
return float(poisson.cdf(a,b))
poisson_calc_udf = udf(poisson_calc,FloatType())
df_new = df.select(
poisson_calc_udf(col('col1'),col('col2')).alias("want")
)
df_new.show()
#+----------+
#| want|
#+----------+
#|0.44932896|
#|0.44932896|
#+----------+