Embedded Kafka test - Kafka listener is not invoked

Question

I wrote a unit test for a Kafka listener, shown below.

@SpringBootTest
@EmbeddedKafka(partitions = 1, brokerProperties = {"listeners=PLAINTEXT://localhost:9092", "port=9092"})
class ConsumerTest {

    @Autowired
    KafkaTemplate<String, String> producer;

    @Test
    public void consumeEvents1Test() throws InterruptedException {
        producer.send("events1","Sample message");
        Thread.sleep(1000);
    }
}

The consumer is defined as follows.

@Component
public class Consumer {
    Logger LOG = LoggerFactory.getLogger(Consumer.class);

    @KafkaListener(id= "${topic1}",topics = "${topic1}",groupId = "${consumer.group1}",concurrency = "1",containerFactory = "kafkaListenerContainerFactory")
    public void consumeEvents1(String message,@Headers Map<String,String> header,Acknowledgment acknowledgment) {
        LOG.info("Message - {}",message);
        LOG.info(header.get(KafkaHeaders.GROUP_ID) + header.get(KafkaHeaders.RECEIVED_TOPIC)+String.valueOf(header.get(KafkaHeaders.OFFSET)));
        acknowledgment.acknowledge();
    }
}

The consumer factory and the container factory are created as follows.

@Bean
public ConsumerFactory<String,Object> consumerFactory() {
    Map<String,Object> props = new HashMap<>();
    props.put(
            ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,bootStrapServers);
    props.put(
            ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,StringDeserializer.class);
    props.put(
            ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,StringDeserializer.class);
    return new DefaultKafkaConsumerFactory<>(props);
}

@Bean
public ConcurrentKafkaListenerContainerFactory<String,Object> kafkaListenerContainerFactory() {

    ConcurrentKafkaListenerContainerFactory<String,Object> factory =
            new ConcurrentKafkaListenerContainerFactory<>();
    factory.setConsumerFactory(consumerFactory());
    factory.setAutoStartup(autoStart);
    factory.getContainerProperties().setAckMode(ContainerProperties.AckMode.MANUAL);
    return factory;
}

The dependencies in the POM are:

<dependencies>
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter</artifactId>
    </dependency>
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-test</artifactId>
    </dependency>
    <dependency>
        <groupId>org.springframework.kafka</groupId>
        <artifactId>spring-kafka</artifactId>
    </dependency>
    <dependency>
        <groupId>org.springframework.kafka</groupId>
        <artifactId>spring-kafka-test</artifactId>
    </dependency>
</dependencies>

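However, when I run the test case, the message is published to the embedded Kafka broker, but the listener is never invoked. I'm not sure what is wrong with the test setup.

application.properties is as follows: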

spring.kafka.bootstrap-servers=localhost:9092
spring.kafka.consumer.auto-offset-reset=earliest
consumer.group1=events-group1
topic1=events1
kafka.listener.autostart=true

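Answer

You are creating your own ConsumerFactory, so

spring.kafka.consumer.auto-offset-reset=earliest

is not applied, and the consumer falls back to Kafka's default of latest. The record is published before the consumer has started, so you have a race condition: the consumer begins reading at the end of the log and never sees the record.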
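To make the race concrete, here is a toy model in plain Java. It is not Kafka code: the class OffsetResetDemo and its methods are made up for illustration, and it only mimics how a consumer with no committed offset picks its starting position on a single partition.

```java
import java.util.ArrayList;
import java.util.List;

/** Toy model of one partition's log. Hypothetical names, not the Kafka API. */
public class OffsetResetDemo {

    /** Starting position of a consumer group that has no committed offset. */
    public static int startingOffset(String autoOffsetReset, int logEndOffset) {
        // "earliest" rewinds to the beginning of the log; anything else is
        // treated here as "latest", which starts at the current end of the log.
        return "earliest".equals(autoOffsetReset) ? 0 : logEndOffset;
    }

    /** Records the consumer receives, given the log end at the moment it subscribed. */
    public static List<String> consume(String autoOffsetReset, List<String> log, int logEndAtSubscribe) {
        int start = startingOffset(autoOffsetReset, logEndAtSubscribe);
        return new ArrayList<>(log.subList(start, log.size()));
    }

    public static void main(String[] args) {
        // The test sends its record before the listener has subscribed,
        // so the log already holds one record when the consumer joins.
        List<String> log = List.of("Sample message");
        int logEndAtSubscribe = log.size();

        System.out.println("latest:   " + consume("latest", log, logEndAtSubscribe));
        System.out.println("earliest: " + consume("earliest", log, logEndAtSubscribe));
    }
}
```

With latest the consumer receives nothing, because the only record sits before its starting position. With earliest it receives "Sample message" regardless of who started first.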

You need to add

props.put(
        ConsumerConfig.AUTO_OFFSET_RESET_CONFIG,"earliest");

to your consumer factory's properties, or you should use Boot's auto-configured consumer factory instead.
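For the second option, a rough sketch of what the configuration might look like is below. It assumes Spring Boot 2.x or 3.x (the package of ConcurrentKafkaListenerContainerFactoryConfigurer may differ in other versions), that the custom consumerFactory() bean from the question has been deleted so Boot's own one is created, and that the class name KafkaConsumerConfig is just a placeholder. Treat it as a starting point rather than a verified drop-in.

```java
import org.springframework.beans.factory.annotation.Value;
import org.springframework.boot.autoconfigure.kafka.ConcurrentKafkaListenerContainerFactoryConfigurer;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.listener.ContainerProperties;

// Hypothetical class name; only the bean name matters to @KafkaListener.
@Configuration
public class KafkaConsumerConfig {

    @Bean
    public ConcurrentKafkaListenerContainerFactory<Object, Object> kafkaListenerContainerFactory(
            ConcurrentKafkaListenerContainerFactoryConfigurer configurer,
            ConsumerFactory<Object, Object> kafkaConsumerFactory,
            @Value("${kafka.listener.autostart:true}") boolean autoStart) {

        ConcurrentKafkaListenerContainerFactory<Object, Object> factory =
                new ConcurrentKafkaListenerContainerFactory<>();

        // Applies the spring.kafka.* settings and wires in Boot's consumer factory,
        // which is built from spring.kafka.consumer.* (including auto-offset-reset).
        configurer.configure(factory, kafkaConsumerFactory);

        // Same customizations as the factory in the question.
        factory.setAutoStartup(autoStart);
        factory.getContainerProperties().setAckMode(ContainerProperties.AckMode.MANUAL);
        return factory;
    }
}
```

Because the consumer factory now comes from Boot, spring.kafka.bootstrap-servers and spring.kafka.consumer.auto-offset-reset=earliest from application.properties are picked up without being copied into a props map by hand.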
