如何正确使用Dataflow / Apache Beam wait_until_finish持续时间参数?

问题描述

我在数据流运行程序上的apache-beam [gcp] == 2.19.0版本下在gcp的数据流中运行了一个批处理作业。我为这项工作创建了一个自定义模板。作业正在按预期运行,但我也想添加最大作业持续时间。 I found the duration (in milliseconds) parameter inside the wait_until_finish() methodwhich should be available。问题是:当模板化批处理作业的运行时间超过持续时间时,该如何自动使其停止?我不需要保留任何数据,我只希望作业在运行时间过长时停止。我已经实现了运行功能,如下所示:

def run():
    opts = PipelineOptions()
    user_options = opts.view_as(UserOptions)
    p = beam.Pipeline(options=opts)

    (p |
     "Read data" >> beam.io.Read(beam.io.BigQuerySource(query=user_options.query,use_standard_sql=StaticValueProvider(bool,True))) |
     "Get data" >> beam.ParDo(doStuff()) |

     "Output data" >> beam.ParDo(outputData(param1=user_options.input1)) |

     "Write to BQ" >> beam.io.WriteToBigQuery(
                table=user_options.table_spec,schema=user_options.table_schema,write_disposition=beam.io.BigQueryDisposition.WRITE_TRUNCATE,create_disposition=beam.io.BigQueryDisposition.CREATE_IF_NEEDED
            )
     )

    result = p.run()

    result.wait_until_finish(duration=1800000)

解决方法

否,Dataflow在一定期限后不提供自动取消功能。 您仍然可以通过简单地将cancel()放入目标即可实现

    result.wait_until_finish(duration=1800000)
    if not result.is_in_terminal_state():   # if pipeline isn't finished,cancel
      result.cancel()

相关问答

错误1:Request method ‘DELETE‘ not supported 错误还原:...
错误1:启动docker镜像时报错:Error response from daemon:...
错误1:private field ‘xxx‘ is never assigned 按Alt...
报错如下,通过源不能下载,最后警告pip需升级版本 Requirem...