TaskManager cannot connect to the newly elected JobManager

Problem description

I have a Flink cluster running on minikube: one JobManager and three TaskManagers. I am using the Kubernetes HA services to handle JobManager leader election.
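For readers who want to reproduce the setup, Kubernetes HA services are normally enabled through a few entries in flink-conf.yaml. The fragment below is only a sketch of what such a configuration might look like. The cluster ID and storage directory are placeholder values, not taken from the original post, and the actual configuration used here was not shown.

```yaml
# Hypothetical flink-conf.yaml fragment; all values are placeholders.
# Identifier shared by all pods of this Flink cluster (assumed name).
kubernetes.cluster-id: my-flink-cluster
# Enables the Kubernetes-based HA services (leader election via ConfigMaps).
high-availability: org.apache.flink.kubernetes.highavailability.KubernetesHaServicesFactory
# Durable storage for JobManager metadata (assumed location).
high-availability.storageDir: hdfs:///flink/recovery
```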

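When I kill the JobManager to simulate a crash, the TaskManagers cannot connect to the new JobManager. They keep trying to connect to the IP address of the previously terminated JobManager.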

Here is the exception:

2021-05-05 12:14:28.126 [flink-akka.actor.default-dispatcher-3] WARN  akka.remote.ReliableDeliverySupervisor flink-akka.remote.default-remote-dispatcher-7 - Association with remote system [akka.tcp://flink@172.17.0.7:6123] has failed, address is now gated for [50] ms. Reason: [Association failed with [akka.tcp://flink@172.17.0.7:6123]] Caused by: [java.net.NoRouteToHostException: No route to host]
2021-05-05 12:14:28.131 [flink-akka.actor.default-dispatcher-3] ERROR o.a.f.runtime.rest.handler.cluster.ClusterOverviewHandler  - Unhandled exception.
org.apache.flink.runtime.concurrent.FutureUtils$RetryException: Could not complete the operation. Number of retries has been exhausted.
    at org.apache.flink.runtime.concurrent.FutureUtils.lambda$retryOperationWithDelay$9(FutureUtils.java:386)
    at java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:774)
    at java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:750)
    at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:488)
    at java.util.concurrent.CompletableFuture.completeExceptionally(CompletableFuture.java:1990)
    at scala.concurrent.java8.FuturesConvertersImpl$CF$$anon$1.accept(FutureConvertersImpl.scala:61)
    at scala.concurrent.java8.FuturesConvertersImpl$CF$$anon$1.accept(FutureConvertersImpl.scala:53)
    at java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:774)
    at java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:750)
    at java.util.concurrent.CompletableFuture$Completion.run(CompletableFuture.java:456)
    at java.lang.Thread.run(Thread.java:748)
Caused by: java.util.concurrent.CompletionException: org.apache.flink.runtime.rpc.exceptions.RpcConnectionException: Could not connect to rpc endpoint under address akka.tcp://flink@172.17.0.7:6123/user/rpc/resourcemanager_0.
    at org.apache.flink.runtime.rpc.akka.AkkaRpcService.lambda$resolveActorAddress$10(AkkaRpcService.java:570)
    at scala.concurrent.java8.FuturesConvertersImpl$CF$$anon$1.accept(FutureConvertersImpl.scala:59)
    ... 5 common frames omitted
Caused by: org.apache.flink.runtime.rpc.exceptions.RpcConnectionException: Could not connect to rpc endpoint under address akka.tcp://flink@172.17.0.7:6123/user/rpc/resourcemanager_0.
    ... 7 common frames omitted
Caused by: akka.actor.ActorNotFound: Actor not found for: ActorSelection[Anchor(akka.tcp://flink@172.17.0.7:6123/),Path(/user/rpc/resourcemanager_0)]
    at akka.actor.ActorSelection.$anonfun$resolveOne$1(ActorSelection.scala:71)
    at scala.concurrent.impl.CallbackRunnable.run(Promise.scala:60)
    at akka.dispatch.BatchingExecutor$AbstractBatch.processBatch(BatchingExecutor.scala:55)
    at akka.dispatch.BatchingExecutor$Batch.run(BatchingExecutor.scala:73)
    at akka.dispatch.ExecutionContexts$sameThreadExecutionContext$.unbatchedExecute(Future.scala:81)
    at akka.dispatch.BatchingExecutor.execute(BatchingExecutor.scala:120)
    at akka.dispatch.BatchingExecutor.execute$(BatchingExecutor.scala:114)
    at akka.dispatch.ExecutionContexts$sameThreadExecutionContext$.execute(Future.scala:80)
    at scala.concurrent.impl.CallbackRunnable.executeWithValue(Promise.scala:68)
    at scala.concurrent.impl.Promise$DefaultPromise.$anonfun$tryComplete$1(Promise.scala:284)
    at scala.concurrent.impl.Promise$DefaultPromise.$anonfun$tryComplete$1$adapted(Promise.scala:284)
    at scala.concurrent.impl.Promise$DefaultPromise.tryComplete(Promise.scala:284)
    at akka.pattern.PromiseActorRef.$bang(AskSupport.scala:573)
    at akka.actor.EmptyLocalActorRef.specialHandle(ActorRef.scala:556)
    at akka.actor.DeadLetterActorRef.specialHandle(ActorRef.scala:593)
    at akka.actor.DeadLetterActorRef.$bang(ActorRef.scala:582)
    at akka.remote.RemoteActorRefProvider$RemoteDeadLetterActorRef.$bang(RemoteActorRefProvider.scala:104)
    at akka.remote.EndpointWriter.postStop(Endpoint.scala:606)
    at akka.actor.Actor.aroundPostStop(Actor.scala:536)
    at akka.actor.Actor.aroundPostStop$(Actor.scala:536)
    at akka.remote.EndpointActor.aroundPostStop(Endpoint.scala:458)
    at akka.actor.dungeon.FaultHandling.finishTerminate(FaultHandling.scala:210)
    at akka.actor.dungeon.FaultHandling.terminate(FaultHandling.scala:172)
    at akka.actor.dungeon.FaultHandling.terminate$(FaultHandling.scala:142)
    at akka.actor.ActorCell.terminate(ActorCell.scala:429)
    at akka.actor.ActorCell.invokeAll$1(ActorCell.scala:533)
    at akka.actor.ActorCell.systemInvoke(ActorCell.scala:549)
    at akka.dispatch.Mailbox.processAllSystemMessages(Mailbox.scala:283)
    at akka.dispatch.Mailbox.run(Mailbox.scala:224)
    at akka.dispatch.Mailbox.exec(Mailbox.scala:235)
    at akka.dispatch.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
    at akka.dispatch.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
    at akka.dispatch.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
    at akka.dispatch.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
