Problem description
Given the following dataset:

{"name": "Ben", "lastHolidayDestination": "Florida", "holidays": [
  {"destination": "Florida", "year": 2020},
  {"destination": "Lille", "year": 2019}
]}
I want to use Spark SQL to add a new column lastHolidayYear to the root of the dataset, by finding the holidays element whose destination matches lastHolidayDestination (assuming there will only ever be one). So the output dataset would be:

{"name": "Ben", "lastHolidayDestination": "Florida", "lastHolidayYear": 2020, "holidays": [
  {"destination": "Florida", "year": 2020},
  {"destination": "Lille", "year": 2019}
]}

I've been experimenting with dataset.withColumn() (using Java, but Scala/Python answers are fine too), but so far I haven't gotten anywhere. I'd really rather not use a UDF unless I have to. Any suggestions?
Solution
To simulate a join against the array, you can use a combination of explode and filter:
import org.apache.spark.sql.functions.{col, explode}

val result = ds.withColumn("expl", explode(col("holidays")))
  .filter("lastHolidayDestination = expl.destination")
  .withColumn("lastHolidayYear", col("expl.year"))
  .drop("expl")
Starting from Spark 3.0, you can instead filter the array first and then take its first element, using the following expression:
import org.apache.spark.sql.functions.{col, element_at, filter}

val extractElementExpr = element_at(filter(col("myArrayColumnName"), myCondition), 1)
其中"myArrayColumnName"
是包含数组的列的名称,而myCondition
是条件,它是Column => Column
表达式。
For your specific example, the code is:
import org.apache.spark.sql.functions.{col, element_at, filter}
import org.apache.spark.sql.Column

val isLastHoliday = (c: Column) => c.getField("destination") === col("lastHolidayDestination")
val getLastHoliday = element_at(filter(col("holidays"), isLastHoliday), 1)

val result = df.withColumn("lastHolidayYear", getLastHoliday.getField("year"))
With this code, if your input dataframe contains the following values:
+------+----------------------+--------------------------------+
|name  |lastHolidayDestination|holidays                        |
+------+----------------------+--------------------------------+
|Ben   |Florida               |[[Florida, 2020], [Lille, 2019]]|
|Alice |Peru                  |[[Florida, 2020], [Lille, 2019]]|
|Robert|Lille                 |[[Florida, 2020], [Lille, 2019]]|
+------+----------------------+--------------------------------+
the output will be:
+------+----------------------+--------------------------------+---------------+
|name  |lastHolidayDestination|holidays                        |lastHolidayYear|
+------+----------------------+--------------------------------+---------------+
|Ben   |Florida               |[[Florida, 2020], [Lille, 2019]]|2020           |
|Alice |Peru                  |[[Florida, 2020], [Lille, 2019]]|null           |
|Robert|Lille                 |[[Florida, 2020], [Lille, 2019]]|2019           |
+------+----------------------+--------------------------------+---------------+
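Since the question mentions Java, it is worth noting that the same Spark 3.0 logic can also be written as a SQL expression string, which can be used unchanged from the Java Dataset API (no Scala lambdas required). A sketch of this variant, mirroring the example above (filter with a lambda and element_at are standard Spark SQL built-ins):

import org.apache.spark.sql.functions.expr

// element_at(..., 1) picks the first matching struct; with ANSI mode off
// (the default), it yields null when the filtered array is empty, as for Alice.
val result = df.withColumn(
  "lastHolidayYear",
  expr("element_at(filter(holidays, h -> h.destination = lastHolidayDestination), 1).year")
)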