Problem description
I am new to Apache Beam. I created a simple Python pipeline and submit it to a Flink cluster with the command shown below.
Technical details:
- Apache Flink version: 1.10.1
- Kubernetes version: 1.18
- Python: 3.8.5
- Minikube: 1.12.3
- Apache Beam: v1.23
- Apache Beam Python SDK: 3.7
I set up an Apache Flink cluster on minikube and port-forwarded the JobManager, so that running the script submits the job to the cluster. The command is:
python scripts/main.py \
--runner FlinkRunner \
--flink_master localhost:8081 \
--setup_file scripts/setup.py \
--environment_type EXTERNAL \
--environment_config localhost:50000 \
--database postgres \
--input /dataset/league_of_legends.csv \
--database_host db \
--table_name league_game_board \
--database_user postgres \
--database_password postgres
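main.py is not shown, so how it reads these flags is an assumption here. A common pattern for a script that mixes its own options (--input, --database, --table_name, ...) with runner options (--runner, --flink_master, --environment_type, ...) is to declare only its own options with argparse and hand everything else to Beam via parse_known_args. A minimal sketch under that assumption. split_args and EXAMPLE_ARGV are hypothetical names:

```python
import argparse
from typing import List, Tuple

# The command line from the question, minus "python scripts/main.py".
EXAMPLE_ARGV = [
    "--runner", "FlinkRunner",
    "--flink_master", "localhost:8081",
    "--setup_file", "scripts/setup.py",
    "--environment_type", "EXTERNAL",
    "--environment_config", "localhost:50000",
    "--database", "postgres",
    "--input", "/dataset/league_of_legends.csv",
    "--database_host", "db",
    "--table_name", "league_game_board",
    "--database_user", "postgres",
    "--database_password", "postgres",
]


def split_args(argv: List[str]) -> Tuple[argparse.Namespace, List[str]]:
    """Hypothetical helper: parse the script's own flags, return the rest for Beam."""
    # allow_abbrev=False stops argparse from treating an undeclared flag that is
    # a prefix of a declared one (say "--table") as that declared option.
    parser = argparse.ArgumentParser(allow_abbrev=False)
    for name in ("--input", "--database", "--database_host",
                 "--table_name", "--database_user", "--database_password"):
        parser.add_argument(name, required=True)
    # Undeclared flags and their values are returned untouched, in their original order.
    return parser.parse_known_args(argv)


if __name__ == "__main__":
    known_args, beam_args = split_args(EXAMPLE_ARGV)
    print(known_args.input, known_args.table_name)
    print(beam_args)
```

In a main.py built this way, beam_args would then be passed to apache_beam.options.pipeline_options.PipelineOptions. Only the runner flags reach Beam, and the database settings stay with the script.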
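I used kubectl cp to copy the dataset into the container, then exec'd into it to make sure the data is in the right place. Everything seems to go fine until the job reaches the task manager. The task manager submits the job to the Apache Beam server, but it fails, and the task manager logs show the following: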
java.io.FileNotFoundException: /var/folders/qv/ztv4pp7n4r1gv38m2pj94l_00000gn/T/beam-tempabvjrxov/artifactsal8uquz5/9a7e79a7285955a56f5ab55fa5eb522eeb5a7bfcbbfe616a6b0bef5314a21ee8/1-ref_Environment_default_e-workflow.tar.gz (No such file or directory)
flink-taskmanager-585fc984d8-hhzg2 taskmanager at java.io.FileInputStream.open0(Native Method)
flink-taskmanager-585fc984d8-hhzg2 taskmanager at java.io.FileInputStream.open(FileInputStream.java:195)
flink-taskmanager-585fc984d8-hhzg2 taskmanager at java.io.FileInputStream.<init>(FileInputStream.java:138)
flink-taskmanager-585fc984d8-hhzg2 taskmanager at org.apache.beam.sdk.io.LocalFileSystem.open(LocalFileSystem.java:118)
flink-taskmanager-585fc984d8-hhzg2 taskmanager at org.apache.beam.sdk.io.LocalFileSystem.open(LocalFileSystem.java:82)
flink-taskmanager-585fc984d8-hhzg2 taskmanager at org.apache.beam.sdk.io.FileSystems.open(FileSystems.java:252)
flink-taskmanager-585fc984d8-hhzg2 taskmanager at org.apache.beam.runners.fnexecution.artifact.ArtifactRetrievalService.getArtifact(ArtifactRetrievalService.java:121)
flink-taskmanager-585fc984d8-hhzg2 taskmanager at org.apache.beam.runners.fnexecution.artifact.ArtifactRetrievalService.getArtifact(ArtifactRetrievalService.java:96)
flink-taskmanager-585fc984d8-hhzg2 taskmanager at org.apache.beam.model.jobmanagement.v1.ArtifactRetrievalServiceGrpc$MethodHandlers.invoke(ArtifactRetrievalServiceGrpc.java:327)
flink-taskmanager-585fc984d8-hhzg2 taskmanager at org.apache.beam.vendor.grpc.v1p26p0.io.grpc.stub.ServerCalls$UnaryServerCallHandler$UnaryServerCallListener.onHalfClose(ServerCalls.java:172)
flink-taskmanager-585fc984d8-hhzg2 taskmanager at org.apache.beam.vendor.grpc.v1p26p0.io.grpc.PartialForwardingServerCallListener.onHalfClose(PartialForwardingServerCallListener.java:35)
flink-taskmanager-585fc984d8-hhzg2 taskmanager at org.apache.beam.vendor.grpc.v1p26p0.io.grpc.ForwardingServerCallListener.onHalfClose(ForwardingServerCallListener.java:23)
flink-taskmanager-585fc984d8-hhzg2 taskmanager at org.apache.beam.vendor.grpc.v1p26p0.io.grpc.ForwardingServerCallListener$SimpleForwardingServerCallListener.onHalfClose(ForwardingServerCallListener.java:40)
flink-taskmanager-585fc984d8-hhzg2 taskmanager at org.apache.beam.vendor.grpc.v1p26p0.io.grpc.Contexts$ContextualizedServerCallListener.onHalfClose(Contexts.java:86)
flink-taskmanager-585fc984d8-hhzg2 taskmanager at org.apache.beam.vendor.grpc.v1p26p0.io.grpc.internal.ServerCallImpl$ServerStreamListenerImpl.halfClosed(ServerCallImpl.java:331)
flink-taskmanager-585fc984d8-hhzg2 taskmanager at org.apache.beam.vendor.grpc.v1p26p0.io.grpc.internal.ServerImpl$JumpToApplicationThreadServerStreamListener$1HalfClosed.runInContext(ServerImpl.java:817)
flink-taskmanager-585fc984d8-hhzg2 taskmanager at org.apache.beam.vendor.grpc.v1p26p0.io.grpc.internal.ContextRunnable.run(ContextRunnable.java:37)
flink-taskmanager-585fc984d8-hhzg2 taskmanager at org.apache.beam.vendor.grpc.v1p26p0.io.grpc.internal.SerializingExecutor.run(SerializingExecutor.java:123)
flink-taskmanager-585fc984d8-hhzg2 taskmanager at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
flink-taskmanager-585fc984d8-hhzg2 taskmanager at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
flink-taskmanager-585fc984d8-hhzg2 taskmanager at java.lang.Thread.run(Thread.java:748)
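The file the task manager fails to open lies under /var/folders/, which is where macOS keeps per-user temporary directories. The task manager itself runs in a Linux container on minikube. The following minimal stdlib sketch pulls that path out of a log line and checks it. missing_artifact_path, looks_like_macos_tmpdir and SAMPLE_LINE are hypothetical names, and the macOS check is only a prefix heuristic:

```python
import os
import re
from typing import Optional

# Matches "java.io.FileNotFoundException: <path> (No such file or directory)".
_NOT_FOUND = re.compile(
    r"java\.io\.FileNotFoundException: (\S+) \(No such file or directory\)"
)

# First line of the task manager trace from the question.
SAMPLE_LINE = (
    "java.io.FileNotFoundException: /var/folders/qv/ztv4pp7n4r1gv38m2pj94l_00000gn/T/"
    "beam-tempabvjrxov/artifactsal8uquz5/"
    "9a7e79a7285955a56f5ab55fa5eb522eeb5a7bfcbbfe616a6b0bef5314a21ee8/"
    "1-ref_Environment_default_e-workflow.tar.gz (No such file or directory)"
)


def missing_artifact_path(log_line: str) -> Optional[str]:
    """Return the path the task manager could not open, or None if the line has none."""
    match = _NOT_FOUND.search(log_line)
    return match.group(1) if match else None


def looks_like_macos_tmpdir(path: str) -> bool:
    """Heuristic: True for paths under /var/folders/, the macOS per-user temp root."""
    return path.startswith("/var/folders/")


if __name__ == "__main__":
    path = missing_artifact_path(SAMPLE_LINE)
    if path is None:
        print("no FileNotFoundException path found")
    else:
        print("path:", path)
        print("macOS-style temp dir:", looks_like_macos_tmpdir(path))
        print("exists on this machine:", os.path.exists(path))
```

Running it both on the machine that submitted the pipeline and inside the task manager container would show on which side that file actually exists.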
The Beam server (the beamserver container in the same task manager pod) then fails with a similar error:
flink-taskmanager-585fc984d8-hhzg2 beamserver 2020/08/20 06:19:56 Failed to retrieve staged files: failed to retrieve /tmp/staged in 3 attempts: failed to retrieve chunk for /tmp/staged/workflow.tar.gz
flink-taskmanager-585fc984d8-hhzg2 beamserver caused by:
flink-taskmanager-585fc984d8-hhzg2 beamserver rpc error: code = Unknown desc = ; failed to retrieve chunk for /tmp/staged/workflow.tar.gz
flink-taskmanager-585fc984d8-hhzg2 beamserver caused by:
flink-taskmanager-585fc984d8-hhzg2 beamserver rpc error: code = Unknown desc = ; failed to retrieve chunk for /tmp/staged/workflow.tar.gz
flink-taskmanager-585fc984d8-hhzg2 beamserver caused by:
flink-taskmanager-585fc984d8-hhzg2 beamserver rpc error: code = Unknown desc = ; failed to retrieve chunk for /tmp/staged/workflow.tar.gz
flink-taskmanager-585fc984d8-hhzg2 beamserver caused by:
flink-taskmanager-585fc984d8-hhzg2 beamserver rpc error: code = Unknown desc =
The setup of my minikube Flink cluster can be found here.
I have searched for references to /tmp/staged/workflow.tar.gz, but could not find anything about how to provide it to my setup.
What is this tarball, and how do I provide it to the Flink cluster?