KeyError when passing a PCollection as a side input in Apache Beam

Problem description

I am passing the side_input PCollection as a side input to a ParDo transform, but I get a KeyError.

Here is the code that produces the error:

import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions
from beam_nuggets.io import relational_db
from processors.appendcol import AppendCol
from side_inputs.config import sideinput_bq_config
from source.config import source_config


with beam.Pipeline(options=PipelineOptions()) as si:
  side_input = si | "Reading from BQ side input" >> relational_db.ReadFromDB(
    source_config=sideinput_bq_config,table_name='abc',query="SELECT * FROM abc"
  )

with beam.Pipeline(options=PipelineOptions()) as p:
  PCollection = p | "Reading records from database" >> relational_db.ReadFromDB(
    source_config=source_config,table_name='xyzzy',query="SELECT * FROM xyzzy",) | beam.ParDo(
   AppendCol(),beam.pvalue.AsIter(side_input)
 )

I am reading the data from a PostgreSQL table, and each element of the PCollection is a dictionary.

Solution

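I think the problem is that you have two separate pipelines trying to work together. A PCollection belongs to the pipeline that created it, so side_input, which is built in pipeline si, cannot be consumed by a ParDo that runs in pipeline p. You should run all of your transforms as part of a single pipeline: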

with beam.Pipeline(options=PipelineOptions()) as p:
  # Both reads start from the same pipeline object p.
  side_input = p | "Reading from BQ side input" >> relational_db.ReadFromDB(
    source_config=sideinput_bq_config, table_name='abc', query="SELECT * FROM abc")

  my_pcoll = (
    p
    | "Reading records from database" >> relational_db.ReadFromDB(
        source_config=source_config, table_name='xyzzy', query="SELECT * FROM xyzzy")
    | beam.ParDo(AppendCol(), beam.pvalue.AsIter(side_input)))
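
The question does not show AppendCol, so it may help to see what per-element logic that consumes an AsIter side input could look like. The sketch below is only an illustration in plain Python: the function name append_col, the join key "id", and the column "label" are all assumptions, not part of the original code. It treats the side input as an ordinary iterable of dictionaries, which is how Beam presents an AsIter side input to the function.

```python
def append_col(row, side_rows, key="id", column="label"):
    """Return a copy of `row` with `column` taken from the matching side row.

    Hypothetical stand-in for AppendCol: the names append_col, key and
    column are assumptions, since the question does not show that class.
    """
    # An AsIter side input arrives as a plain iterable of elements,
    # so it can be scanned like any other sequence of dicts.
    lookup = {side[key]: side.get(column) for side in side_rows if key in side}
    enriched = dict(row)  # copy, because input elements should not be mutated
    enriched[column] = lookup.get(row.get(key))  # None when nothing matches
    return enriched


# Stand-ins for the rows read from the two tables.
main_rows = [{"id": 1, "name": "a"}, {"id": 2, "name": "b"}]
side_rows = [{"id": 1, "label": "x"}]

result = [append_col(r, side_rows) for r in main_rows]
```

Inside the single pipeline, a function like this could be applied with beam.Map(append_col, side_rows=beam.pvalue.AsIter(side_input)) in place of the ParDo. Beam then supplies the side input as the extra argument for each element.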
