Spark 数据框填充

问题描述

我想对数据框执行“填充”类型的操作,以删除空值并确保最后一行是一种汇总行,其中包含基于 {{1} 的每列的最后已知值},按 timestamp 分组。当我使用 Azure Synapse Notebooks 时,语言可以是 Scala、Pyspark、Sparksql 甚至 c#。然而这里的问题是真正的解决方案有多达数百万行和数百列,所以我需要一个可以利用 Spark 的动态解决方案。我们可以配置一个大集群,如何确保我们充分利用它?

示例数据:

itemId

第 4 行和第 7 行作为汇总行的预期结果:

// Assign sample data to dataframe
val df = Seq(
    ( 1,"10/01/2021",1,"abc",null ),( 2,"11/01/2021",null,"bbb" ),( 3,"12/01/2021","ccc",( 4,"13/01/2021","ddd" ),( 5,2,"eee","fff" ),( 6,( 7,null )
    ).
    toDF("eventId","timestamp","itemId","attrib1","attrib2")

df.show

我已经查看了此选项,但无法根据我的用例进行调整。

Spark / Scala: forward fill with last observation

我有一种有效的 Sparksql 解决方案,但对于大量列来说会非常冗长,希望有更容易维护的东西:

+-------+----------+------+-------+-------+
|eventId| timestamp|itemId|attrib1|attrib2|
+-------+----------+------+-------+-------+
|      1|10/01/2021|     1|    abc|   null|
|      2|11/01/2021|     1|    abc|    bbb|
|      3|12/01/2021|     1|    ccc|    bbb|
|      4|13/01/2021|     1|    ccc|    ddd|
|      5|10/01/2021|     2|    eee|    fff|
|      6|11/01/2021|     2|    eee|    fff|
|      7|12/01/2021|     2|    eee|    fff|
+-------+----------+------+-------+-------+

解决方法

对于许多 columns,您可以创建一个 expression,如下所示

val window = Window.partitionBy($"itemId").orderBy($"timestamp")

// Instead of selecting columns you could create a list of columns 
val expr = df.columns
  .map(c => coalesce(col(c),last(col(c),true).over(window)).as(c))

df.select(expr: _*).show(false)

更新:

val mainColumns = df.columns.filterNot(_.startsWith("attrib"))
val aggColumns = df.columns.diff(mainColumns).map(c => coalesce(col(c),true).over(window)).as(c))

df.select(( mainColumns.map(col) ++ aggColumns): _*).show(false)

结果:

+-------+----------+------+-------+-------+
|eventId|timestamp |itemId|attrib1|attrib2|
+-------+----------+------+-------+-------+
|1      |10/01/2021|1     |abc    |null   |
|2      |11/01/2021|1     |abc    |bbb    |
|3      |12/01/2021|1     |ccc    |bbb    |
|4      |13/01/2021|1     |ccc    |ddd    |
|5      |10/01/2021|2     |eee    |fff    |
|6      |11/01/2021|2     |eee    |fff    |
|7      |12/01/2021|2     |eee    |fff    |
+-------+----------+------+-------+-------+