问题描述
我正在使用spring-kafka运行kafka消费者服务。我已将enable.auto.commit设置为false,并将AckMode设置为MANUAL_IMMEDIATE。还使用ConcurrentKafkaListenerContainerFactory并发等于10。(我的主题中的分区数)
现在,我正在听我的话题,它没有多少补偿。 我正在调试模式下运行应用程序,当我在KafkaListener方法中收到第一条消息时,便立即停止了应用程序。
应用程序在调用acknowledgment.acknowledge()
之前就停止了,但是当我检查任何延迟时,它会显示为零。
监听器代码
@KafkaListener(topics = "baeldung")
public void listen(ConsumerRecord<?,?> message,Acknowledgment acknowledgment) {
log.info("logging");
System.out.println("Received Messasge in: "+ " " + message.value().toString());
acknowledgment.acknowledge();
System.out.println("done");
}
调试指针设置为System.out.println("Received Messasge in: "+ " " + message.value().toString());
消费者属性:
@Bean
public ConsumerFactory<String,String> consumerFactory() {
Map<String,Object> props = new HashMap<>();
props.put(
ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,bootstrapAddress);
props.put(
ConsumerConfig.GROUP_ID_CONFIG,groupId);
props.put(
ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,StringDeserializer.class);
props.put(
ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,StringDeserializer.class);
props.put(
ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG,false);
return new DefaultKafkaConsumerFactory<>(props);
}
@Bean
public ConcurrentKafkaListenerContainerFactory<String,String>
kafkaListenerContainerFactory() {
ConcurrentKafkaListenerContainerFactory<String,String> factory =
new ConcurrentKafkaListenerContainerFactory<>();
factory.setConcurrency(10);
factory.setConsumerFactory(consumerFactory());
factory
.getContainerProperties().setAckMode(ContainerProperties.AckMode.MANUAL_IMMEDIATE);
return factory;
}
主题在运行使用者代码之前进行描述:
kafka-consumer-groups --bootstrap-server localhost:9092 --describe --group testMayank2
Consumer group 'testMayank2' has no active members.
GROUP TOPIC PARTITION CURRENT-OFFSET LOG-END-OFFSET LAG CONSUMER-ID HOST CLIENT-ID
testMayank2 baeldung 5 135 146 11 - - -
testMayank2 baeldung 9 140 145 5 - - -
testMayank2 baeldung 6 140 145 5 - - -
testMayank2 baeldung 1 144 148 4 - - -
testMayank2 baeldung 2 141 145 4 - - -
testMayank2 baeldung 0 144 148 4 - - -
testMayank2 baeldung 7 142 147 5 - - -
testMayank2 baeldung 3 142 147 5 - - -
testMayank2 baeldung 8 141 146 5 - - -
testMayank2 baeldung 4 142 146 4 - - -
运行使用者代码后的主题描述:
kafka-consumer-groups --bootstrap-server localhost:9092 --describe --group testMayank2
Warning: Consumer group 'testMayank2' is rebalancing.
GROUP TOPIC PARTITION CURRENT-OFFSET LOG-END-OFFSET LAG CONSUMER-ID HOST CLIENT-ID
testMayank2 baeldung 5 146 146 0 - - -
testMayank2 baeldung 9 140 145 5 - - -
testMayank2 baeldung 6 145 145 0 - - -
testMayank2 baeldung 1 144 148 4 - - -
testMayank2 baeldung 2 141 145 4 - - -
testMayank2 baeldung 0 148 148 0 - - -
testMayank2 baeldung 7 147 147 0 - - -
testMayank2 baeldung 3 147 147 0 - - -
testMayank2 baeldung 8 146 146 0 - - -
testMayank2 baeldung 4 146 146 0 - - -
Spring boot版本:2.2.6.RELEASE kafka版本:2.3.1
我在这里做错什么了吗?预计在异常关闭期间会提交偏移吗?
解决方法
暂无找到可以解决该程序问题的有效方法,小编努力寻找整理中!
如果你已经找到好的解决方法,欢迎将解决方案带上本链接一起发送给小编。
小编邮箱:dio#foxmail.com (将#修改为@)