问题描述
我正在尝试与这个 List[Option[Map[String,DataFrame]]] 交互,但我遇到了一些麻烦。
里面有这样的东西:
customer1 -> dataframeX
customer2 -> dataframeY
customer3 -> dataframeZ
其中客户是将成为新列的标识符。
我需要对 dataframeX、dataframeY 和 dataframeZ 进行联合(所有 df 都具有相同的列)。在我有这个之前:
map(_.get).reduce(_ union _).select(columns:_*)
它运行良好,因为我只有一个 List[Option[DataFrame]] 并且不需要标识符,但我在使用新列表时遇到了问题。我的想法是修改我的旧映射,我知道我可以做“(0).get”之类的事情,这会给我带来“Map(customer1 -> dataframeX)”,但我不太确定如何在映射并获得最终的数据帧,它是所有三个加上标识符的并集。我的想法:
map(/*get identifier here along with dataframe*/).reduce(_ union _).select(identifier +: columns:_*)
最终结果将类似于:
-------------------------------
|identifier | product |State |
-------------------------------
| customer1| prod1 | VA |
| customer1| prod132 | VA |
| customer2| prod32 | CA |
| customer2| prod51 | CA |
| customer2| prod21 | AL |
| customer2| prod52 | AL |
-------------------------------
解决方法
您可以使用 collect
将 Option[Map[String,Dataframe]]
取消嵌套到 Map[String,DataFrame]
。要将标识符放入列中,您应该使用 withColumn
。所以你的代码可能看起来像:
import org.apache.spark.sql.functions.lit
val result: DataFrame = frames.collect {
case Some(m) =>
m.map {
case (identifier,dataframe) => dataframe.withColumn("identifier",lit(identifier))
}.reduce(_ union _)
}.reduce(_ union _)
,
也许是这样的?
list
.flatten
.flatMap {
_.map { case (id,df) =>
df.withColumn("identifier",id) }
}.reduce(_ union _)