问题描述
我正在尝试设置一个集成测试,其中我有一个 kafka 生产者(quarkus / 标准 kafkaproducer,dockerized)将东西推送到一个主题,然后我的应用程序(quarkus / 反应式消息传递,也是 dockerized)接收它并推送到另一个主题以通知它已收到消息(主题:messageReceived)。在我的测试中,我有一个 KafkaContainer
@Container
static KafkaContainer kafka = new KafkaContainer(DockerImageName.parse("confluentinc/cp-kafka:5.5.4"))
.withNetworkAliases("kafkaNode")
.withNetwork(CommonNetwork.getInstance())
.withExposedPorts(9092,9093);
我的两个 dockerized 应用程序都运行良好,因为它们都共享同一个网络,发送/接收/发送场景正在运行。但是我在实际测试中遇到了 KafkaConsumer 问题,他应该检查主题 messageReceived 是否有来自我的应用程序的消息。我正在使用以下内容将引导服务器配置添加到我的消费者,这似乎正在工作:
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONfig,kafka.getBootstrapServers());
它通过 testcontainers 中的随机 ip 发现组协调器,因此配置必须正确。
discovered group coordinator localhost:49725 (id: 2147483646 rack: null)
然后它尝试连接到以下内容:
Connection to node -1 (localhost/127.0.0.1:9092) Could not be established. broker may not be available.
Bootstrap broker localhost:9092 (id: -1 rack: null) disconnected
当它应该连接到与组协调器相同的 localhost:49725 时,因为如果我理解正确,我只有一个节点充当代理和组协调器。
我可能做错了什么,但我不知道是什么。可以在这个问题上使用一些帮助。谢谢。
解决方法
在探索用于集成测试的 TestContainers 时,我遇到了类似的问题。我设法用下面的代码解决了它:
KafkaContainer kafkaContainer = new KafkaContainer(DockerImageName.parse("confluentinc/cp-kafka:5.4.3"));
kafkaContainer.setPortBindings(Arrays.asList("9092:9092","9093:9093","2181:2181"));
kafkaContainer.start();
kafkaContainer.waitingFor(Wait.forListeningPort());
KafkaConsumer 使用以下方式连接:
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,"PLAINTEXT://localhost:9093");
这里的关键是 PLAINTEXT://localhost:9093
。