问题描述
我正在建立一个包含动态配置数据的管道,该数据会在触发时更新。
PubSub主题有5个,一个主题是IoT数据,另外4个主题是用于转换IoT数据的配置。
配置保留在Cloud Firestore文档中。每个文档更新后,Cloud Function都会读取并发送4个PubSub订阅的更新配置1。
配置更新后,新的辅助输入数据将被混入,并不能真正被新的辅助输入数据代替。 我已经等待了至少30分钟,仍然有旧的输入。 我正在使用 Dataflow Runner V2
p = beam.Pipeline(options=options)
class Transform(beam.DoFn):
def process(self,configuration):
...
yield output
def run():
...
iot_data = (p
| 'ReadiotData' >> ReadFrompubSub(SUBSCRIPTION_IOT)
config_1 = (p
| 'ReadConfig' >> ReadFrompubSub(SUBSCRIPTION_1)
| 'WindowUserData' >> beam.WindowInto(
window.GlobalWindows(),trigger=trigger.Repeatedly(trigger.AfterCount(1)),accumulation_mode=trigger.AccumulationMode.disCARDING)
| 'LoadsUserData' >> beam.Map(lambda x: ('data',x.decode().replace('\\','')))
config_2 = # same as config_1 with different PubSub subscription
config_3 = # same as config_1 with different PubSub subscription
config_4 = # same as config_1 with different PubSub subscription
output = (iot_data
| 'transform' >> beam.ParDo(Transform(),pvalue.AsDict(config_1),pvalue.AsDict(config_2),pvalue.AsDict(config_3),pvalue.AsDict(config_4))
| 'Output' >> WritetoPubSub(TOPIC_C)
更新
添加更多的工人可以帮助方面输入更新性能。与添加的工人的比例相比,它要快一些。但仍然不在我想要的地方。
解决方法
暂无找到可以解决该程序问题的有效方法,小编努力寻找整理中!
如果你已经找到好的解决方法,欢迎将解决方案带上本链接一起发送给小编。
小编邮箱:dio#foxmail.com (将#修改为@)