派斯帕克;对列表值使用 ReduceByKey

问题描述

我试图更好地理解 reduceByKey 函数,并一直在探索使用它来完成不同任务的方法。我想应用下面显示的 RDD 数据。一行数据的格式是一个带有名称元组,然后是与该名称相关联的所有日期的列表(以下是数据外观的副本)

data = [("Cassavetes,Frank",['2012','2002','2009','2005']),("Knight,Shirley (I)",['1997','2009']),("Yip,Françoise",['2007','2004','2000']),("Danner,Blythe",['2000','2008','2012','2010','1999','1998']),("Buck (X)",['2002','2006','2009'])]

为了计算与元组中每个名称关联的所有日期的计数,我应用了下面的代码,使用 reduceByKey 函数尝试将日期列表转换为列表。

rdd = spark.sparkContext.parallelize(data)
reducedRdd = rdd.reduceByKey( lambda a,b: len(a.split(" ")) + len(b.split(" ")) )
reducedRdd.take(1)

上面的代码产生与输入数据相同的结果,并且不执行 reduce 函数中列出的任何转换,下面是代码输出的示例:

[('Yip,Françoise','2000'])]

我期望的输出如下;

[("Yip,3)]

为什么我上面写的代码没有给我预期的输出,我将如何修改它以确保它可以?

解决方法

您正在寻找 map,而不是 reduceByKey。没有什么可以减少的,因为你的数据已经按key分组了,所以你的RDD什么也没做,你就找回了原来的RDD。

rdd2 = rdd.map(lambda x: (x[0],len(x[1])))

print(rdd2.collect())
# [('Cassavetes,Frank',4),('Knight,Shirley (I)',3),('Yip,Françoise',('Danner,Blythe',8),('Buck (X)',3)]

mapValues 可能更合适:

rdd2 = rdd.mapValues(len)

print(rdd2.collect())
# [('Cassavetes,3)]

如果您想使用 reduceByKey,您的数据应该取消分组。例如如果你有

data = [('Cassavetes,'2012'),('Cassavetes,'2002'),'2009'),'2005'),'1997'),'2007'),'2004'),'2000'),'2008'),'2010'),'1999'),'1998'),'2006'),'2009')]

那你就可以了

rdd = sc.parallelize(data)

from operator import add
rdd2 = rdd.map(lambda x: (x[0],1)).reduceByKey(add)

rdd2.collect()
# [('Yip,3)]