GCP Dataflow + Apache Beam - 缓存问题

问题描述

总的来说,我是 GCP、Dataflow、Apache Beam、Python 和 OOP 的新手。我来自函数式 javascript 领域,用于上下文。

现在我有一个使用 Apache Beam python sdk 构建的流管道,我将它部署到 GCP 的 Dataflow。管道的源是发布订阅,接收器是数据存储。

管道从 pubsub 订阅获取消息,根据配置对象 + 消息内容做出决定,然后根据它做出的决定将其放在数据存储中的适当位置。目前这一切都在起作用。

现在我的情况是,当前硬编码的配置对象需要更加动态。我的意思是:我们现在不只是硬编码配置对象,而是要进行 API 调用以返回配置。这样,我们就可以更新配置而无需重新部署管道。这也适用于目前。

但是!我们预计流量会很大,因此获取传入的每条消息的配置并不理想。因此,我们将获取移到开头,就在实际管道开始之前。但这意味着我们立即失去了让它来自 API 调用的价值,因为 API 调用只在管道启动时发生一次。

这是我们目前所拥有的(为了清楚起见,去掉了不相关的部分):

def run(argv=None):

    options = PipelineOptions(
        streaming=True,save_main_session=True
    )

    configuration = get_configuration() # api call to fetch config

    with beam.Pipeline(options=options) as pipeline:

        # read incoming messages from pubsub
        incoming_messages = (
            pipeline
            | "Read Messages From PubSub"
            >> beam.io.ReadFrompubSub(subscription=f"our subscription here",with_attributes=True))

         # make a decision based off of the message + the config
         decision_messages = (
                incoming_messages
                | "Create Decision Messages" >> beam.FlatMap(create_decision_message,configuration)
        )

create_decision_message 接收来自流的传入消息 + 配置文件,然后您猜对了,然后做出决定。这是非常简单的逻辑。想想“如果消息是苹果,并且配置说我们只关心橙子,那么就对消息不做任何处理”。我们需要能够即时更新它以说“没关系,我们现在突然也关心苹果了”。

我需要想办法让管道知道它需要每 15 分钟重新获取配置文件。我不完全确定使用我正在使用的工具做到这一点的最佳方法是什么。如果是 javascript,我会这样做:

(请原谅伪代码,不确定这是否会实际运行,但您明白了)


let fetch_time = Date.Now()  // initialized when app starts
let expiration = 900 // 900 seconds = 15 mins 
let config = getConfigFromApi() // fetch config right when app starts

function fetchConfig(Now){
    if (fetch_time + expiration < Now) { 
      // if fetch_time + expiration is less than the current time,we need to re-fetch the config
      config = getConfigFromApi() // assign new value to config var
      fetch_time = Now // assign new value to fetch_time var
  } 
    return config 
}

...

const someLaterTime = Date.Now() // later in the code,within the pipeline,I need to use the config object
const validConfig = fetchConfig(someLaterTime) // i pass in the current time and get back either the memory-cached config,or a just-recently-fetched config

我不确定如何将这个概念转化为 Python,我也不确定是否应该这样做。这是一个尝试实现的合理的事情吗?还是这种类型的行为与我使用的堆栈不一致?我所在的位置是我团队中唯一一个从事此工作的人,而且这是一个新建项目,因此没有任何关于过去如何完成的示例。我不确定我是否应该尝试解决这个问题,或者我是否应该说“对不起老板,我们需要另一个解决方案”。

感谢任何帮助,无论多么小......谢谢!

解决方法

我认为有多种方法可以实现您想要实现的目标,最直接的方法可能是通过有状态处理,在有状态 DoFn 中通过状态记录您的配置并设置循环计时器以刷新记录。

您可以在此处阅读有关有状态处理的更多信息https://beam.apache.org/blog/timely-processing/

有关状态和计时器的更多信息来自光束编程指南:https://beam.apache.org/documentation/programming-guide/#types-of-state

我认为您可以定义需要 ParDo 中的配置的处理逻辑,例如:

class MakeDecision(beam.DoFn):
  CONFIG = ReadModifyWriteState('config',coders.StrUtf8Coder())
  REFRESH_TIMER = TimerSpec('output',TimeDomain.REAL_TIME)

  def process(self,element,config=DoFn.StateParam(CONFIG),timer=DoFn.TimerParam(REFRESH_TIMER)):
    valid_config={}
    if config.read():
      valid_config=json.loads(config.read())
    else: # config is None and hasn't been fetched before.
      valid_config=fetch_config() # your own fetch function.
      config.write(json.dumps(valid_config))
      timer.set(Timestamp.now() + Duration(seconds=900))
    # Do what ever you need with the config.
    ...
  
  @on_timer(REFRESH_TIMER)
  def refresh_config(self,timer=DoFn.TimerParam(REFRESH_TIMER)):
      valid_config=fetch_config()
      config.write(json.dumps(valid_config))
      timer.set(Timestamp.now() + Duration(seconds=900))

然后您现在可以使用 Stateful DoFn 处理您的消息。

with beam.Pipeline(options=options) as pipeline:
    pipeline
            | "Read Messages From PubSub"
            >> beam.io.ReadFromPubSub(subscription=f"our subscription here",with_attributes=True))
            | "Make decision" >> beam.ParDo(MakeDecision())