Problem description
I have two questions about my development.
Question 1
I am trying to create a template from Python code that reads a BigQuery table, applies some transformations, and writes to another BigQuery table (which may or may not already exist).
The point is that I need to pass the target table as a parameter, but it seems I cannot use parameters in the pipeline step WriteToBigQuery, because it raises the following error message: apache_beam.error.RuntimeValueProviderError: RuntimeValueProvider(option: project_target, type: str, default_value: 'Test').get() not called from a runtime context
Approach 1
with beam.Pipeline(options=options) as pipeline:
    logging.info("Start logic process...")
    kpis_report = (
        pipeline
        | "Process start" >> Create(["1"])
        | "Delete previous data" >> ParDo(preTasks())
        | "Read table" >> ParDo(readTable())
        ....
        | 'Write table 2' >> Write(WriteToBigQuery(
            table=custom_options.project_target.get() + ":" + custom_options.dataset_target.get() + "." + custom_options.table_target.get(),
            schema=custom_options.target_schema.get(),
            write_disposition=BigQueryDisposition.WRITE_APPEND,
            create_disposition=BigQueryDisposition.CREATE_IF_NEEDED))
Approach 2
I created a ParDo function in order to fetch the variables there and set up the WriteToBigQuery step. However, although the pipeline execution completes successfully and the output reports rows returned (theoretically written), I cannot see the table or the data inserted into it.
with beam.Pipeline(options=options) as pipeline:
    logging.info("Start logic process...")
    kpis_report = (
        pipeline
        | "Process start" >> Create(["1"])
        | "Pre-tasks" >> ParDo(preTasks())
        | "Read table" >> ParDo(readTable())
        ....
        | 'Write table 2' >> Write(WriteToBigQuery())
I tried 2 ways, but neither works: BigQueryBatchFileLoads and WriteToBigQuery.
class writeTable(beam.DoFn):
    def process(self, element):
        try:
            # Load the parameters from the custom_options variable first (here we can do it)
            result1 = Write(BigQueryBatchFileLoads(
                destination=target_table,
                schema=target_schema,
                create_disposition=BigQueryDisposition.CREATE_IF_NEEDED))
            result2 = WriteToBigQuery(
                table=target_table,
                create_disposition=BigQueryDisposition.CREATE_IF_NEEDED,
                method="FILE_LOADS")
Question 2
I also wonder whether, in that last ParDo class, I need to return an element as in the last pipeline step, or return result1 or result2.
Thanks for your help.
Solution
The best way to do this is similar to #1, but without calling get:
pass the value provider, and pass a lambda for the table:
with beam.Pipeline(options=options) as pipeline:
    logging.info("Start logic process...")
    kpis_report = (
        pipeline
        | "Process start" >> Create(["1"])
        | "Delete previous data" >> ParDo(preTasks())
        | "Read table" >> ParDo(readTable())
        ....
        | 'Write table 2' >> WriteToBigQuery(
            table=lambda x: custom_options.project_target.get() + ":" + custom_options.dataset_target.get() + "." + custom_options.table_target.get(),
            schema=custom_options.target_schema,
            write_disposition=BigQueryDisposition.WRITE_APPEND,
            create_disposition=BigQueryDisposition.CREATE_IF_NEEDED))
This should work.
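The difference between the failing eager call and the working lambda comes down to when .get() runs. The sketch below illustrates the timing with a toy stand-in class; this is not the real Beam API, just an assumed model of the evaluation order:

```python
# Toy stand-in for a runtime value provider (hypothetical class, not
# Beam's), sketching why .get() fails at construction time but
# succeeds when deferred behind a lambda.
class FakeRuntimeValueProvider:
    def __init__(self):
        self.runtime = False   # flips to True once the job is running
        self.value = None

    def get(self):
        if not self.runtime:
            raise RuntimeError(".get() not called from a runtime context")
        return self.value

provider = FakeRuntimeValueProvider()

# Approach #1: eager .get() while building the pipeline -> raises now.
try:
    table = provider.get()
except RuntimeError as e:
    print(e)

# The fix: wrap the call in a lambda, so .get() only runs later, when
# the runner evaluates the callable for each element inside the job.
table_fn = lambda row: provider.get()

# Simulate the runner supplying the value at execution time.
provider.runtime = True
provider.value = "project:dataset.table"
print(table_fn("some-row"))
```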