无法使用 PySpark 在地图缩减功能中增加累加器

问题描述

这是我试图实现的 mapReduce 算法的摘录,我需要在我的 reducefunctioniterate 内增加累加器计数器,但我一直无法做到。应该怎么做?谢谢

counter = spark.sparkContext.accumulator(0)

def iterate1(pairs):
  counter.value = 0
  double_pairs = pairs.flatMap(lambda pair: ((pair[0],pair[1]),(pair[1],pair[0])))
  adj = double_pairs.groupByKey().map(lambda x: (x[0],list(x[1])))
  red = adj.flatMap(lambda pair: reducefunctioniterate(pair))
  return red

def reducefunctioniterate(pair):
  key,values = pair
  valuelist = []
  for_output = []
  mini = key
  for adj_node in values:
    if adj_node < mini:
      mini = adj_node
    valuelist.append(adj_node)
  if mini < key:
    for_output.append([key,mini])
    for val in valuelist:
      if mini != val:
        counter.add(1)
        for_output.append([val,mini])
  return for_output

解决方法

暂无找到可以解决该程序问题的有效方法,小编努力寻找整理中!

如果你已经找到好的解决方法,欢迎将解决方案带上本链接一起发送给小编。

小编邮箱:dio#foxmail.com (将#修改为@)