问题描述
我正在尝试在Scala中编写一个处理空值的用户定义函数(UDF)。对于我的示例,如果值不为null,则尝试返回列的纪元。我发现Option []用于从udf返回null。
这是我的UDF:
def to_epoch(date: Timestamp) : Option[Long] = {
if(date != null) {
Option.apply(date.getTime)
} else {
Option.empty
}
}
val toEpoch: (Timestamp => Option[Long]) => UserDefinedFunction = udf((_: Timestamp => Option[Long]))
我正在从如下读取的文件中创建一个数据框,并且我想添加列“ dateEpoch”。我不知道如何处理udf返回的Option[Long]
:
spark.read
.schema(ListeningStatsSchema.schema)
.json(location)
.withColumn("dateEpoch",toEpoch(col("EventTS"))
我得到的错误是:
type mismatch;
found : org.apache.spark.sql.Column
required: java.sql.Timestamp => Option[Long]
.withColumn("opd",toEpoch(col("event_TS")))
解决方法
您收到的错误意味着您定义的函数需要一个Timestamp
(参见REPL提供的类型)。但是,您提供的是Column
,因此会出现错误。 Column
类型是您使用Spark SQL操作的主要类型。您可以使用预定义的函数和运算符(例如,可以用+添加列)或UDF,但不能使用常规的scala函数。
要修复代码,您需要使用udf
函数将函数转换为Spark UDF。您可以这样做:
val to_epoch_udf = udf(to_epoch _)
// And we can try it:
spark.range(1).select(to_epoch_udf(current_timestamp)).show
给出:
+------------------------+
|UDF(current_timestamp())|
+------------------------+
|1599492185730 |
+------------------------+