问题描述
我想使用累加器来计算我的 RDD 中对象的几个参数的组合。
例如,我的 RDD 为 Obj
,字段为 a
和 b
。这两个字段都是枚举,可能具有少数值之一。
为了实现它,我应该在驱动程序上创建累加器并在工人上使用它:
val acc1 = sc.longAccumulator("a1-b1")
val acc2 = sc.longAccumulator("a2-b1")
val acc3 = sc.longAccumulator("a1-b2")
...
我不想在所有具有相同逻辑的火花作业中为所有值组合声明大量计数器。 是否有任何机制允许在执行器上动态创建累加器或以其他方式解决此问题?
我搜索类似的东西:
rdd.foreach{ getAccumulator("${obj.a} - ${obj.b}").add(1) }
解决方法
从字面上回答你的问题,你不能在执行器上动态注册新的累加器。在作业实际开始之前,必须在驱动程序 (sparkContext.accumulator()
) 上计划累加器。这就是 Spark 中累加器的设计方式。
但考虑到您实际想要实现的内容,您可能会得出结论,只需一个“静态”累加器即可实现相同的功能,该累加器收集 Map<String,Long>
条目而不是 Long
。
This 博客文章可能会更实际地理解我在这里的意思。