问题描述
我们使用 Apache Beam 构建了 Dataflow,并部署在 GCP Dataflow 基础架构中。 Dataflow 实例第一次完美运行,并按预期创建分区表,但是当它第二次运行时,它会从数据集中清除结果,而不是替换为该特定分区中的新数据集。使用我本地设置中的 Direct runner 运行时,作业完美运行。
代码示例:
pipeline.apply(
"Read from BigQuery (table_name) Table: ",BigQueryIO.readTableRows()
.fromQuery(
String.format(
"SELECT %s FROM `%s.%s.%s`",FIELDS.stream().collect(Collectors.joining(",")),project,dataset,table))
.usingStandardsql()
.withoutValidation()));
PCollection<VideoPlacement.Placement> rows =
tableRow.apply(
"TableRows to BigQueryVideoPlacement.Placement",MapElements.into(TypeDescriptor.of(Model.class))
.via(Model::fromTableRow));
如果知道我在这里遗漏了什么,请告诉我。提前致谢!
解决方法
想通了!
这是我对模板化环境所做的更改:
"Read from BigQuery (table_name) Table: ",BigQueryIO.readTableRows()
.fromQuery(
String.format(
"SELECT %s FROM `%s.%s.%s`",FIELDS.stream().collect(Collectors.joining(",")),project,dataset,table))
.usingStandardSql()
.withoutValidation()
.withTemplateCompatibility()));
PCollection<VideoPlacement.Placement> rows =
tableRow.apply(
"TableRows to BigQueryVideoPlacement.Placement",MapElements.into(TypeDescriptor.of(Model.class))
.via(Model::fromTableRow));
.withTemplateCompatibility()
请参阅更多文档here