当使用者应用程序在确认之前停止时,即使AckMode在MANUAL_IMMEDIATE中,Spring-Kafka偏移也会被提交

问题描述

我正在使用spring-kafka运行kafka消费者服务。我已将enable.auto.commit设置为false,并将AckMode设置为MANUAL_IMMEDIATE。还使用ConcurrentKafkaListenerContainerFactory并发等于10。(我的主题中的分区数)

现在,我正在听我的话题,它没有多少补偿。 我正在调试模式下运行应用程序,当我在KafkaListener方法中收到第一条消息时,便立即停止了应用程序。

应用程序在调用acknowledgment.acknowledge()之前就停止了,但是当我检查任何延迟时,它会显示为零。

监听器代码

@KafkaListener(topics = "baeldung")
  public void listen(ConsumerRecord<?,?> message,Acknowledgment acknowledgment) {
    log.info("logging");

    System.out.println("Received Messasge in: "+ " " + message.value().toString());
      acknowledgment.acknowledge();
    System.out.println("done");
  }

调试指针设置为System.out.println("Received Messasge in: "+ " " + message.value().toString());

消费者属性:

@Bean
  public ConsumerFactory<String,String> consumerFactory() {
    Map<String,Object> props = new HashMap<>();
    props.put(
        ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,bootstrapAddress);
    props.put(
        ConsumerConfig.GROUP_ID_CONFIG,groupId);
    props.put(
        ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,StringDeserializer.class);
    props.put(
        ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,StringDeserializer.class);
    props.put(
        ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG,false);
    return new DefaultKafkaConsumerFactory<>(props);
  }

  @Bean
  public ConcurrentKafkaListenerContainerFactory<String,String>
  kafkaListenerContainerFactory() {

    ConcurrentKafkaListenerContainerFactory<String,String> factory =
        new ConcurrentKafkaListenerContainerFactory<>();
    factory.setConcurrency(10);
    factory.setConsumerFactory(consumerFactory());
    factory
        .getContainerProperties().setAckMode(ContainerProperties.AckMode.MANUAL_IMMEDIATE);
    return factory;
  }

主题在运行使用者代码之前进行描述:

kafka-consumer-groups --bootstrap-server localhost:9092 --describe --group testMayank2

Consumer group 'testMayank2' has no active members.

GROUP           TOPIC           PARTITION  CURRENT-OFFSET  LOG-END-OFFSET  LAG             CONSUMER-ID     HOST            CLIENT-ID
testMayank2     baeldung        5          135             146             11              -               -               -
testMayank2     baeldung        9          140             145             5               -               -               -
testMayank2     baeldung        6          140             145             5               -               -               -
testMayank2     baeldung        1          144             148             4               -               -               -
testMayank2     baeldung        2          141             145             4               -               -               -
testMayank2     baeldung        0          144             148             4               -               -               -
testMayank2     baeldung        7          142             147             5               -               -               -
testMayank2     baeldung        3          142             147             5               -               -               -
testMayank2     baeldung        8          141             146             5               -               -               -
testMayank2     baeldung        4          142             146             4               -               -               -

运行使用者代码后的主题描述:

kafka-consumer-groups --bootstrap-server localhost:9092 --describe --group testMayank2

Warning: Consumer group 'testMayank2' is rebalancing.

GROUP           TOPIC           PARTITION  CURRENT-OFFSET  LOG-END-OFFSET  LAG             CONSUMER-ID     HOST            CLIENT-ID
testMayank2     baeldung        5          146             146             0               -               -               -
testMayank2     baeldung        9          140             145             5               -               -               -
testMayank2     baeldung        6          145             145             0               -               -               -
testMayank2     baeldung        1          144             148             4               -               -               -
testMayank2     baeldung        2          141             145             4               -               -               -
testMayank2     baeldung        0          148             148             0               -               -               -
testMayank2     baeldung        7          147             147             0               -               -               -
testMayank2     baeldung        3          147             147             0               -               -               -
testMayank2     baeldung        8          146             146             0               -               -               -
testMayank2     baeldung        4          146             146             0               -               -               -

Spring boot版本:2.2.6.RELEASE kafka版本:2.3.1

我在这里做错什么了吗?预计在异常关闭期间会提交偏移吗?

解决方法

暂无找到可以解决该程序问题的有效方法,小编努力寻找整理中!

如果你已经找到好的解决方法,欢迎将解决方案带上本链接一起发送给小编。

小编邮箱:dio#foxmail.com (将#修改为@)

相关问答

错误1:Request method ‘DELETE‘ not supported 错误还原:...
错误1:启动docker镜像时报错:Error response from daemon:...
错误1:private field ‘xxx‘ is never assigned 按Alt...
报错如下,通过源不能下载,最后警告pip需升级版本 Requirem...