Apache Beam 中具有键值状态的状态处理

问题描述

我正在尝试使用 Apache Beam 实现一个有状态的进程。我已经阅读了 Kenneth KNowles 的两篇文章Stateful processing with Apache BeamTimely (and Stateful) Processing with Apache Beam),但是我没有找到解决我的问题的方法。我使用的是 Python SDK。

特别是,我试图拥有一个包含键值对象的有状态 DoFn,我需要添加新元素,有时还需要删除一些元素。

我在 DoFn 类中看到 a solution may be to use a SetStateSpecTuple coder。问题是 SetSpaceSpec 没有类似“pop”的功能选项。在我看来,删除元素的唯一方法是使用 .clear() 将它们全部删除。 看起来您不能仅指定要使用此功能擦除的元素。

克服这个问题的一个机会可能是在我需要删除状态中的元素时清除和重写状态,但这对我来说看起来效率很低。

你知道如何有效地做到这一点吗?

Python 版本 3.8.7
阿帕奇光束==2.29.0

解决方法

我遵循了@TudorPlugaru 的建议,并提出了这个建议。希望对其他人有用。

import json
from apache_beam.coders import Coder

class MyDictCoder(Coder):
    """ My custom dictionary coders """
    def encode(self,o):
        return json.dumps(o).encode()

    def decode(self,o):
        return json.loads(o.decode())

    def is_deterministic(self) -> bool:
        return True

在 DoFn 声明中

from apache_beam.transforms.userstate import ReadModifyWriteStateSpec

class MyDoFn(beam.DoFn):
    DICTSTATE= ReadModifyWriteStateSpec(name='dictstate',coder=MyDictCoder())
    
    def process(self,element,DictState=beam.DoFn.StateParam(DICTSTATE)):
        # Do something
        yield DictState

并在管道内添加这一行(如 Beam example 中所做的那样)

beam.coders.registry.register_coder(typing.Dict,MyDictCoder)