通过相关管道处理Dataflow / Apache Beam中的拒绝

问题描述

我有一个从BigQuery获取数据并将其写入GCS的管道,但是,如果我发现任何拒绝,我都希望将它们纠正到Bigquery表中。我将拒绝收集到全局列表变量中,然后将列表加载到BigQuery表中。当我在本地运行它时,该过程可以正常工作,因为管道以正确的顺序运行。当我使用dataflowrunner运行它时,它不能保证顺序(我希望pipeline1在pipeline2之前运行。是否可以使用python在Dataflow中使用依赖的管道?或者也请建议是否可以使用更好的方法解决此问题。预先感谢。

with beam.Pipeline(options=PipelineOptions(pipeline_args)) as pipeline1:
 
    data = (pipeline1
               | 'get data' >> beam.io.Read(beam.io.BigQuerySource(query=...,use_standard_sql=True))
               | 'combine output to list' >> beam.combiners.ToList()
               | 'tranform' >> beam.Map(lambda x: somefunction)  # Collecting rejects in the except block of this method to a global list variable
               ....etc
               | 'to gcs' >> beam.io.WritetoText(output)
               )

# Loading the rejects gathered in the above pipeline to Biquery
with beam.Pipeline(options=PipelineOptions(pipeline_args)) as pipeline2:
    rejects = (pipeline2
                    | 'create pipeline' >> beam.Create(reject_list)
                    | 'to json format' >> beam.Map(lambda data: {.....})
                    | 'to bq' >> beam.io.WritetoBigQuery(....)
                    )

解决方法

您可以执行类似的操作,但只有1个管道,并且转换中需要一些其他代码。

let card = document.getElementsByClassName('card'); let allSelected = document.querySelectorAll('selected'); let btn = document.getElementsByClassName('btn'); for (var i = 0; i < card.length; i++) { card[i].addEventListener('click',function(e) { e.target.classList.add('selected'); }) } 应该有两个输出:一个被写入GCS,一个被拒绝的元素最终将被写入BigQuery。

为此,您的转换函数将必须返回beam.Map(lambda x: somefunction)

Beam编程指南中有一个示例:https://beam.apache.org/documentation/programming-guide/#multiple-outputs-dofn

第二个TaggedOutput,然后您可以写入BigQuery。

管道的第二个分支中不需要PCollection

管道将是这样的:

Create

然后with beam.Pipeline(options=PipelineOptions(pipeline_args)) as pipeline1: data = (pipeline1 | 'get data' >> beam.io.Read(beam.io.BigQuerySource(query=...,use_standard_sql=True)) | 'combine output to list' >> beam.combiners.ToList() | 'tranform' >> beam.Map(transform) # Tagged output produced here pcoll_to_gcs = data.gcs_output pcoll_to_bq = data.rejected pcoll_to_gcs | "to gcs" >> beam.io.WriteToText(output) pcoll_to_bq | "to bq" >> beam.io.WriteToBigQuery(....) 函数将是这样的

transform