问题描述
我想问一个关于TestDataflowRunner
的问题。
我创建了一个使用DataflowRunner
运行的光束管道(Java)。我在与网络关联的区域asia-southeast1
中部署了该工作器。管道按DataflowRunner
中的预期正常运行。因此,我也想使用@ValidatesRunner
创建TestDataflowRunner
测试。我已经使用相同的服务帐户和相同的网络运行了测试。执行图看起来也很好加载,但是无法配置工作程序。
以下是我用于运行测试的命令。
task validatesRunnerTests(type: Test) {
group = "Verification"
description = "Run tests that require a Dataflow runner to validate that pipelines/transforms work correctly"
systemProperty "beamTestPipelineOptions",JsonOutput.toJson([
"--runner=TestDataflowRunner","--project=$projectId","--region=us-central1","--workerZone=$zone","--usePublicIps=false","--network=$network","--subnetwork=$subnetwork","--tempRoot=$stagingBucket","--serviceAccount=$serviceAccount",])
useJUnit {
includeCategories 'org.apache.beam.sdk.testing.ValidatesRunner'
}
}
服务帐户具有以下角色。
-
roles/dataflow.admin
-
roles/dataflow.worker
我只有以下错误日志,但在Stackdriver VM实例中找不到任何错误日志。
2020-08-12 17:16:27.061 ICT Startup of the worker pool in zone asia-southeast1-a Failed to bring up any of the desired 1 workers. The project quota may have been exceeded or access control policies may be preventing the operation; review the Stackdriver Logging "GCE VM Instance" log for diagnostics.
2020-08-12 17:16:27.095 ICT Workflow Failed. Causes: Internal Issue (8c283568ab7f3c3c): 82159483:17
有人知道这个问题并且可以帮助我吗?
谢谢
解决方法
问题似乎是让工作只能通过专用IP来启动。我使用TestDataflowRunner启动了Dataflow作业,并禁用了公共IP。
我首先克隆了WordCount存储库[1],并对WordCount.java
进行了三处更改。
添加了以下库:
import org.apache.beam.sdk.testing.TestPipeline;
import org.apache.beam.sdk.testing.TestPipelineOptions;
已更改:
public interface WordCountOptions extends PipelineOptions {
到
public interface WordCountOptions extends TestPipelineOptions {
和
Pipeline p = Pipeline.create(options);
到
Pipeline p = TestPipeline.create(options);
我使用了Maven,并运行了以下命令:
mvn clean compile exec:java \
-Dexec.mainClass=org.apache.beam.examples.WordCount \
-Dexec.args=" \
--project=$PROJECT_ID\
--tempRoot=gs://mcbuckety/tempf
--output=gs://$BUCKET_NAME/testrundf \
--runner=TestDataflowRunner
--usePublicIps=false
--subnetwork=regions/us-central1/subnetworks/private"
-Pdataflow-runner
代码编译并启动了Dataflow作业。我转到GCE页面检查VM是否没有公共IP,确实如此。我知道代码不是严格测试的;我没有断言任何内容,但我有准备启动的管道。
[1] https://beam.apache.org/get-started/quickstart-java/#get-the-wordcount-code