Problem description
I am passing a PCollection as a side input to a ParDo transform, but I still get the same KeyError.
Below is the code that raises the error:
import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions
from beam_nuggets.io import relational_db
from processors.appendcol import AppendCol
from side_inputs.config import sideinput_bq_config
from source.config import source_config

with beam.Pipeline(options=PipelineOptions()) as si:
    side_input = si | "Reading from BQ side input" >> relational_db.ReadFromDB(
        source_config=sideinput_bq_config, table_name='abc', query="SELECT * FROM abc"
    )

with beam.Pipeline(options=PipelineOptions()) as p:
    PCollection = p | "Reading records from database" >> relational_db.ReadFromDB(
        source_config=source_config, table_name='xyzzy', query="SELECT * FROM xyzzy",
    ) | beam.ParDo(
        AppendCol(), beam.pvalue.AsIter(side_input)
    )
I am reading the data from PostgreSQL tables, and each element of the PCollection is a dictionary.
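Solution
I think the problem is that you have two separate pipelines trying to work together: side_input is created inside pipeline si, but it is consumed by a ParDo in pipeline p. You should apply all of the transforms to a single pipeline: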
with beam.Pipeline(options=PipelineOptions()) as p:
    # Both reads are applied to the same pipeline object, p.
    side_input = p | "Reading from BQ side input" >> relational_db.ReadFromDB(
        source_config=sideinput_bq_config, table_name='abc', query="SELECT * FROM abc")

    my_pcoll = p | "Reading records from database" >> relational_db.ReadFromDB(
        source_config=source_config, table_name='xyzzy', query="SELECT * FROM xyzzy",
    ) | beam.ParDo(AppendCol(), beam.pvalue.AsIter(side_input))