问题描述
我正在尝试一个一个地删除 RDD 的元素,但这不起作用,因为元素重新出现。
这是我的代码的一部分:
rdd = spark.sparkContext.parallelize([0,1,2,3,4])
for i in range(5):
rdd=rdd.filter(lambda x:x!=i)
print(rdd.collect())
[0,3]
所以似乎只有最后一个过滤器是“记住”。我在想,在这个循环之后,rdd 会是空的。
然而,我不明白为什么,因为每次我将过滤器获得的新rdd保存在“rdd”中,所以它不应该保留所有转换吗?如果没有,我该怎么办?
谢谢你指出我哪里错了!
解决方法
结果实际上是正确的 - 这不是 Spark 的错误。注意 lambda 函数定义为 x != i
,i
没有代入 lambda 函数。所以在 for 循环的每次迭代中,RDD 看起来像
rdd
rdd.filter(lambda x: x != i)
rdd.filter(lambda x: x != i).filter(lambda x: x != i)
rdd.filter(lambda x: x != i).filter(lambda x: x != i).filter(lambda x: x != i)
等
由于过滤器都是一样的,都会用最新的i
代替,所以每次for循环只过滤掉一项。
为避免这种情况,您可以使用偏函数来确保将 i
替换到函数中:
from functools import partial
rdd = spark.sparkContext.parallelize([0,1,2,3,4])
for i in range(5):
rdd = rdd.filter(partial(lambda x,i: x != i,i))
print(rdd.collect())
或者您可以使用reduce
:
from functools import reduce
rdd = spark.sparkContext.parallelize([0,2])
rdd = reduce(lambda r,i: r.filter(lambda x: x != i),range(3),rdd)
print(rdd.collect())