问题描述
这是我试图实现的 mapReduce 算法的摘录,我需要在我的 reducefunctioniterate 内增加累加器计数器,但我一直无法做到。应该怎么做?谢谢
counter = spark.sparkContext.accumulator(0)
def iterate1(pairs):
counter.value = 0
double_pairs = pairs.flatMap(lambda pair: ((pair[0],pair[1]),(pair[1],pair[0])))
adj = double_pairs.groupByKey().map(lambda x: (x[0],list(x[1])))
red = adj.flatMap(lambda pair: reducefunctioniterate(pair))
return red
def reducefunctioniterate(pair):
key,values = pair
valuelist = []
for_output = []
mini = key
for adj_node in values:
if adj_node < mini:
mini = adj_node
valuelist.append(adj_node)
if mini < key:
for_output.append([key,mini])
for val in valuelist:
if mini != val:
counter.add(1)
for_output.append([val,mini])
return for_output
解决方法
暂无找到可以解决该程序问题的有效方法,小编努力寻找整理中!
如果你已经找到好的解决方法,欢迎将解决方案带上本链接一起发送给小编。
小编邮箱:dio#foxmail.com (将#修改为@)