Apache 光束中的状态类

问题描述

我正在接触 Dataflow 和 Apache Beam 世界,我对有状态处理和一些类的实现有一些疑问。

我特别想知道 SetStateSpecBagStateSpec 之间有什么区别。 确实,在 https://datacraftacademy.com/apache-beam-python-stateful-processing/ 中,它们看起来是一回事,但我对此仍然不确定。

此外,我正在寻找这些 classes 的源代码,但我无法检索“beam_runner_api_pb2”指向的位置以及它的实现位置。

此外,我对 AccumulatingRuntimeState 以及从它继承的所有类(BagRuntimeState、SetRuntimeState、CombiningValueRuntimeState)的了解不清楚。这些类是不是在 python SDK 中实现的,还是我遗漏了什么?最终,这些类与非运行时状态(例如 SetStateSpec)之间有什么区别,它们用于哪些不同的用例?

任何帮助将不胜感激。 谢谢!

解决方法

运行时类与非运行时类

SetStateSpecBagStateSpec 是非运行时规范,用于让 SDK 了解 Beam 保留的状态。

运行时类(例如 SetRuntimeState 等)是为状态处理程序实现运行时行为的类。

来自博客文章中的示例:

class MyDoFn(DoFn):
    # This is a non-runtime spec. It can be BagStateSpec,SetStateSpec,etc.
    INDEX_STATE = ReadModifyWriteStateSpec(
        name='index',coder=VarIntCoder())

    def process(self,element,index=beam.DoFn.StateParam(INDEX_STATE)):
        # When we call `process`,the `index` variable actually holds a
        # runtime state object.
        assert isinstance(index,ReadModifyWriteRuntimeState)  # This assert works 
fine
        current_index = index.read() or 0
        yield current_index,element
        index.write(current_index + 1)

SetStateSpec vs BagStateSpec

对于 SetStateSpecBagStateSpec,它们实现了类似的状态,但是 BagStateSpec 定义了类似于列表的东西,而 SetStateSpec 定义了类似集合的东西。这意味着 BagStateSpec 可以容纳多个相等的元素,而 SetStateSpec 只能容纳任何给定对象的一个​​实例。

关于 AccumulatingRuntimeState 及其类

您找到的代码是 Beam 正在使用的运行器要实现的接口。例如,you can find the Direct runner implementation here