合并数据集的行并在某些合并的列上应用自定义函数

问题描述

输入数据

c.OperationFilter<ModelInBodyOperationFilter>();

Curlfind -i-n *.* "css">>Results.txt +-------+------+-----+----------+--------+------+-------------------+ |KEY_1 |KEY_2 |KEY_3|EPOCH |DATA_1 |DATA_N|IMAGES | +-------+------+-----+----------+--------+------+-------------------+ |0000001|6KBBCY|AA |1611826286|51183688|......|[[1611826286,796]]| |0000001|6KBBCY|AA |2043826286|51183688|......|[[2043826286,799]]| |0000001|6KBBCY|AA |1999999999|51183688|......|[[1999999999,700]]| |0000002|777777|XX |1611826555|51183799|......|[[1611826555,500]]| +-------+------+-----+----------+--------+------+-------------------+

IMAGES

我想合并表中的数据如下: 对于每个复合键 Seq(),合并 Image 列并动态计算 case class Image ( EPOCH: String,USE_CASE : String ) 列,作为从合并图像中提取的最小值。同一组合键的 <KEY_1,KEY_2,KEY_3> 列具有相同的值。上面的数据集将变成:

预期数据

IMAGE

我已经设法合并每个复合键的图像:

EPOCH

中间结果

DATA

现在我对下一步如何使用 Spark 的并行化优势有些困惑。我可以使用顺序逻辑:

+-------+------+-----+----------+--------+------+---------------------------------------------------------+
|KEY_1  |KEY_2 |KEY_3|EPOCH     |DATA_1  |DATA_N|IMAGES                                                   |
+-------+------+-----+----------+--------+------+---------------------------------------------------------+
|0000001|6KBBCY|AA   |1611826286|51183688|......|[[1611826286,796],[2043826286,799],[1999999999,500]]                                      |
+-------+------+-----+----------+--------+------+---------------------------------------------------------+

但这是Java的思维方式..在Spark中是否有更好的方法来做到这一点?

解决方法

这是获取问题中第二个表的方法:

import org.apache.spark.sql.functions._

val mergedImages = inputRecords.groupBy(
    $"KEY_1",$"KEY_2",$"KEY_3"
).agg(
    min($"EPOCH").as("EPOCH"),(inputRecords.columns.filterNot(
        Seq("EPOCH","IMAGES","KEY_1","KEY_2","KEY_3").contains(_)
    ).map(
        x => first(col(x)).as(x)
    ) :+ collect_list($"IMAGES"(0)).as("IMAGES")): _*
)

聚合可以用纯代码编写为:

.agg(
    min($"EPOCH").as("EPOCH"),first($"DATA_1").as("DATA_1"),first($"DATA_2").as("DATA_2"),...
    first($"DATA_N").as("DATA_N"),collect_list($"IMAGES"(0)).as("IMAGES")
)