问题描述
for table_name,key_pair in relation_repl_key.items():
try:
with beam.Pipeline(options=PipelineOptions()) as p:
PCollection = p | "Reading from source database" >> relational_db.ReadFromDB(
source_config=source_config,table_name=table_name,query="SELECT {} FROM {}".format(
key_pair["col"],table_name
)
)
side_input = bq(p,sideinput_bq_config,table_name,key_pair["repl_key"])
except RuntimeError:
pass
else:
PCollection | "Selecting updated rows" >> beam.ParDo(
KeyCheck(),beam.pvalue.AsSingleton(side_input)
)
finally:
load(PCollection,key_pair["primary_key"],key_pair["jsonb_col"])
我无法访问PCollection
块之外的with
。在PCollection | beam.Map(print)
内部运行finally:
不会返回任何结果。
解决方法
PCollection的作用域是管道。在with
块之外,PCollection不再是有效的引用。