Apache Beam Python on FlinkRunner fails with java.io.FileNotFoundException

Problem description

I'm new to Apache Beam. I created a simple Python pipeline and run it with the following command:

python scripts/main.py \
  --runner FlinkRunner \
  --flink_master localhost:8081 \
  --setup_file scripts/setup.py \
  --environment_type EXTERNAL \
  --environment_config localhost:50000 \
  --database postgres \
  --input /dataset/league_of_legends.csv \
  --database_host db \
  --table_name league_game_board \
  --database_user postgres \
  --database_password postgres

Technical details:

  • Apache Flink version: 1.10.1
  • Kubernetes version: 1.18
  • Python: 3.8.5
  • Minikube: 1.12.3
  • Apache Beam: v1.23
  • Apache Beam Python SDK: 3.7

I set up an Apache Flink cluster on minikube and port-forwarded the JobManager, so that running the script above submits the job to it. I copied the dataset into the container with kubectl cp and then exec'd into it to make sure the data is in the right place. Everything seems to go fine until the job reaches the task manager. The task manager submits the job to the Apache Beam server, but it fails with the following output in the logs:

flink-taskmanager-585fc984d8-hhzg2 beamserver 2020/08/20 06:19:56 Failed to retrieve staged files: failed to retrieve /tmp/staged in 3 attempts: failed to retrieve chunk for /tmp/staged/workflow.tar.gz
flink-taskmanager-585fc984d8-hhzg2 beamserver caused by:
flink-taskmanager-585fc984d8-hhzg2 beamserver rpc error: code = Unknown desc = ; failed to retrieve chunk for /tmp/staged/workflow.tar.gz
flink-taskmanager-585fc984d8-hhzg2 beamserver caused by:
flink-taskmanager-585fc984d8-hhzg2 beamserver rpc error: code = Unknown desc = ; failed to retrieve chunk for /tmp/staged/workflow.tar.gz
flink-taskmanager-585fc984d8-hhzg2 beamserver caused by:
flink-taskmanager-585fc984d8-hhzg2 beamserver rpc error: code = Unknown desc = ; failed to retrieve chunk for /tmp/staged/workflow.tar.gz
flink-taskmanager-585fc984d8-hhzg2 beamserver caused by:
flink-taskmanager-585fc984d8-hhzg2 beamserver rpc error: code = Unknown desc =

Then the Beam Job Server fails with a similar error:

java.io.FileNotFoundException: /var/folders/qv/ztv4pp7n4r1gv38m2pj94l_00000gn/T/beam-tempabvjrxov/artifactsal8uquz5/9a7e79a7285955a56f5ab55fa5eb522eeb5a7bfcbbfe616a6b0bef5314a21ee8/1-ref_Environment_default_e-workflow.tar.gz (No such file or directory)
flink-taskmanager-585fc984d8-hhzg2 taskmanager  at java.io.FileInputStream.open0(Native Method)
flink-taskmanager-585fc984d8-hhzg2 taskmanager  at java.io.FileInputStream.open(FileInputStream.java:195)
flink-taskmanager-585fc984d8-hhzg2 taskmanager  at java.io.FileInputStream.<init>(FileInputStream.java:138)
flink-taskmanager-585fc984d8-hhzg2 taskmanager  at org.apache.beam.sdk.io.LocalFileSystem.open(LocalFileSystem.java:118)
flink-taskmanager-585fc984d8-hhzg2 taskmanager  at org.apache.beam.sdk.io.LocalFileSystem.open(LocalFileSystem.java:82)
flink-taskmanager-585fc984d8-hhzg2 taskmanager  at org.apache.beam.sdk.io.FileSystems.open(FileSystems.java:252)
flink-taskmanager-585fc984d8-hhzg2 taskmanager  at org.apache.beam.runners.fnexecution.artifact.ArtifactRetrievalService.getArtifact(ArtifactRetrievalService.java:121)
flink-taskmanager-585fc984d8-hhzg2 taskmanager  at org.apache.beam.runners.fnexecution.artifact.ArtifactRetrievalService.getArtifact(ArtifactRetrievalService.java:96)
flink-taskmanager-585fc984d8-hhzg2 taskmanager  at org.apache.beam.model.jobmanagement.v1.ArtifactRetrievalServiceGrpc$MethodHandlers.invoke(ArtifactRetrievalServiceGrpc.java:327)
flink-taskmanager-585fc984d8-hhzg2 taskmanager  at org.apache.beam.vendor.grpc.v1p26p0.io.grpc.stub.ServerCalls$UnaryServerCallHandler$UnaryServerCallListener.onHalfClose(ServerCalls.java:172)
flink-taskmanager-585fc984d8-hhzg2 taskmanager  at org.apache.beam.vendor.grpc.v1p26p0.io.grpc.PartialForwardingServerCallListener.onHalfClose(PartialForwardingServerCallListener.java:35)
flink-taskmanager-585fc984d8-hhzg2 taskmanager  at org.apache.beam.vendor.grpc.v1p26p0.io.grpc.ForwardingServerCallListener.onHalfClose(ForwardingServerCallListener.java:23)
flink-taskmanager-585fc984d8-hhzg2 taskmanager  at org.apache.beam.vendor.grpc.v1p26p0.io.grpc.ForwardingServerCallListener$SimpleForwardingServerCallListener.onHalfClose(ForwardingServerCallListener.java:40)
flink-taskmanager-585fc984d8-hhzg2 taskmanager  at org.apache.beam.vendor.grpc.v1p26p0.io.grpc.Contexts$ContextualizedServerCallListener.onHalfClose(Contexts.java:86)
flink-taskmanager-585fc984d8-hhzg2 taskmanager  at org.apache.beam.vendor.grpc.v1p26p0.io.grpc.internal.ServerCallImpl$ServerStreamListenerImpl.halfClosed(ServerCallImpl.java:331)
flink-taskmanager-585fc984d8-hhzg2 taskmanager  at org.apache.beam.vendor.grpc.v1p26p0.io.grpc.internal.ServerImpl$JumpToApplicationThreadServerStreamListener$1HalfClosed.runInContext(ServerImpl.java:817)
flink-taskmanager-585fc984d8-hhzg2 taskmanager  at org.apache.beam.vendor.grpc.v1p26p0.io.grpc.internal.ContextRunnable.run(ContextRunnable.java:37)
flink-taskmanager-585fc984d8-hhzg2 taskmanager  at org.apache.beam.vendor.grpc.v1p26p0.io.grpc.internal.SerializingExecutor.run(SerializingExecutor.java:123)
flink-taskmanager-585fc984d8-hhzg2 taskmanager  at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
flink-taskmanager-585fc984d8-hhzg2 taskmanager  at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
flink-taskmanager-585fc984d8-hhzg2 taskmanager  at java.lang.Thread.run(Thread.java:748)

The setup of my minikube Flink cluster can be found here.

I have searched for any reference to /tmp/staged/workflow.tar.gz, but I can't find anything on how to provide it to my setup.

What is this tarball, and how do I provide it to the Flink cluster?
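The run command in the question mixes Beam runner flags (--runner, --flink_master, --setup_file, --environment_type, --environment_config) with the pipeline's own flags (--database, --input, --database_host, --table_name, --database_user, --database_password). A common pattern in Beam Python pipelines is to parse the custom flags with argparse's parse_known_args and pass everything else on to Beam. Since scripts/main.py is not shown, the sketch below is only an assumption about how it might split them; split_args is a hypothetical helper and uses the standard library only.

```python
import argparse


def split_args(argv):
    """Split argv into (application flags, remaining runner flags).

    Hypothetical helper: the flag names mirror the command in the
    question, but how scripts/main.py really parses them is assumed.
    """
    parser = argparse.ArgumentParser()
    # Application-specific flags taken from the question's command line.
    parser.add_argument("--database")
    parser.add_argument("--input")
    parser.add_argument("--database_host")
    parser.add_argument("--table_name")
    parser.add_argument("--database_user")
    parser.add_argument("--database_password")
    # parse_known_args does not reject unknown flags such as --runner;
    # it returns them (and their values) in order as a leftover list.
    known, pipeline_args = parser.parse_known_args(argv)
    return known, pipeline_args


if __name__ == "__main__":
    argv = [
        "--runner", "FlinkRunner",
        "--flink_master", "localhost:8081",
        "--setup_file", "scripts/setup.py",
        "--environment_type", "EXTERNAL",
        "--environment_config", "localhost:50000",
        "--database", "postgres",
        "--input", "/dataset/league_of_legends.csv",
        "--database_host", "db",
        "--table_name", "league_game_board",
        "--database_user", "postgres",
        "--database_password", "postgres",
    ]
    known, pipeline_args = split_args(argv)
    print("input file:", known.input)
    print("runner flags:", pipeline_args)
```

In Beam's own examples the leftover list is what gets handed to PipelineOptions(pipeline_args), so the runner and environment flags reach Beam while the database flags stay with the application code.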
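Both logs point at a missing file. The beamserver container asks for /tmp/staged/workflow.tar.gz, while the task manager's ArtifactRetrievalService tries to open a file under /var/folders/..., which looks like a macOS per-user temporary directory on the submitting machine rather than a path inside a Linux pod. workflow.tar.gz appears to be the package that the Beam Python SDK builds from --setup_file and stages as an artifact. A minimal diagnostic sketch, assuming you paste the relevant log lines into a string: missing_paths is a hypothetical helper that pulls the missing paths out of both error formats, and the script then checks whether each one exists on whichever machine runs it.

```python
import os
import re

# Two lines copied from the logs in the question.
SAMPLE_LOG = "\n".join([
    "flink-taskmanager-585fc984d8-hhzg2 beamserver 2020/08/20 06:19:56 Failed to retrieve "
    "staged files: failed to retrieve /tmp/staged in 3 attempts: failed to retrieve chunk "
    "for /tmp/staged/workflow.tar.gz",
    "java.io.FileNotFoundException: /var/folders/qv/ztv4pp7n4r1gv38m2pj94l_00000gn/T/"
    "beam-tempabvjrxov/artifactsal8uquz5/"
    "9a7e79a7285955a56f5ab55fa5eb522eeb5a7bfcbbfe616a6b0bef5314a21ee8/"
    "1-ref_Environment_default_e-workflow.tar.gz (No such file or directory)",
])

# Java formats the message as "<path> (<reason>)"; the Go SDK harness
# log says "failed to retrieve chunk for <path>".
_PATTERNS = [
    re.compile(r"FileNotFoundException: (\S+) \("),
    re.compile(r"failed to retrieve chunk for (\S+)"),
]


def missing_paths(log_text):
    """Return the distinct missing-file paths found in log_text."""
    found = []
    for pattern in _PATTERNS:
        for path in pattern.findall(log_text):
            if path not in found:
                found.append(path)
    return found


if __name__ == "__main__":
    for path in missing_paths(SAMPLE_LOG):
        # os.path.exists only reports on the machine running this script,
        # so run it on the submitting host and inside the pod to compare.
        print(path, "exists here:", os.path.exists(path))
```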
