问题描述
nominals
是我的参考数据集,其中包含每天应在特定计算机上测量的所有点。它具有以下简化结构
nominals
+------------+----+----+-----+----+
| point_name | x| y| z|side|
+------------+----+----+-----+----+
|str1 |. |. |. | . |
|str2 |. |. |. | . |
|str3 |. |. |. | . |
+------------+----+----+-----+----+
x,y,z
是标称点坐标。 point_name
是所测点的唯一标识符,并且包含side
。因此,每一侧都有要测量的所需点的列表,例如用伪代码point_name[point_name.side=="L"].unique()
。
用于存储实际测量值的数据集具有类似的结构,但另外一列id
标识了要测量的机器。每个id
的左侧和右侧都会被测量。
actuals
+------------+----+----+-----+----+----+
| point_name | x| y| z|side| id |
+------------+----+----+-----+----+----+
|str4 |. |. |. | . |. |
|str5 |. |. |. | . |. |
|str6 |. |. |. | . |. |
+------------+----+----+-----+----+----+
现在,我想创建一个带有附加列的新数据集,其中列出了每对id-side
对测量数据集中缺少的所有点。换句话说,对于每个id-side
组合,我都需要检查实际测量中存在的point_name
列表,再一次进行名义测量,并找出两者之间的差异。预期结果将是
+-----------------------+-----------+
| id | missing_L | missing_R |
+-----------------------+-----------+
|9433| point_1,point_12| point_14 |
|9512| null | point_15 |
+-----------------------+-----------+
在当前解决方案中,我将围绕point_name
进行检查,并检查具有null
值的列。
nominals_left = (
nominals
.filter(nominals.side == "L")
.select("point_name")
)
nominals_left_list = nominals_left.rdd.flatMap(lambda x: x).collect()
actuals_left = (
actuals
.filter(actuals.side == "L")
.groupBy("id")
.pivot("point_name",values=nominals_left_list)
.sum("x")
)
actuals_left_final = (
actuals_left
.withColumn("missing_L",F.array(*[F.when(F.isnull(c),F.lit(c)) for c in actuals_left.columns]))
.withColumn("missing_L",F.expr("array_join(missing_L,',')"))
.select("id","missing_L")
)
有没有更简单或更有效的方法?同样,似乎我必须对每个side
进行相同的检查,以避免由于属于另一侧的点而导致创建假肯定的null。最好在id
和side
上一起分组。
解决方法
我认为经过研究后我找到了一种方法。我只是简单地从名义表中创建了所有必需点的列表,将其与side
上的测量表结合在一起,然后对这两个数组列进行了区别。似乎工作正常。
这是给定nominals
表和actuals
表的示例。
首先创建每个id
和side
nominal_list = (
nominals
.groupby("side")
.agg(F.collect_list(F.col("point_name")))
.withColumnRenamed("collect_list(point_name)","nominal_points")
)
然后将其添加到使用额外的id
列通过测量创建的类似数据集上(请注意调用F.array_except
时数组的顺序)。
missing = (
actuals
.groupby(["id","side"])
.agg(F.collect_list(F.col("point_name")))
.withColumnRenamed("collect_list(point_name)","measured_points")
.join(nominal_list,"side")
.withColumn('missing',F.array_except('nominal_points','measured_points'))
)
请注意,这与我一开始所要求的稍有不同,因为我将一面作为附加列,并且未隐藏在列名(即missing_L
和missing_R
中)。