Apache Beam / Google Cloud Dataflow 大查询阅读器在第二次运行时失败

问题描述

我们使用 Apache Beam 构建了 Dataflow,并部署在 GCP Dataflow 基础架构中。 Dataflow 实例第一次完美运行,并按预期创建分区表,但是当它第二次运行时,它会从数据集中清除结果,而不是替换为该特定分区中的新数据集。使用我本地设置中的 Direct runner 运行时,作业完美运行。

代码示例:

        pipeline.apply(
            "Read from BigQuery (table_name) Table: ",BigQueryIO.readTableRows()
                .fromQuery(
                    String.format(
                        "SELECT  %s FROM `%s.%s.%s`",FIELDS.stream().collect(Collectors.joining(",")),project,dataset,table))
                .usingStandardsql()
                .withoutValidation()));
    PCollection<VideoPlacement.Placement> rows =
        tableRow.apply(
            "TableRows to BigQueryVideoPlacement.Placement",MapElements.into(TypeDescriptor.of(Model.class))
                .via(Model::fromTableRow));

如果知道我在这里遗漏了什么,请告诉我。提前致谢!

解决方法

想通了!

这是我对模板化环境所做的更改:

            "Read from BigQuery (table_name) Table: ",BigQueryIO.readTableRows()
                .fromQuery(
                    String.format(
                        "SELECT  %s FROM `%s.%s.%s`",FIELDS.stream().collect(Collectors.joining(",")),project,dataset,table))
                .usingStandardSql()
                .withoutValidation()
                .withTemplateCompatibility()));
    PCollection<VideoPlacement.Placement> rows =
        tableRow.apply(
            "TableRows to BigQueryVideoPlacement.Placement",MapElements.into(TypeDescriptor.of(Model.class))
                .via(Model::fromTableRow));
.withTemplateCompatibility()

请参阅更多文档here