侧面输入数据不会更新-Python Apache Beam

问题描述

我正在建立一个包含动态配置数据的管道,该管道会在每次触发时进行更新。

有2个PubSub主题主题A用于IoT数据,主题B用于将用于转换IoT数据的配置。

配置保留在Cloud Firestore中。更新数据库后,Cloud Function将读取更新的配置并将其发送到PubSub主题B。

问题在于,数据流作业仅在作业开始时读取配置数据,并且永远不会更新。

我该如何做才能使侧面输入得到更新?

p = beam.Pipeline(options=options)

class Transform(beam.DoFn):
    def process(self,configuration):
        ...
        yield output

def run():
    ...
    iot_data = (p
        | 'ReadiotData' >> ReadFrompubSub(TOPIC_A)

    configuration = (p
        | 'ReadConfig' >> ReadFrompubSub(TOPIC_B)
        | 'WindowUserData' >> beam.WindowInto(
            window.GlobalWindows(),trigger=trigger.Repeatedly(trigger.AfterCount(1)),accumulation_mode=trigger.AccumulationMode.disCARDING)
        | 'JsonLoadsUserData' >> beam.Map(lambda x: ('data',x.decode().replace('\\','')))

    output = (iot_data
        | 'transform' >> beam.ParDo(Transform(),beam.pvalue.AsDict(configuration))
        | 'Output' >> WritetoPubSub(TOPIC_C)

解决方法

我会尝试使用https://cloud.google.com/dataflow/docs/guides/deploying-a-pipeline#dataflow-runner-v2运行此管道,该管道对侧面输入有更好的支持。