使用spring-kafka-test中的@EmbeddedKafka测试监听器

问题描述

我正在尝试在使用@KafkaListener创建的springboot中测试侦听器 但是侦听器始终在localhost:9092上侦听,而不是使用此embededKafka

我的听众是这样的:


@Component
@Slf4j
class SomeListener {
    private final List<String> receivedMessages = new ArrayList<>();

    @KafkaListener(topics = "some-ultra-cool-topic")
    public void onKafkaMessage(String theMessage) {
        log.info("Message received {}",theMessage);
        receivedMessages.add(theMessage);
    }

    Collection<String> getAll() {
        return unmodifiableCollection(receivedMessages);
    }
}

然后像这样进行spock测试:

@SpringBootTest
@EmbeddedKafka
@TestPropertySource(properties = ['spring.kafka.bootstrap-servers=${spring.embedded.kafka.brokers}','spring.kafka.consumer.auto-offset-reset=earliest'])
class SomeListenerTest extends Specification {
    @Autowired
    EmbeddedKafkaBroker embeddedKafkaBroker

    @Autowired
    SomeListener someListener

    void 'should receive message'() {
        given:
            def sender = new KafkaTemplate<>(new DefaultKafkaProducerFactory<String,String>(KafkaTestUtils.producerProps(embeddedKafkaBroker)))

        when:
            sender.send('some-ultra-cool-topic','first message content')
        then:
            someListener.all.size() == 1
    }

}

我的application.yaml没有配置引导服务器-因此,它完全是spring-boot的默认设置。

我在日志中看到生产者正在向代理发送消息(它每次都在不同的随机端口上启动)。 但是侦听器总是尝试连接到本地主机上的代理:9092

如何配置它以使用此嵌入式产品?

解决方法

感谢@sawim的提示

实际问题正在测试中。我最终使用lib org.awaitility:awaitility

进行了此测试
        then:
        waitAtMost(5,SECONDS)
                .untilAsserted({ ->
                    assertThat(personFacade.findAll(),hasSize(1))
                })

第一个示例中的配置有效,但是在启动期间,我可以看到尝试连接到localhost:9200的kafka日志-似乎我们可以忽略它

相关问答

错误1:Request method ‘DELETE‘ not supported 错误还原:...
错误1:启动docker镜像时报错:Error response from daemon:...
错误1:private field ‘xxx‘ is never assigned 按Alt...
报错如下,通过源不能下载,最后警告pip需升级版本 Requirem...