Apache Beam - Google Dataflow - WriteToBigQuery - Python - Parameters - Templates - Pipelines

Problem description

I have two questions about my development.

Question 1

I am trying to create a template from Python code. The template reads a BigQuery table, applies some transformations, and writes to another BigQuery table (which may or may not already exist).

The point is that I need to pass the destination table as a parameter, but it seems I cannot use parameters in the WriteToBigQuery pipeline transform, because it raises the following error message:

    apache_beam.error.RuntimeValueProviderError: RuntimeValueProvider(option: project_target, type: str, default_value: 'Test').get() not called from a runtime context

Approach 1

with beam.Pipeline(options=options) as pipeline:
    logging.info("Start logic process...")
    kpis_report = (
            pipeline
            | "Process start" >> Create(["1"])
            | "Delete previous data" >> ParDo(preTasks())
            | "Read table" >> ParDo(readTable())
            ....
            | 'Write table 2' >> WriteToBigQuery(
                table=custom_options.project_target.get() + ":" + custom_options.dataset_target.get() + "." + custom_options.table_target.get(),
                schema=custom_options.target_schema.get(),
                write_disposition=BigQueryDisposition.WRITE_APPEND,
                create_disposition=BigQueryDisposition.CREATE_IF_NEEDED))

Approach 2

I created a ParDo function so I could fetch the variables there and configure the WriteToBigQuery call. However, although the pipeline execution completes successfully and the output appears to return rows (in theory, they were written), I cannot see the table or any data inserted into it.

with beam.Pipeline(options=options) as pipeline:
    logging.info("Start logic process...")
    kpis_report = (
            pipeline
            | "Process start" >> Create(["1"])
            | "Pre-tasks" >> ParDo(preTasks())
            | "Read table" >> ParDo(readTable())
            ....

            | 'Write table 2' >> Write(WriteToBigQuery())

I tried two methods, but neither worked: BigQueryBatchFileLoads and WriteToBigQuery.

class writeTable(beam.DoFn):
    def process(self, element):
        try:
            # First load the parameters from the custom_options variable
            # (here we can do it)

            result1 = Write(BigQueryBatchFileLoads(
                destination=target_table,
                schema=target_schema,
                create_disposition=BigQueryDisposition.CREATE_IF_NEEDED))

            result2 = WriteToBigQuery(
                table=target_table,
                create_disposition=BigQueryDisposition.CREATE_IF_NEEDED,
                method="FILE_LOADS")

Question 2

I also wonder whether, in that last ParDo class, I need to return the element, result1, or result2, as in the final pipeline step.

Thanks for your help.

Solution

The best way to do this is similar to approach #1, but without calling get() on the value provider, and passing a lambda for the table:

with beam.Pipeline(options=options) as pipeline:
    logging.info("Start logic process...")
    kpis_report = (
            pipeline
            | "Process start" >> Create(["1"])
            | "Delete previous data" >> ParDo(preTasks())
            | "Read table" >> ParDo(readTable())
            ....
            | 'Write table 2' >> WriteToBigQuery(
                table=lambda x: custom_options.project_target.get() + ":" + custom_options.dataset_target.get() + "." + custom_options.table_target.get(),
                schema=custom_options.target_schema,
                write_disposition=BigQueryDisposition.WRITE_APPEND,
                create_disposition=BigQueryDisposition.CREATE_IF_NEEDED))

This should work.
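The reason the lambda works: WriteToBigQuery accepts a callable for `table` and invokes it per element at write time, so the `.get()` calls happen in a runtime context instead of at template-construction time. The callable must return a table spec in the `project:dataset.table` format. A minimal sketch of that callable in isolation, where `FakeValueProvider` is a hypothetical stand-in for the runtime value providers:

```python
class FakeValueProvider:
    """Stand-in for a Beam value provider; at runtime, .get() returns
    the value supplied when the template was launched."""

    def __init__(self, value):
        self._value = value

    def get(self):
        return self._value


project_target = FakeValueProvider("my-project")
dataset_target = FakeValueProvider("my_dataset")
table_target = FakeValueProvider("my_table")

# The callable WriteToBigQuery invokes per element; by the time it runs,
# the value providers can be resolved safely.
table_fn = lambda element: (
    project_target.get() + ":" + dataset_target.get() + "." + table_target.get()
)

print(table_fn(None))  # my-project:my_dataset.my_table
```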