无法通过阻止访问外部的PCollection

问题描述

for table_name,key_pair in relation_repl_key.items():
  try:
    with beam.Pipeline(options=PipelineOptions()) as p:
      PCollection = p | "Reading from source database" >> relational_db.ReadFromDB(
        source_config=source_config,table_name=table_name,query="SELECT {} FROM {}".format(
          key_pair["col"],table_name
        )
      )
      side_input = bq(p,sideinput_bq_config,table_name,key_pair["repl_key"])
  except RuntimeError:
    pass
  else:
    PCollection | "Selecting updated rows" >> beam.ParDo(
      KeyCheck(),beam.pvalue.AsSingleton(side_input)
    ) 
  finally:
    load(PCollection,key_pair["primary_key"],key_pair["jsonb_col"])

我无法访问PCollection块之外的with。在PCollection | beam.Map(print)内部运行finally:不会返回任何结果。

解决方法

PCollection的作用域是管道。在with块之外,PCollection不再是有效的引用。