侧面输入数据更新得很晚-Python Apache Beam

问题描述

我正在建立一个包含动态配置数据的管道,该数据会在触发时更新。

PubSub主题有5个,一个主题是IoT数据,另外4个主题是用于转换IoT数据的配置。

配置保留在Cloud Firestore文档中。每个文档更新后,Cloud Function都会读取并发送4个PubSub订阅的更新配置1。

配置更新后,新的辅助输入数据将被混入,并不能真正被新的辅助输入数据代替。 我已经等待了至少30分钟,仍然有旧的输入。 我正在使用 Dataflow Runner V2

p = beam.Pipeline(options=options)

class Transform(beam.DoFn):
    def process(self,configuration):
        ...
        yield output

def run():
    ...
    iot_data = (p
        | 'ReadiotData' >> ReadFrompubSub(SUBSCRIPTION_IOT)

    config_1 = (p
        | 'ReadConfig' >> ReadFrompubSub(SUBSCRIPTION_1)
        | 'WindowUserData' >> beam.WindowInto(
            window.GlobalWindows(),trigger=trigger.Repeatedly(trigger.AfterCount(1)),accumulation_mode=trigger.AccumulationMode.disCARDING)
        | 'LoadsUserData' >> beam.Map(lambda x: ('data',x.decode().replace('\\','')))

    config_2 = # same as config_1 with different PubSub subscription
    config_3 = # same as config_1 with different PubSub subscription
    config_4 = # same as config_1 with different PubSub subscription

    output = (iot_data
        | 'transform' >> beam.ParDo(Transform(),pvalue.AsDict(config_1),pvalue.AsDict(config_2),pvalue.AsDict(config_3),pvalue.AsDict(config_4))
        | 'Output' >> WritetoPubSub(TOPIC_C)

更新

添加更多的工人可以帮助方面输入更新性能。与添加的工人的比例相比,它要快一些。但仍然不在我想要的地方。

解决方法

暂无找到可以解决该程序问题的有效方法,小编努力寻找整理中!

如果你已经找到好的解决方法,欢迎将解决方案带上本链接一起发送给小编。

小编邮箱:dio#foxmail.com (将#修改为@)