Problem description
I have an AWS MSK cluster running. I connected to it and ran this command to create a topic named topicoteste:
usr/local/kafka_2.13-2.5.0/bin/kafka-topics --create --bootstrap-server BOOTSTRAP_STRING_HERE --partitions 1 --replication-factor 3 --topic topicoteste
These are the two errors I get. Any suggestions?
Error while executing topic command : org.apache.kafka.common.errors.TimeoutException: Call(callName=listTopics,deadlineMs=1611587423888) timed out at 9223372036854775807 after 1 attempt(s)
[2021-01-25 15:09:24,312] ERROR Uncaught exception in thread 'kafka-admin-client-thread | adminclient-1': (org.apache.kafka.common.utils.KafkaThread)
java.lang.OutOfMemoryError: Java heap space
at java.base/java.nio.HeapByteBuffer.<init>(HeapByteBuffer.java:61)
at java.base/java.nio.ByteBuffer.allocate(ByteBuffer.java:348)
at org.apache.kafka.common.memory.MemoryPool$1.tryAllocate(MemoryPool.java:30)
at org.apache.kafka.common.network.NetworkReceive.readFrom(NetworkReceive.java:113)
at org.apache.kafka.common.network.KafkaChannel.receive(KafkaChannel.java:448)
at org.apache.kafka.common.network.KafkaChannel.read(KafkaChannel.java:398)
at org.apache.kafka.common.network.Selector.attemptRead(Selector.java:678)
at org.apache.kafka.common.network.Selector.pollSelectionKeys(Selector.java:580)
at org.apache.kafka.common.network.Selector.poll(Selector.java:485)
at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:549)
at org.apache.kafka.clients.admin.KafkaAdminClient$AdminClientRunnable.processRequests(KafkaAdminClient.java:1272)
at org.apache.kafka.clients.admin.KafkaAdminClient$AdminClientRunnable.run(KafkaAdminClient.java:1203)
at java.base/java.lang.Thread.run(Thread.java:829)
[2021-01-25 15:09:24,314] ERROR java.util.concurrent.ExecutionException: org.apache.kafka.common.errors.TimeoutException: Call(callName=listTopics,deadlineMs=1611587423888) timed out at 9223372036854775807 after 1 attempt(s)
at org.apache.kafka.common.internals.KafkaFutureImpl.wrapAndThrow(KafkaFutureImpl.java:45)
at org.apache.kafka.common.internals.KafkaFutureImpl.access$000(KafkaFutureImpl.java:32)
at org.apache.kafka.common.internals.KafkaFutureImpl$SingleWaiter.await(KafkaFutureImpl.java:89)
at org.apache.kafka.common.internals.KafkaFutureImpl.get(KafkaFutureImpl.java:260)
at kafka.admin.TopicCommand$AdminClientTopicService.createTopic(TopicCommand.scala:227)
at kafka.admin.TopicCommand$TopicService.createTopic(TopicCommand.scala:196)
at kafka.admin.TopicCommand$TopicService.createTopic$(TopicCommand.scala:191)
at kafka.admin.TopicCommand$AdminClientTopicService.createTopic(TopicCommand.scala:219)
at kafka.admin.TopicCommand$.main(TopicCommand.scala:62)
at kafka.admin.TopicCommand.main(TopicCommand.scala)
Caused by: org.apache.kafka.common.errors.TimeoutException: Call(callName=listTopics,deadlineMs=1611587423888) timed out at 9223372036854775807 after 1 attempt(s)
Caused by: org.apache.kafka.common.errors.TimeoutException: The AdminClient thread has exited.
(kafka.admin.TopicCommand$)
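Solution
I ran into the same problem because the brokers were using TLS and the AdminClient was not configured to use TLS. As far as I can tell, the plaintext client then misreads the TLS response bytes as a message size and tries to allocate a huge buffer, which would explain the OutOfMemoryError alongside the timeout.
You can either run a PLAINTEXT listener next to the TLS listener and use that one to create the topic, or pass --command-config <ssl.conf>
with a file ssl.conf that looks like this to configure your admin client: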
ssl.endpoint.identification.algorithm=https
security.protocol=SSL
ssl.keystore.location=/path/to/keystore.jks
ssl.keystore.password=password
ssl.key.password=password
ssl.truststore.location=/path/to/truststore.jks
ssl.truststore.password=password
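
To tie the two pieces together, here is a minimal sketch of the second option: it writes a client config file and then re-runs the command from the question with --command-config added. The variable names CONF_FILE and KAFKA_TOPICS are my own, and the truststore path and password are placeholders. I am assuming the cluster uses TLS for encryption only; in that case the truststore lines should be enough, and the keystore lines from the file above are only needed if the brokers also require client certificates. The bootstrap string has to point at the TLS listener (on MSK that is usually port 9094, but check your cluster's client information).

```shell
#!/bin/sh
# Sketch only: CONF_FILE and KAFKA_TOPICS are hypothetical names, and the
# truststore path/password below are placeholders to replace with your own.
CONF_FILE="${CONF_FILE:-ssl.conf}"
KAFKA_TOPICS="${KAFKA_TOPICS:-usr/local/kafka_2.13-2.5.0/bin/kafka-topics}"

# Write the admin client configuration (TLS encryption, no client certificate).
cat > "$CONF_FILE" <<'EOF'
security.protocol=SSL
ssl.truststore.location=/path/to/truststore.jks
ssl.truststore.password=password
EOF

# Re-run the original command, now passing the config file.
if [ -x "$KAFKA_TOPICS" ]; then
  "$KAFKA_TOPICS" --create \
    --bootstrap-server BOOTSTRAP_STRING_HERE \
    --partitions 1 --replication-factor 3 \
    --topic topicoteste \
    --command-config "$CONF_FILE"
else
  echo "kafka-topics not found at $KAFKA_TOPICS; config written to $CONF_FILE"
fi
```

If the command still times out with this config, the next things I would check are the security group rules and whether the bootstrap string really lists the TLS endpoints.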