问题描述
我有这样的数据:
[('a',110),('a',130),120),('b',200),206)]
我想对键进行分组并对值执行计数、平均值、最小值和最大值以获得以下结果:
[('a',3,120,110,2,203,200,206)]
我大致知道如何使用 countByKey() 和 reduceByKey() 自己完成每个聚合,但我不确定如何将它们全部包含在一个 RDD 中。有什么帮助吗?
编辑:这是我真实rdd的片段
Out[16]: [('Alaska Airlines Inc.',17.0),('Alaska Airlines Inc.',63.0),70.0),16.0),('United Airlines',9.0),197.0),115.0),6.0),1.0),
解决方法
好吧,我设法通过使用 aggregateByKey
函数和 map
返回所需的“架构”来获得您的解决方案:
data = sc.parallelize([('a',110),('a',120),130),('b',200),206)])
def sequence_operator(accumulator,element):
return (accumulator[0] + 1,accumulator[1] + element,min(accumulator[2],element),max(accumulator[3],element))
def combination_operator(current_accumulator,next_accumulator):
return (current_accumulator[0] + next_accumulator[0],current_accumulator[1] + next_accumulator[1],min(current_accumulator[2],next_accumulator[2]),max(current_accumulator[3],next_accumulator[3]))
def unpack_aggregations(data):
key = data[0]
count,total,minimum,maximum = data[1]
return key,count,total / count,maximum
aggregations = data.aggregateByKey(zeroValue=(0,float('inf'),float('-inf')),seqFunc=sequence_operator,combFunc=combination_operator)
mapped_data = aggregations.map(unpack_aggregations)
print(mapped_data.collect())
输出
[('a',3,120.0,110,2,203.0,200,206)]