如何创建以当前日期为输入的梁模板每天更新[根据GET请求创建]

问题描述

我正在尝试创建一个每天使用Cloud Scheduler运行的Dataflow作业。我需要使用GET请求从外部API获取数据,因此需要输入当前日期。但是,当我将数据流作业导出为计划模板时,输入的日期保留在执行时,而不是每天更新。我一直在寻找解决方案,遇到了ValueProvider,但是我使用apache_beam.transforms.Create声明的管道始终返回错误'RuntimeValueProvider(option:test,type:str,default_value:'killme')未指定ValueProvider时,不会从运行时上下文中调用.get()。

反正我可以克服吗?这似乎是一个简单的问题,但是无论如何我都无法使它起作用。如果有任何想法,我将不胜感激!

解决方法

您可以使用ValueProvider接口将运行时参数传递到管道,在DoFn中访问它,您需要将其作为参数传递。与下面的示例类似:

https://beam.apache.org/documentation/patterns/pipeline-options/#retroactively-logging-runtime-parameters

class LogValueProvidersFn(beam.DoFn):
  def __init__(self,string_vp):
    self.string_vp = string_vp

  # Define the DoFn that logs the ValueProvider value.
  # The DoFn is called when creating the pipeline branch.
  # This example logs the ValueProvider value,but
  # you could store it by pushing it to an external database.
  def process(self,an_int):
    logging.info('The string_value is %s' % self.string_vp.get())
    # Another option (where you don't need to pass the value at all) is:
    logging.info(
        'The string value is %s' %
        RuntimeValueProvider.get_value('string_value',str,''))

  | beam.Create([None])
  | 'LogValueProvs' >> beam.ParDo(
      LogValueProvidersFn(my_options.string_value)))

您可能还想看看Flex模板:

https://cloud.google.com/dataflow/docs/guides/templates/using-flex-templates