Kafka AdminClient timeout

Problem description

I have an AWS MSK cluster running. I connected to it and ran this command to create a test topic named topicoteste:
usr/local/kafka_2.13-2.5.0/bin/kafka-topics --create --bootstrap-server BOOTSTRAP_STRING_HERE --partitions 1 --replication-factor 3 --topic topicoteste

These are the two errors I get. Any suggestions?

Error while executing topic command : org.apache.kafka.common.errors.TimeoutException: Call(callName=listTopics,deadlineMs=1611587423888) timed out at 9223372036854775807 after 1 attempt(s)
[2021-01-25 15:09:24,312] ERROR Uncaught exception in thread 'kafka-admin-client-thread | adminclient-1': (org.apache.kafka.common.utils.KafkaThread)
java.lang.OutOfMemoryError: Java heap space
    at java.base/java.nio.HeapByteBuffer.<init>(HeapByteBuffer.java:61)
    at java.base/java.nio.ByteBuffer.allocate(ByteBuffer.java:348)
    at org.apache.kafka.common.memory.MemoryPool$1.tryAllocate(MemoryPool.java:30)
    at org.apache.kafka.common.network.NetworkReceive.readFrom(NetworkReceive.java:113)
    at org.apache.kafka.common.network.KafkaChannel.receive(KafkaChannel.java:448)
    at org.apache.kafka.common.network.KafkaChannel.read(KafkaChannel.java:398)
    at org.apache.kafka.common.network.Selector.attemptRead(Selector.java:678)
    at org.apache.kafka.common.network.Selector.pollSelectionKeys(Selector.java:580)
    at org.apache.kafka.common.network.Selector.poll(Selector.java:485)
    at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:549)
    at org.apache.kafka.clients.admin.KafkaAdminClient$AdminClientRunnable.processRequests(KafkaAdminClient.java:1272)
    at org.apache.kafka.clients.admin.KafkaAdminClient$AdminClientRunnable.run(KafkaAdminClient.java:1203)
    at java.base/java.lang.Thread.run(Thread.java:829)
[2021-01-25 15:09:24,314] ERROR java.util.concurrent.ExecutionException: org.apache.kafka.common.errors.TimeoutException: Call(callName=listTopics,deadlineMs=1611587423888) timed out at 9223372036854775807 after 1 attempt(s)
    at org.apache.kafka.common.internals.KafkaFutureImpl.wrapAndThrow(KafkaFutureImpl.java:45)
    at org.apache.kafka.common.internals.KafkaFutureImpl.access$000(KafkaFutureImpl.java:32)
    at org.apache.kafka.common.internals.KafkaFutureImpl$SingleWaiter.await(KafkaFutureImpl.java:89)
    at org.apache.kafka.common.internals.KafkaFutureImpl.get(KafkaFutureImpl.java:260)
    at kafka.admin.TopicCommand$AdminClientTopicService.createTopic(TopicCommand.scala:227)
    at kafka.admin.TopicCommand$TopicService.createTopic(TopicCommand.scala:196)
    at kafka.admin.TopicCommand$TopicService.createTopic$(TopicCommand.scala:191)
    at kafka.admin.TopicCommand$AdminClientTopicService.createTopic(TopicCommand.scala:219)
    at kafka.admin.TopicCommand$.main(TopicCommand.scala:62)
    at kafka.admin.TopicCommand.main(TopicCommand.scala)
Caused by: org.apache.kafka.common.errors.TimeoutException: Call(callName=listTopics,deadlineMs=1611587423888) timed out at 9223372036854775807 after 1 attempt(s)
Caused by: org.apache.kafka.common.errors.TimeoutException: The AdminClient thread has exited.
 (kafka.admin.TopicCommand$)

Solution

I ran into the same problem because the brokers were using TLS, while the AdminClient was not configured to use TLS.
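
A possible explanation of why a TLS mismatch shows up as an OutOfMemoryError (this is an assumption, not something confirmed in the question): Kafka prefixes each message with a 4-byte big-endian length, and the stack trace shows NetworkReceive.readFrom allocating a buffer. A plaintext client that receives TLS record bytes instead may read them as a very large length. The four bytes below are an illustrative TLS alert record header, not data captured from this cluster.

```shell
# Illustrative only: 0x15 0x03 0x03 0x00 stand in for the first four bytes
# of a hypothetical TLS 1.2 alert record. Here they are combined the way a
# big-endian 32-bit length prefix would be read.
size=$(( (0x15 << 24) | (0x03 << 16) | (0x03 << 8) | 0x00 ))
echo "$size bytes, about $(( size / 1024 / 1024 )) MiB"
```

A buffer of that size could exceed the heap available to a command-line tool, which would be consistent with the error in the question.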

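You can either run a PLAINTEXT listener alongside the TLS listener and use it to create the topic, or configure your admin client with --command-config <ssl.conf>, where ssl.conf is a file that looks like this: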

ssl.endpoint.identification.algorithm=https
security.protocol=SSL
ssl.keystore.location=/path/to/keystore.jks
ssl.keystore.password=password
ssl.key.password=password
ssl.truststore.location=/path/to/truststore.jks
ssl.truststore.password=password
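
Putting the second option together, a minimal sketch might look like the following. The absolute tool path, the ./ssl.conf location, and the environment variable names are assumptions for illustration. The keystore, truststore, and password values are the same placeholders as above and must be replaced with real ones.

```shell
#!/bin/sh
# Sketch only: KAFKA_TOPICS, BOOTSTRAP and CONFIG_FILE are hypothetical
# variable names; adjust the defaults to your installation.
KAFKA_TOPICS="${KAFKA_TOPICS:-/usr/local/kafka_2.13-2.5.0/bin/kafka-topics}"
BOOTSTRAP="${BOOTSTRAP:-BOOTSTRAP_STRING_HERE}"
CONFIG_FILE="${CONFIG_FILE:-./ssl.conf}"

# Write the admin client TLS settings (placeholder paths and passwords).
cat > "$CONFIG_FILE" <<'EOF'
ssl.endpoint.identification.algorithm=https
security.protocol=SSL
ssl.keystore.location=/path/to/keystore.jks
ssl.keystore.password=password
ssl.key.password=password
ssl.truststore.location=/path/to/truststore.jks
ssl.truststore.password=password
EOF

# Re-run the original command, now passing the TLS settings to the admin client.
if [ -x "$KAFKA_TOPICS" ]; then
  "$KAFKA_TOPICS" --create \
    --bootstrap-server "$BOOTSTRAP" \
    --command-config "$CONFIG_FILE" \
    --partitions 1 --replication-factor 3 \
    --topic topicoteste
else
  echo "kafka-topics not found at $KAFKA_TOPICS; skipping topic creation" >&2
fi
```

The bootstrap string should point at the brokers' TLS listener; the keystore lines are typically only needed when the brokers require client certificates.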