在数据流管道中动态设置bigquery数据集

问题描述

是否可以根据在上一个数据流步骤中处理的数据将数据插入到不同的bigQuery数据集中?


我正在创建一个数据流管道,它正在从PubSub订阅中读取并写入大查询表。定义如下:

def run(argv=None,save_main_session=True):
    options: PipelineOptions = PipelineOptions(
        project='project-id',runner='DataflowRunner',region='region',streaming=True,setup_file='dataflow/setup.py',autoscaling_algorithm='THROUGHPUT_BASED',job_name='telemetry-processing'
    )

    with beam.Pipeline(options=options) as p:
        status = (
                p
                 | 'Get Status PubSub' >> beam.io.ReadFrompubSub(
            subscription='projects/project-id/subscriptions/subscription-id',with_attributes=True))

        status_records = (status| 'Proto to Dict' >> beam.Map(lambda x: 
convert_proto_to_dict(x,nozzle_status_proto.NozzleStatus)) )

        status_records | 'Write status to BQ' >> beam.io.WritetoBigQuery('project- 
id:dataset-id.table-id')

         bytes_status = (status | 'Get Bytes Result' >> beam.ParDo(GetBytes()))
         bytes_status | 'Write to BQ BackUp' >> beam.io.WritetoBigQuery(
        'project-id:dataset-id.backup-table-id')

对于给定的输入和输出,它完全按预期工作。
我想要的是,关于我的PubSubMessage中具有的特定属性,以定义我的消息应放在哪个数据集上。 所以我需要更改的部分是这个:

status_records | 'Write status to BQ' >> beam.io.WritetoBigQuery('project-id:dataset-id.table-id')

我已经尝试提取所需的数据并像这样使用它:

status_records | 'Write status to BQ' >> beam.io.WritetoBigQuery('project-id:{data-from-prevIoUs-step}.table-id')

但是我们不能直接从PCollection获取数据。

我曾尝试按照本文(How can I write to Big Query using a runtime value provider in Apache Beam?)覆盖WritetoBigQuery,但没有出现错误,也没有插入任何内容

我看不到如何实现的。
你知道我应该从哪里开始吗?
我必须为n个数据集创建n条管道吗?

解决方法

WriteToBigQuery的“表”参数可以是从元素到应写入表的函数。例如:

status_records | 'Write' >> beam.io.WriteToBigQuery(
  lambda e: 'dataset1.invalid_records' if is_invalid(e) else 'dataset2.good_records')