spark中的迭代过滤器似乎不起作用

问题描述

我正在尝试一个一个删除 RDD 的元素,但这不起作用,因为元素重新出现。

这是我的代码的一部分:

rdd = spark.sparkContext.parallelize([0,1,2,3,4])
for i in range(5):
    rdd=rdd.filter(lambda x:x!=i)
print(rdd.collect())
[0,3]

所以似乎只有最后一个过滤器是“记住”。我在想,在这个循环之后,rdd 会是空的。

然而,我不明白为什么,因为每次我将过滤器获得的新rdd保存在“rdd”中,所以它不应该保留所有转换吗?如果没有,我该怎么办?

谢谢你指出我哪里错了!

解决方法

结果实际上是正确的 - 这不是 Spark 的错误。注意 lambda 函数定义为 x != ii 没有代入 lambda 函数。所以在 for 循环的每次迭代中,RDD 看起来像

rdd
rdd.filter(lambda x: x != i)
rdd.filter(lambda x: x != i).filter(lambda x: x != i)
rdd.filter(lambda x: x != i).filter(lambda x: x != i).filter(lambda x: x != i)

由于过滤器都是一样的,都会用最新的i代替,所以每次for循环只过滤掉一项。

为避免这种情况,您可以使用偏函数来确保将 i 替换到函数中:

from functools import partial
 
rdd = spark.sparkContext.parallelize([0,1,2,3,4])
for i in range(5):
    rdd = rdd.filter(partial(lambda x,i: x != i,i))

print(rdd.collect())

或者您可以使用reduce

from functools import reduce

rdd = spark.sparkContext.parallelize([0,2])
rdd = reduce(lambda r,i: r.filter(lambda x: x != i),range(3),rdd)
print(rdd.collect())