为什么在使用返回类型为Option [Long]的对象的UDF时出现类型不匹配错误?

问题描述

我正在尝试在Scala中编写一个处理空值的用户定义函数(UDF)。对于我的示例,如果值不为null,则尝试返回列的纪元。我发现Option []用于从udf返回null。

这是我的UDF:

def to_epoch(date: Timestamp) : Option[Long] = {
    if(date != null) {
        Option.apply(date.getTime)
    } else {
        Option.empty
    }
}

val toEpoch: (Timestamp => Option[Long]) => UserDefinedFunction = udf((_: Timestamp => Option[Long]))

我正在从如下读取的文件中创建一个数据框,并且我想添加列“ dateEpoch”。我不知道如何处理udf返回的Option[Long]

spark.read
     .schema(ListeningStatsSchema.schema)
     .json(location)
     .withColumn("dateEpoch",toEpoch(col("EventTS"))

我得到的错误是:

type mismatch;
 found   : org.apache.spark.sql.Column
 required: java.sql.Timestamp => Option[Long]
            .withColumn("opd",toEpoch(col("event_TS")))

解决方法

您收到的错误意味着您定义的函数需要一个Timestamp(参见REPL提供的类型)。但是,您提供的是Column,因此会出现错误。 Column类型是您使用Spark SQL操作的主要类型。您可以使用预定义的函数和运算符(例如,可以用+添加列)或UDF,但不能使用常规的scala函数。

要修复代码,您需要使用udf函数将函数转换为Spark UDF。您可以这样做:

val to_epoch_udf = udf(to_epoch _)

// And we can try it:
spark.range(1).select(to_epoch_udf(current_timestamp)).show

给出:

+------------------------+
|UDF(current_timestamp())|
+------------------------+
|1599492185730           |
+------------------------+