问题描述
我对Apache Beam(尤其是数据流)有疑问。
我有一个从cloudsql数据库读取并写入GCS的管道。文件名中带有时间戳。我希望每次运行它时,都会生成一个带有不同时间戳的文件。
我在本地计算机上进行了测试。 Beam从postgres db读取并写入文件(而不是gcs)。它工作正常。生成的文件中具有不同的时间戳。喜欢
jdbc_output.csv-00000-of-00001_2020-08-19_00:11:17.csv
jdbc_output.csv-00000-of-00001_2020-08-19_00:25:07.csv
但是,当我部署到Dataflow时,通过Airflow触发它(我们将气流作为调度程序),它生成的文件名始终使用相同的时间戳。即使多次运行,时间戳也不会改变。时间戳非常接近上传数据流模板的时间。
这是简单的代码。
output.apply("Write to Bucket",TextIO.write().to("gs://my-bucket/filename").withNumShards(1)
.withSuffix("_" + String.valueOf(new Timestamp(new Date().getTime())).replace(" ","_") +".csv"));
我想知道为什么数据流不使用文件名中的当前时间,而是使用上载模板文件时的时间戳记的原因。
此外,如何解决此问题?我的计划是每天运行数据流,并期望其中包含不同时间戳的新文件。
解决方法
我的直觉(因为我从未测试过)是模板创建启动您的管道并对其进行快照。因此,您的管道已运行,日期时间已评估并保持原样在模板中。价值永远不会改变。
documentation description还提到管道在模板创建之前运行,就像编译一样。
- 开发人员运行管道并创建模板。 Apache Beam SDK会在Cloud Storage中暂存文件,创建模板文件(类似于作业请求),然后将模板文件保存在Cloud Storage中。
要解决此问题,您可以使用ValueProvider
界面。而且,我之前从未建立过链接,但它位于template section of the documentation
注意:但是,对于Cloud SQL数据库中的简单读取和导出到文件而言,最便宜且最易于维护的是不要使用Dataflow!