问题描述
输入数据
c.OperationFilter<ModelInBodyOperationFilter>();
Curl
是 find -i-n *.* "css">>Results.txt
的 +-------+------+-----+----------+--------+------+-------------------+
|KEY_1 |KEY_2 |KEY_3|EPOCH |DATA_1 |DATA_N|IMAGES |
+-------+------+-----+----------+--------+------+-------------------+
|0000001|6KBBCY|AA |1611826286|51183688|......|[[1611826286,796]]|
|0000001|6KBBCY|AA |2043826286|51183688|......|[[2043826286,799]]|
|0000001|6KBBCY|AA |1999999999|51183688|......|[[1999999999,700]]|
|0000002|777777|XX |1611826555|51183799|......|[[1611826555,500]]|
+-------+------+-----+----------+--------+------+-------------------+
:
IMAGES
我想合并表中的数据如下:
对于每个复合键 Seq()
,合并 Image
列并动态计算 case class Image ( EPOCH: String,USE_CASE : String )
列,作为从合并图像中提取的最小值。同一组合键的 <KEY_1,KEY_2,KEY_3>
列具有相同的值。上面的数据集将变成:
预期数据
IMAGE
我已经设法合并每个复合键的图像:
EPOCH
中间结果
DATA
现在我对下一步如何使用 Spark 的并行化优势有些困惑。我可以使用顺序逻辑:
+-------+------+-----+----------+--------+------+---------------------------------------------------------+
|KEY_1 |KEY_2 |KEY_3|EPOCH |DATA_1 |DATA_N|IMAGES |
+-------+------+-----+----------+--------+------+---------------------------------------------------------+
|0000001|6KBBCY|AA |1611826286|51183688|......|[[1611826286,796],[2043826286,799],[1999999999,500]] |
+-------+------+-----+----------+--------+------+---------------------------------------------------------+
但这是Java的思维方式..在Spark中是否有更好的方法来做到这一点?
解决方法
这是获取问题中第二个表的方法:
import org.apache.spark.sql.functions._
val mergedImages = inputRecords.groupBy(
$"KEY_1",$"KEY_2",$"KEY_3"
).agg(
min($"EPOCH").as("EPOCH"),(inputRecords.columns.filterNot(
Seq("EPOCH","IMAGES","KEY_1","KEY_2","KEY_3").contains(_)
).map(
x => first(col(x)).as(x)
) :+ collect_list($"IMAGES"(0)).as("IMAGES")): _*
)
聚合可以用纯代码编写为:
.agg(
min($"EPOCH").as("EPOCH"),first($"DATA_1").as("DATA_1"),first($"DATA_2").as("DATA_2"),...
first($"DATA_N").as("DATA_N"),collect_list($"IMAGES"(0)).as("IMAGES")
)