问题描述
我开始将我的 Pandas 实现转换为 pySpark,但是我在完成一些基本操作时遇到了麻烦。所以我有这张桌子:
+-----+-----+----+
| Col1|Col2 |Col3|
+-----+-----+----+
| 1 |[1,3]| 0|
| 44 |[2,0]| 1|
| 77 |[1,5]| 7|
+-----+-----+----+
我想要的输出是:
+-----+-----+----+----+
| Col1|Col2 |Col3|Col4|
+-----+-----+----+----+
| 1 |[1,3]| 0|2.67|
| 44 |[2,0]| 1|2.67|
| 77 |[1,5]| 7|2.67|
+-----+-----+----+----+
到达这里:
- 我对 Col2 中每个数组的第一项求平均值,并对 Col2 中每个数组的第二项求平均值。由于第二个“子列”的平均值((3+0+5)/3)比第一个“子列”((1+2+1)/3)大,这是“获胜”条件.之后,我创建了一个新列,该列将“获胜”平均值复制到该表的行数(在本例中为 3)。 我已经能够通过“手动”选择 ta 列,对其进行平均,然后使用“点亮”来复制结果来做到这一点。我的实现的问题是 collect() 需要很多时间并且不推荐它。 你能帮我解决这个问题吗?
解决方法
您可以使用 greatest
获得数组中每个(子)列的最大平均值:
from pyspark.sql import functions as F,Window
df2 = df.withColumn(
'Col4',F.greatest(*[F.avg(F.udf(lambda r: [float(i) for i in r.toArray()],'array<double>')('Col2')[i]).over(Window.orderBy()) for i in range(2)])
)
df2.show()
+----+------+----+------------------+
|Col1| Col2|Col3| Col4|
+----+------+----+------------------+
| 1|[1,3]| 0|2.6666666666666665|
| 44|[2,0]| 1|2.6666666666666665|
| 77|[1,5]| 7|2.6666666666666665|
+----+------+----+------------------+
如果你希望数组大小是动态的,你可以这样做
arr_size = df.select(F.max(F.size(F.udf(lambda r: [float(i) for i in r.toArray()],'array<double>')('Col2')))).head()[0]
df2 = df.withColumn(
'Col4','array<double>')('Col2')[i]).over(Window.orderBy()) for i in range(arr_size)])
)