问题描述
我在数据流运行程序上的apache-beam [gcp] == 2.19.0版本下在gcp的数据流中运行了一个批处理作业。我为这项工作创建了一个自定义模板。作业正在按预期运行,但我也想添加最大作业持续时间。 I found the duration (in milliseconds) parameter inside the wait_until_finish() method,which should be available。问题是:当模板化批处理作业的运行时间超过持续时间时,该如何自动使其停止?我不需要保留任何数据,我只希望作业在运行时间过长时停止。我已经实现了运行功能,如下所示:
def run():
opts = PipelineOptions()
user_options = opts.view_as(UserOptions)
p = beam.Pipeline(options=opts)
(p |
"Read data" >> beam.io.Read(beam.io.BigQuerySource(query=user_options.query,use_standard_sql=StaticValueProvider(bool,True))) |
"Get data" >> beam.ParDo(doStuff()) |
"Output data" >> beam.ParDo(outputData(param1=user_options.input1)) |
"Write to BQ" >> beam.io.WriteToBigQuery(
table=user_options.table_spec,schema=user_options.table_schema,write_disposition=beam.io.BigQueryDisposition.WRITE_TRUNCATE,create_disposition=beam.io.BigQueryDisposition.CREATE_IF_NEEDED
)
)
result = p.run()
result.wait_until_finish(duration=1800000)
解决方法
否,Dataflow在一定期限后不提供自动取消功能。 您仍然可以通过简单地将cancel()放入目标即可实现
result.wait_until_finish(duration=1800000)
if not result.is_in_terminal_state(): # if pipeline isn't finished,cancel
result.cancel()