问题描述
我试图查看当消费者收到来自 kafka 主题的消息但测试未通过并且消费者甚至没有收到消息时是否调用了服务。
我的测试:
@SpringBoottest
@DirtiesContext
@EmbeddedKafka(partitions = 1,brokerProperties = { "listeners=PLAINTEXT://localhost:9092","port=9092" })
class ConsumerTest {
@Autowired
Consumer consumer;
@Mock
private Service service;
@Autowired
private KafkaTemplate<String,String> kafkaTemplate;
@Test
public void givenEmbeddedKafkabroker_whenSendingtoSimpleProducer_thenMessageReceived()
throws Exception {
String message = "hi";
kafkaTemplate.send("topic",message);
consumer.getLatch().await(10000,TimeUnit.MILLISECONDS);
verify(service,times(1)).addMessage();
}
}
主包中的消费者是一个普通消费者,带有@KafkaListener(topics = "topic")。 然后我有一个配置文件:
@EnableKafka
@Configuration
public class KafkaConfig {
@Bean
public Map<String,Object> consumerConfigs() {
Map<String,Object> props = new HashMap<>();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONfig,"localhost:9092");
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONfig,StringDeserializer.class);
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONfig,StringDeserializer.class);
props.put(ConsumerConfig.GROUP_ID_CONfig,"group");
return props;
}
@Bean
public ConsumerFactory<String,String> consumerFactory() {
return new DefaultKafkaConsumerFactory<>(consumerConfigs(),new StringDeserializer(),new StringDeserializer());
}
@Bean
public ConcurrentKafkaListenerContainerFactory<String,String> kafkaListenerContainerFactory() {
ConcurrentKafkaListenerContainerFactory<String,String> factory =
new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(consumerFactory());
return factory;
}
}
也在 application.properties(在测试包内)我把这个:
春天: 卡夫卡: 消费者: 自动偏移重置:最早 组 ID:组
解决方法
暂无找到可以解决该程序问题的有效方法,小编努力寻找整理中!
如果你已经找到好的解决方法,欢迎将解决方案带上本链接一起发送给小编。
小编邮箱:dio#foxmail.com (将#修改为@)