已连接到组协调器但无法连接到 kafka 节点

问题描述

我正在尝试设置一个集成测试,其中我有一个 kafka 生产者(quarkus / 标准 kafkaproducer,dockerized)将东西推送到一个主题,然后我的应用程序(quarkus / 反应式消息传递,也是 dockerized)接收它并推送到另一个主题通知它已收到消息(主题:messageReceived)。在我的测试中,我有一个 KafkaContainer

@Container
static KafkaContainer kafka = new KafkaContainer(DockerImageName.parse("confluentinc/cp-kafka:5.5.4"))
        .withNetworkAliases("kafkaNode")
        .withNetwork(CommonNetwork.getInstance())
        .withExposedPorts(9092,9093);

我的两个 dockerized 应用程序都运行良好,因为它们都共享同一个网络,发送/接收/发送场景正在运行。但是我在实际测试中遇到了 KafkaConsumer 问题,他应该检查主题 messageReceived 是否有来自我的应用程序的消息。我正在使用以下内容将引导服务器配置添加到我的消费者,这似乎正在工作:

props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONfig,kafka.getBootstrapServers());

它通过 testcontainers 中的随机 ip 发现组协调器,因此配置必须正确。

discovered group coordinator localhost:49725 (id: 2147483646 rack: null)

然后它尝试连接到以下内容

Connection to node -1 (localhost/127.0.0.1:9092) Could not be established. broker may not be available.
Bootstrap broker localhost:9092 (id: -1 rack: null) disconnected

当它应该连接到与组协调器相同的 localhost:49725 时,因为如果我理解正确,我只有一个节点充当代理和组协调器。

我可能做错了什么,但我不知道是什么。可以在这个问题上使用一些帮助。谢谢。

解决方法

在探索用于集成测试的 TestContainers 时,我遇到了类似的问题。我设法用下面的代码解决了它:

KafkaContainer kafkaContainer = new KafkaContainer(DockerImageName.parse("confluentinc/cp-kafka:5.4.3"));
        kafkaContainer.setPortBindings(Arrays.asList("9092:9092","9093:9093","2181:2181"));
        kafkaContainer.start();
        kafkaContainer.waitingFor(Wait.forListeningPort());

KafkaConsumer 使用以下方式连接:

props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,"PLAINTEXT://localhost:9093");

这里的关键是 PLAINTEXT://localhost:9093