问题描述
总的来说,我是 GCP、Dataflow、Apache Beam、Python 和 OOP 的新手。我来自函数式 javascript 领域,用于上下文。
现在我有一个使用 Apache Beam python sdk 构建的流管道,我将它部署到 GCP 的 Dataflow。管道的源是发布订阅,接收器是数据存储。
管道从 pubsub 订阅中获取消息,根据配置对象 + 消息内容做出决定,然后根据它做出的决定将其放在数据存储中的适当位置。目前这一切都在起作用。
现在我的情况是,当前硬编码的配置对象需要更加动态。我的意思是:我们现在不只是硬编码配置对象,而是要进行 API 调用以返回配置。这样,我们就可以更新配置而无需重新部署管道。这也适用于目前。
但是!我们预计流量会很大,因此获取传入的每条消息的配置并不理想。因此,我们将获取移到开头,就在实际管道开始之前。但这意味着我们立即失去了让它来自 API 调用的价值,因为 API 调用只在管道启动时发生一次。
这是我们目前所拥有的(为了清楚起见,去掉了不相关的部分):
def run(argv=None):
options = PipelineOptions(
streaming=True,save_main_session=True
)
configuration = get_configuration() # api call to fetch config
with beam.Pipeline(options=options) as pipeline:
# read incoming messages from pubsub
incoming_messages = (
pipeline
| "Read Messages From PubSub"
>> beam.io.ReadFrompubSub(subscription=f"our subscription here",with_attributes=True))
# make a decision based off of the message + the config
decision_messages = (
incoming_messages
| "Create Decision Messages" >> beam.FlatMap(create_decision_message,configuration)
)
create_decision_message
接收来自流的传入消息 + 配置文件,然后您猜对了,然后做出决定。这是非常简单的逻辑。想想“如果消息是苹果,并且配置说我们只关心橙子,那么就对消息不做任何处理”。我们需要能够即时更新它以说“没关系,我们现在突然也关心苹果了”。
我需要想办法让管道知道它需要每 15 分钟重新获取该配置文件。我不完全确定使用我正在使用的工具做到这一点的最佳方法是什么。如果是 javascript,我会这样做:
(请原谅伪代码,不确定这是否会实际运行,但您明白了)
let fetch_time = Date.Now() // initialized when app starts
let expiration = 900 // 900 seconds = 15 mins
let config = getConfigFromApi() // fetch config right when app starts
function fetchConfig(Now){
if (fetch_time + expiration < Now) {
// if fetch_time + expiration is less than the current time,we need to re-fetch the config
config = getConfigFromApi() // assign new value to config var
fetch_time = Now // assign new value to fetch_time var
}
return config
}
...
const someLaterTime = Date.Now() // later in the code,within the pipeline,I need to use the config object
const validConfig = fetchConfig(someLaterTime) // i pass in the current time and get back either the memory-cached config,or a just-recently-fetched config
我不确定如何将这个概念转化为 Python,我也不确定是否应该这样做。这是一个尝试实现的合理的事情吗?还是这种类型的行为与我使用的堆栈不一致?我所在的位置是我团队中唯一一个从事此工作的人,而且这是一个新建项目,因此没有任何关于过去如何完成的示例。我不确定我是否应该尝试解决这个问题,或者我是否应该说“对不起老板,我们需要另一个解决方案”。
感谢任何帮助,无论多么小......谢谢!
解决方法
我认为有多种方法可以实现您想要实现的目标,最直接的方法可能是通过有状态处理,在有状态 DoFn 中通过状态记录您的配置并设置循环计时器以刷新记录。
您可以在此处阅读有关有状态处理的更多信息https://beam.apache.org/blog/timely-processing/
有关状态和计时器的更多信息来自光束编程指南:https://beam.apache.org/documentation/programming-guide/#types-of-state。
我认为您可以定义需要 ParDo 中的配置的处理逻辑,例如:
class MakeDecision(beam.DoFn):
CONFIG = ReadModifyWriteState('config',coders.StrUtf8Coder())
REFRESH_TIMER = TimerSpec('output',TimeDomain.REAL_TIME)
def process(self,element,config=DoFn.StateParam(CONFIG),timer=DoFn.TimerParam(REFRESH_TIMER)):
valid_config={}
if config.read():
valid_config=json.loads(config.read())
else: # config is None and hasn't been fetched before.
valid_config=fetch_config() # your own fetch function.
config.write(json.dumps(valid_config))
timer.set(Timestamp.now() + Duration(seconds=900))
# Do what ever you need with the config.
...
@on_timer(REFRESH_TIMER)
def refresh_config(self,timer=DoFn.TimerParam(REFRESH_TIMER)):
valid_config=fetch_config()
config.write(json.dumps(valid_config))
timer.set(Timestamp.now() + Duration(seconds=900))
然后您现在可以使用 Stateful DoFn 处理您的消息。
with beam.Pipeline(options=options) as pipeline:
pipeline
| "Read Messages From PubSub"
>> beam.io.ReadFromPubSub(subscription=f"our subscription here",with_attributes=True))
| "Make decision" >> beam.ParDo(MakeDecision())