Flink 1.13 HA on K8s

Problem description

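I ran into a problem while setting up a Flink cluster on k8s. I am just testing a Flink-on-k8s HA setup backed by ZooKeeper. The job's checkpoint metadata is meant to be written to S3. Here is my flink-conf.yaml: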

    high-availability: zookeeper
    high-availability.cluster-id: /flink
    high-availability.jobmanager.port: 50010
    high-availability.storageDir: s3://dsw-dia-test/recovery
    high-availability.zookeeper.client.acl: open
    high-availability.zookeeper.quorum: zetcd:2181
    state.backend: filesystem
    state.checkpoints.dir: s3://dsw-dia-test/checkpoints
    state.backend.fs.checkpointdir: s3://dsw-dia-test/checkpoints
    fs.allowed-fallback-filesystems: s3
    s3.endpoint: s3.us-south.cloud-object-storage.appdomain.cloud
    s3.access-key: **************
    s3.secret-key: *******************
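
As a quick sanity check on the configuration above, the following sketch lists which URI scheme each storage path uses, and therefore which file system implementation Flink has to be able to load at startup. The function name `storage_schemes` and the `SAMPLE` excerpt are illustrative assumptions, not part of Flink; the parser only handles simple `key: value` lines.

```python
from urllib.parse import urlparse


def storage_schemes(conf_text):
    """Map every config key whose value is a URI to that URI's scheme."""
    schemes = {}
    for raw in conf_text.splitlines():
        line = raw.strip()
        # Skip blanks, comments, and anything that is not "key: value".
        if not line or line.startswith("#") or ":" not in line:
            continue
        key, value = line.split(":", 1)
        key, value = key.strip(), value.strip()
        if "://" in value:
            schemes[key] = urlparse(value).scheme
    return schemes


# Excerpt of the configuration shown above (credentials omitted).
SAMPLE = """
high-availability: zookeeper
high-availability.storageDir: s3://dsw-dia-test/recovery
state.checkpoints.dir: s3://dsw-dia-test/checkpoints
s3.endpoint: s3.us-south.cloud-object-storage.appdomain.cloud
"""

if __name__ == "__main__":
    for key, scheme in storage_schemes(SAMPLE).items():
        print(f"{key} -> {scheme}")
```

Both storage paths resolve to the `s3` scheme, which is the scheme named in the exception further down.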

I have checked the plugins directory; the pod already contains the s3-fs-presto and s3-fs-hadoop subfolders along with their jar files:

root@flink-taskmanager-d7f8b4f87-nb5ds:/opt/flink/plugins# ls -lrt
total 44
-rwxr-xr-x 1 flink flink  654 Jan 29 16:03 README.txt
drwxrwxr-x 1 flink flink 4096 May 25 12:36 metrics-statsd
drwxrwxr-x 1 flink flink 4096 May 25 12:36 metrics-slf4j
drwxrwxr-x 1 flink flink 4096 May 25 12:36 metrics-prometheus
drwxrwxr-x 1 flink flink 4096 May 25 12:36 metrics-jmx
drwxrwxr-x 1 flink flink 4096 May 25 12:36 metrics-influx
drwxrwxr-x 1 flink flink 4096 May 25 12:36 metrics-graphite
drwxrwxr-x 1 flink flink 4096 May 25 12:36 metrics-datadog
drwxrwxr-x 1 flink flink 4096 May 25 12:36 external-resource-gpu
drwxr-xr-x 1 flink flink 4096 Jul  1 13:32 s3-fs-presto
drwxr-xr-x 1 flink flink 4096 Jul  1 13:32 s3-fs-hadoop
root@flink-taskmanager-d7f8b4f87-nb5ds:/opt/flink/plugins/s3-fs-presto# ls -lrt
total 32692
-rw-r--r-- 1 flink flink 33474159 May 25 12:20 flink-s3-fs-presto-1.13.1.jar
root@flink-taskmanager-d7f8b4f87-nb5ds:/opt/flink/plugins/s3-fs-hadoop# ls -lrt
total 19796
-rw-r--r-- 1 flink flink 20269950 May 25 12:20 flink-s3-fs-hadoop-1.13.1.jar
root@flink-taskmanager-d7f8b4f87-nb5ds:/opt/flink/lib# ls -lrt
total 198248
-rw-rw-r-- 1 flink flink    276771 Oct 10  2019 log4j-api-2.12.1.jar
-rw-rw-r-- 1 flink flink     23518 Oct 10  2019 log4j-slf4j-impl-2.12.1.jar
-rw-rw-r-- 1 flink flink   1674433 Oct 10  2019 log4j-core-2.12.1.jar
-rw-rw-r-- 1 flink flink     67114 Oct 10  2019 log4j-1.2-api-2.12.1.jar
-rw-rw-r-- 1 flink flink   7709740 Apr  8 14:08 flink-shaded-zookeeper-3.4.14.jar
-rw-r--r-- 1 flink flink    148131 May 25 12:32 flink-json-1.13.1.jar
-rw-r--r-- 1 flink flink     92311 May 25 12:32 flink-csv-1.13.1.jar
-rw-r--r-- 1 flink flink  36417228 May 25 12:35 flink-table_2.11-1.13.1.jar
-rw-r--r-- 1 flink flink  40965908 May 25 12:35 flink-table-blink_2.11-1.13.1.jar
-rw-r--r-- 1 flink flink 115530972 May 25 12:36 flink-dist_2.11-1.13.1.jar
-rw-r--r-- 1 flink flink     78648 Jul  1 02:19 flink-hadoop-compatibility_2.12-1.13.1.jar
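
Note that the listing above was taken in a taskmanager pod, while the error below comes from the job-manager pod, so it may be worth confirming that the job-manager pod has the same layout. The sketch below is one possible way to do that check; `find_s3_plugin_jars` is a hypothetical helper, and the `/opt/flink/plugins` path is taken from the listing. It only reports jars matching `flink-s3-fs-*.jar` that sit in a direct subfolder of the plugins directory, which is where Flink looks for plugins.

```python
from pathlib import Path


def find_s3_plugin_jars(plugins_dir):
    """Return {subfolder: [jar names]} for S3 filesystem jars found
    exactly one level below plugins_dir."""
    found = {}
    root = Path(plugins_dir)
    if not root.is_dir():
        # Directory missing in this container: nothing can be loaded.
        return found
    for sub in sorted(p for p in root.iterdir() if p.is_dir()):
        jars = sorted(j.name for j in sub.glob("flink-s3-fs-*.jar"))
        if jars:
            found[sub.name] = jars
    return found


if __name__ == "__main__":
    # Run inside the pod you want to inspect (job-manager and taskmanager).
    print(find_s3_plugin_jars("/opt/flink/plugins"))
```

An empty result inside the job-manager pod would be consistent with the exception shown below, though it does not by itself prove that this is the cause.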

The error in the job-manager pod is as follows:

java.io.IOException: Could not create FileSystem for highly available storage path (s3://dsw-dia-test/recovery/s3.us-south.cloud-object-storage.appdomain.cloud/flink)
    at org.apache.flink.runtime.blob.BlobUtils.createFileSystemBlobStore(BlobUtils.java:92)
    at org.apache.flink.runtime.blob.BlobUtils.createBlobStoreFromConfig(BlobUtils.java:76)
    at org.apache.flink.runtime.highavailability.HighAvailabilityServicesUtils.createHighAvailabilityServices(HighAvailabilityServicesUtils.java:115)
    at org.apache.flink.runtime.entrypoint.ClusterEntrypoint.createHaServices(ClusterEntrypoint.java:353)
    at org.apache.flink.runtime.entrypoint.ClusterEntrypoint.initializeServices(ClusterEntrypoint.java:311)
    at org.apache.flink.runtime.entrypoint.ClusterEntrypoint.runCluster(ClusterEntrypoint.java:239)
    at org.apache.flink.runtime.entrypoint.ClusterEntrypoint.lambda$startCluster$1(ClusterEntrypoint.java:189)
    at org.apache.flink.runtime.security.contexts.NoOpSecurityContext.runSecured(NoOpSecurityContext.java:28)
    at org.apache.flink.runtime.entrypoint.ClusterEntrypoint.startCluster(ClusterEntrypoint.java:186)
    at org.apache.flink.runtime.entrypoint.ClusterEntrypoint.runClusterEntrypoint(ClusterEntrypoint.java:600)
    at org.apache.flink.runtime.entrypoint.StandaloneSessionClusterEntrypoint.main(StandaloneSessionClusterEntrypoint.java:59)
Caused by: org.apache.flink.core.fs.UnsupportedFileSystemSchemeException: Could not find a file system implementation for scheme 's3'. The scheme is not directly supported by Flink and no Hadoop file system to support this scheme could be loaded. For a full list of supported file systems, please see https://ci.apache.org/projects/flink/flink-docs-stable/ops/filesystems/.
    at org.apache.flink.core.fs.FileSystem.getUnguardedFileSystem(FileSystem.java:530)
    at org.apache.flink.core.fs.FileSystem.get(FileSystem.java:407)
    at org.apache.flink.core.fs.Path.getFileSystem(Path.java:274)
    at org.apache.flink.runtime.blob.BlobUtils.createFileSystemBlobStore(BlobUtils.java:89)
    ... 10 more
Caused by: org.apache.flink.core.fs.UnsupportedFileSystemSchemeException: Hadoop is not in the classpath/dependencies.
    at org.apache.flink.core.fs.UnsupportedSchemeFactory.create(UnsupportedSchemeFactory.java:55)
    at org.apache.flink.core.fs.FileSystem.getUnguardedFileSystem(FileSystem.java:526)
    ... 13 more
.
2021-07-02 06:41:34,349 INFO  org.apache.flink.runtime.rpc.akka.AkkaRpcService             [] - Stopping Akka RPC service.
2021-07-02 06:41:34,374 INFO  akka.remote.RemoteActorRefProvider$RemotingTerminator        [] - Shutting down remote daemon.
2021-07-02 06:41:34,376 INFO  akka.remote.RemoteActorRefProvider$RemotingTerminator        [] - Remote daemon shut down; proceeding with flushing remote transports.
2021-07-02 06:41:34,403 INFO  akka.remote.RemoteActorRefProvider$RemotingTerminator        [] - Remoting shut down.
2021-07-02 06:41:34,429 INFO  org.apache.flink.runtime.rpc.akka.AkkaRpcService             [] - Stopped Akka RPC service.
2021-07-02 06:41:34,429 ERROR org.apache.flink.runtime.entrypoint.ClusterEntrypoint        [] - Could not start cluster entrypoint StandaloneSessionClusterEntrypoint.
org.apache.flink.runtime.entrypoint.ClusterEntrypointException: Failed to initialize the cluster entrypoint StandaloneSessionClusterEntrypoint.
    at org.apache.flink.runtime.entrypoint.ClusterEntrypoint.startCluster(ClusterEntrypoint.java:212) ~[flink-dist_2.11-1.13.1.jar:1.13.1]
    at org.apache.flink.runtime.entrypoint.ClusterEntrypoint.runClusterEntrypoint(ClusterEntrypoint.java:600) [flink-dist_2.11-1.13.1.jar:1.13.1]
    at org.apache.flink.runtime.entrypoint.StandaloneSessionClusterEntrypoint.main(StandaloneSessionClusterEntrypoint.java:59) [flink-dist_2.11-1.13.1.jar:1.13.1]
Caused by: java.io.IOException: Could not create FileSystem for highly available storage path (s3://dsw-dia-test/recovery/s3.us-south.cloud-object-storage.appdomain.cloud/flink)
    at org.apache.flink.runtime.blob.BlobUtils.createFileSystemBlobStore(BlobUtils.java:92) ~[flink-dist_2.11-1.13.1.jar:1.13.1]
    at org.apache.flink.runtime.blob.BlobUtils.createBlobStoreFromConfig(BlobUtils.java:76) ~[flink-dist_2.11-1.13.1.jar:1.13.1]
    at org.apache.flink.runtime.highavailability.HighAvailabilityServicesUtils.createHighAvailabilityServices(HighAvailabilityServicesUtils.java:115) ~[flink-dist_2.11-1.13.1.jar:1.13.1]
    at org.apache.flink.runtime.entrypoint.ClusterEntrypoint.createHaServices(ClusterEntrypoint.java:353) ~[flink-dist_2.11-1.13.1.jar:1.13.1]
    at org.apache.flink.runtime.entrypoint.ClusterEntrypoint.initializeServices(ClusterEntrypoint.java:311) ~[flink-dist_2.11-1.13.1.jar:1.13.1]
    at org.apache.flink.runtime.entrypoint.ClusterEntrypoint.runCluster(ClusterEntrypoint.java:239) ~[flink-dist_2.11-1.13.1.jar:1.13.1]
    at org.apache.flink.runtime.entrypoint.ClusterEntrypoint.lambda$startCluster$1(ClusterEntrypoint.java:189) ~[flink-dist_2.11-1.13.1.jar:1.13.1]
    at org.apache.flink.runtime.security.contexts.NoOpSecurityContext.runSecured(NoOpSecurityContext.java:28) ~[flink-dist_2.11-1.13.1.jar:1.13.1]
    at org.apache.flink.runtime.entrypoint.ClusterEntrypoint.startCluster(ClusterEntrypoint.java:186) ~[flink-dist_2.11-1.13.1.jar:1.13.1]
    ... 2 more
Caused by: org.apache.flink.core.fs.UnsupportedFileSystemSchemeException: Could not find a file system implementation for scheme 's3'. The scheme is not directly supported by Flink and no Hadoop file system to support this scheme could be loaded. For a full list of supported file systems, please see https://ci.apache.org/projects/flink/flink-docs-stable/ops/filesystems/.
    at org.apache.flink.core.fs.FileSystem.getUnguardedFileSystem(FileSystem.java:530) ~[flink-dist_2.11-1.13.1.jar:1.13.1]
    at org.apache.flink.core.fs.FileSystem.get(FileSystem.java:407) ~[flink-dist_2.11-1.13.1.jar:1.13.1]
    at org.apache.flink.core.fs.Path.getFileSystem(Path.java:274) ~[flink-dist_2.11-1.13.1.jar:1.13.1]
    at org.apache.flink.runtime.blob.BlobUtils.createFileSystemBlobStore(BlobUtils.java:89) ~[flink-dist_2.11-1.13.1.jar:1.13.1]
    at org.apache.flink.runtime.blob.BlobUtils.createBlobStoreFromConfig(BlobUtils.java:76) ~[flink-dist_2.11-1.13.1.jar:1.13.1]
    at org.apache.flink.runtime.highavailability.HighAvailabilityServicesUtils.createHighAvailabilityServices(HighAvailabilityServicesUtils.java:115) ~[flink-dist_2.11-1.13.1.jar:1.13.1]
    at org.apache.flink.runtime.entrypoint.ClusterEntrypoint.createHaServices(ClusterEntrypoint.java:353) ~[flink-dist_2.11-1.13.1.jar:1.13.1]
    at org.apache.flink.runtime.entrypoint.ClusterEntrypoint.initializeServices(ClusterEntrypoint.java:311) ~[flink-dist_2.11-1.13.1.jar:1.13.1]
    at org.apache.flink.runtime.entrypoint.ClusterEntrypoint.runCluster(ClusterEntrypoint.java:239) ~[flink-dist_2.11-1.13.1.jar:1.13.1]
    at org.apache.flink.runtime.entrypoint.ClusterEntrypoint.lambda$startCluster$1(ClusterEntrypoint.java:189) ~[flink-dist_2.11-1.13.1.jar:1.13.1]
    at org.apache.flink.runtime.security.contexts.NoOpSecurityContext.runSecured(NoOpSecurityContext.java:28) ~[flink-dist_2.11-1.13.1.jar:1.13.1]
    at org.apache.flink.runtime.entrypoint.ClusterEntrypoint.startCluster(ClusterEntrypoint.java:186) ~[flink-dist_2.11-1.13.1.jar:1.13.1]
    ... 2 more
Caused by: org.apache.flink.core.fs.UnsupportedFileSystemSchemeException: Hadoop is not in the classpath/dependencies.
    at org.apache.flink.core.fs.UnsupportedSchemeFactory.create(UnsupportedSchemeFactory.java:55) ~[flink-dist_2.11-1.13.1.jar:1.13.1]
    at org.apache.flink.core.fs.FileSystem.getUnguardedFileSystem(FileSystem.java:526) ~[flink-dist_2.11-1.13.1.jar:1.13.1]
    at org.apache.flink.core.fs.FileSystem.get(FileSystem.java:407) ~[flink-dist_2.11-1.13.1.jar:1.13.1]
    at org.apache.flink.core.fs.Path.getFileSystem(Path.java:274) ~[flink-dist_2.11-1.13.1.jar:1.13.1]
    at org.apache.flink.runtime.blob.BlobUtils.createFileSystemBlobStore(BlobUtils.java:89) ~[flink-dist_2.11-1.13.1.jar:1.13.1]
    at org.apache.flink.runtime.blob.BlobUtils.createBlobStoreFromConfig(BlobUtils.java:76) ~[flink-dist_2.11-1.13.1.jar:1.13.1]
    at org.apache.flink.runtime.highavailability.HighAvailabilityServicesUtils.createHighAvailabilityServices(HighAvailabilityServicesUtils.java:115) ~[flink-dist_2.11-1.13.1.jar:1.13.1]
    at org.apache.flink.runtime.entrypoint.ClusterEntrypoint.createHaServices(ClusterEntrypoint.java:353) ~[flink-dist_2.11-1.13.1.jar:1.13.1]
    at org.apache.flink.runtime.entrypoint.ClusterEntrypoint.initializeServices(ClusterEntrypoint.java:311) ~[flink-dist_2.11-1.13.1.jar:1.13.1]
    at org.apache.flink.runtime.entrypoint.ClusterEntrypoint.runCluster(ClusterEntrypoint.java:239) ~[flink-dist_2.11-1.13.1.jar:1.13.1]
    at org.apache.flink.runtime.entrypoint.ClusterEntrypoint.lambda$startCluster$1(ClusterEntrypoint.java:189) ~[flink-dist_2.11-1.13.1.jar:1.13.1]
    at org.apache.flink.runtime.security.contexts.NoOpSecurityContext.runSecured(NoOpSecurityContext.java:28) ~[flink-dist_2.11-1.13.1.jar:1.13.1]
    at org.apache.flink.runtime.entrypoint.ClusterEntrypoint.startCluster(ClusterEntrypoint.java:186) ~[flink-dist_2.11-1.13.1.jar:1.13.1]
    ... 2 more
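
Because the trace above is long and printed twice, a small helper that pulls out only the `Caused by:` lines can make the root cause easier to see. This is a minimal sketch; `cause_chain` and the abbreviated `SAMPLE` are illustrative names, and the regular expression assumes the standard Java `Caused by: <class>: <message>` layout.

```python
import re

# Standard Java stack-trace cause line: "Caused by: <class>: <message>"
CAUSE_RE = re.compile(r"^Caused by: ([\w.$]+): (.*)$")


def cause_chain(log_text):
    """Return the distinct (exception class, message) pairs in order;
    the last entry is the innermost (root) cause."""
    chain = []
    for line in log_text.splitlines():
        match = CAUSE_RE.match(line.strip())
        if match:
            entry = (match.group(1), match.group(2))
            if entry not in chain:  # the trace above repeats itself
                chain.append(entry)
    return chain


# Abbreviated excerpt of the trace above.
SAMPLE = """\
Caused by: java.io.IOException: Could not create FileSystem for highly available storage path (s3://dsw-dia-test/recovery/s3.us-south.cloud-object-storage.appdomain.cloud/flink)
    at org.apache.flink.runtime.blob.BlobUtils.createFileSystemBlobStore(BlobUtils.java:92)
Caused by: org.apache.flink.core.fs.UnsupportedFileSystemSchemeException: Hadoop is not in the classpath/dependencies.
    at org.apache.flink.core.fs.UnsupportedSchemeFactory.create(UnsupportedSchemeFactory.java:55)
Caused by: org.apache.flink.core.fs.UnsupportedFileSystemSchemeException: Hadoop is not in the classpath/dependencies.
"""

if __name__ == "__main__":
    for cls, msg in cause_chain(SAMPLE):
        print(f"{cls}: {msg}")
```

For this trace the innermost cause is the `UnsupportedFileSystemSchemeException` stating that Hadoop is not on the classpath, meaning no file system implementation for `s3` was loaded in the job-manager process.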

Any help is appreciated.

李维
