从pyspark中的字符串加载jalali日期

问题描述

我需要从字符串中加载jalali日期,然后将其作为gregorian日期字符串返回。我正在使用以下代码

def jalali_to_gregorian(col,format=None):
    if not format:
        format = "%Y/%m/d"
    gre = jdatetime.datetime.strptime(col,format=format).togregorian()
    return gre.strftime(format=format)

# register the function
spark.udf.register("jalali_to_gregorian",jalali_to_gregorian,StringType())
# load the date and show it:)
df = df.withColumn("financial_date",jalali_to_gregorian(df.PersianCreateDate))
df.select(['PersianCreateDate','financial_date']).show()

它向我抛出ValueError: time data 'Column<PersianCreateDate>' does not match format '%Y/%m/%d'错误。 列中的字符串是匹配项,我已经对其进行了测试。这是spark如何将列值发送到我的函数的问题。反正解决了吗?

进行测试:

df=spark.createDataFrame([('1399/01/02',),('1399/01/01',)],['jalali'])
df = df.withColumn("gre",jalali_to_gregorian(df.jalali))
df.show()

应该导致

+----------+----------+
|    jalali|       gre|
+----------+----------+
|1399/01/02|2020/03/20|
|1399/01/01|2020/03/21|
+----------+----------+

相反,我被惹上了:

Fail to execute line 2: df = df.withColumn("financial_date",jalali_to_gregorian(df.jalali))
Traceback (most recent call last):
File "/tmp/zeppelin_pyspark-6468469233020961307.py",line 375,in <module>
exec(code,_zcUserQueryNameSpace)
File "<stdin>",line 2,in <module>
File "<stdin>",line 7,in jalali_to_gregorian
File "/usr/local/lib/python2.7/dist-packages/jdatetime/__init__.py",line 929,in strptime
(date_string,format))
ValueError: time data 'Column<jalali>' does not match format '%Y/%m/%d''%Y/%m/%d'

解决方法

您的问题是您试图将函数应用于列,而不是列中的值。

您已使用的代码:spark.udf.register("jalali_to_gregorian",jalali_to_gregorian,StringType())注册要在Spark SQL中使用的函数(通过spark.sql(...),而不是pyspark。

要获取可以在withColumnselect等内部使用的函数,您需要创建一个由udf函数完成的包装函数,并且该函数应在withColumn

from pyspark.sql.functions import udf
jalali_to_gregorian_udf = udf(jalali_to_gregorian,StringType())
df = df.withColumn("gre",jalali_to_gregorian_udf(df.jalali))
>>> df.show()
+----------+----------+
|    jalali|       gre|
+----------+----------+
|1399/01/02|2020/03/21|
|1399/01/01|2020/03/20|
+----------+----------+

有关更多详细信息,请参见documentation

您还存在时间格式错误-而不是format = "%Y/%m/d",它应该是format = "%Y/%m/%d"

P.S。如果您在Spark 3.x上运行,则建议使用vectorized UDFs (aka,Pandas UDFs)-它们比通常的UDF快得多,并且如果您有大量数据,将提供更好的性能。