使用 EmbeddedKafka Spring 进行测试时侦听器不使用消息

问题描述

我试图查看当消费者收到来自 kafka 主题的消息但测试未通过并且消费者甚至没有收到消息时是否调用了服务。

我的测试:

@SpringBoottest
@DirtiesContext
@EmbeddedKafka(partitions = 1,brokerProperties = { "listeners=PLAINTEXT://localhost:9092","port=9092" })
class ConsumerTest {

    @Autowired
    Consumer consumer;

    @Mock
    private Service service;

    @Autowired
    private KafkaTemplate<String,String> kafkaTemplate;
    
    @Test
    public void givenEmbeddedKafkabroker_whenSendingtoSimpleProducer_thenMessageReceived() 
      throws Exception {

        String message = "hi";

        kafkaTemplate.send("topic",message);

        consumer.getLatch().await(10000,TimeUnit.MILLISECONDS);

       
        verify(service,times(1)).addMessage();
    }
}

主包中的消费者是一个普通消费者,带有@KafkaListener(topics = "topic")。 然后我有一个配置文件

@EnableKafka
@Configuration
public class KafkaConfig {
  
  @Bean
  public Map<String,Object> consumerConfigs() {
    Map<String,Object> props = new HashMap<>();
    props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONfig,"localhost:9092");
    props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONfig,StringDeserializer.class);
    props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONfig,StringDeserializer.class);
    props.put(ConsumerConfig.GROUP_ID_CONfig,"group");
    return props;
  }

  @Bean
  public ConsumerFactory<String,String> consumerFactory() {
    return new DefaultKafkaConsumerFactory<>(consumerConfigs(),new StringDeserializer(),new StringDeserializer());
  }

  @Bean
  public ConcurrentKafkaListenerContainerFactory<String,String> kafkaListenerContainerFactory() {
    ConcurrentKafkaListenerContainerFactory<String,String> factory =
        new ConcurrentKafkaListenerContainerFactory<>();
    factory.setConsumerFactory(consumerFactory());

    return factory;
  }


}

也在 application.properties(在测试包内)我把这个:

春天: 卡夫卡: 消费者: 自动偏移重置:最早 组 ID:组

解决方法

暂无找到可以解决该程序问题的有效方法,小编努力寻找整理中!

如果你已经找到好的解决方法,欢迎将解决方案带上本链接一起发送给小编。

小编邮箱:dio#foxmail.com (将#修改为@)