春季kafka idlebetweenpolls总是触发分区重新平衡

问题描述

我正在尝试使用上述here中提到的两次轮询之间的空闲时间来降低消耗率,我也使用max.poll.interval.ms将两次轮询之间的空闲时间加倍,但是它总是会触发分区重新平衡,知道是什么问题吗? [编辑] 我有5个主机,并且我将并发级别设置为1 [编辑2] 我将轮询之间的空闲时间设置为5分钟,将max.poll.interval.ms设置为10分钟,我还注意到此日志“由于空闲时间为540012毫秒,即将关闭105的空闲连接”。 我将两次轮询之间的空闲时间减少到10秒,问题消失了,知道为什么吗?

 private ConsumerFactory<String,GenericRecord> dlqConsumerFactory() {
        Map<String,Object> configurationProperties = commonConfigs();

        DlqConfiguration dlqConfiguration = kafkaProperties.getConsumer().getDlq();

        final Integer idleBetweenPollInterval = dlqConfiguration.getIdleBetweenPollInterval()
                .orElse(DLQ_POLL_INTERVAL);

        final Integer maxPollInterval = idleBetweenPollInterval * 2; // two times the idleBetweenPoll,to prevent re-balancing
        logger.info("Setting max poll interval to {} for DLQ",maxPollInterval);

        overrideIfRequired(DQL_CONSUMER_CONFIGURATION,configurationProperties,ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG,maxPollInterval);

        dlqConfiguration.getMaxPollRecords().ifPresent(maxPollRecords ->
                overrideIfRequired(DQL_CONSUMER_CONFIGURATION,ConsumerConfig.MAX_POLL_RECORDS_CONFIG,maxPollRecords)
        );


        return new DefaultKafkaConsumerFactory<>(configurationProperties);
    }

解决方法

<time to process last polled records> + <idle between polls>必须小于max.poll.interval.ms

编辑

容器中有逻辑以确保我们永远不会超过最大轮询间隔:

idleBetweenPolls = Math.min(idleBetweenPolls,this.maxPollInterval - (System.currentTimeMillis() - this.lastPoll)
                - 5000); // NOSONAR - less by five seconds to avoid race condition with rebalance

我无法用这个来重现问题...

@SpringBootApplication
public class So63411124Application {

    public static void main(String[] args) {
        SpringApplication.run(So63411124Application.class,args);
    }

    @KafkaListener(id = "so63411124",topics = "so63411124")
    public void listen(String in) {
        System.out.println(in);
    }

    @Bean
    public ApplicationRunner runner(ConcurrentKafkaListenerContainerFactory<?,?> factory,KafkaTemplate<String,String> template) {

        factory.getContainerProperties().setIdleBetweenPolls(300000L);
        return args -> {
            while (true) {
                template.send("so63411124","foo");
                Thread.sleep(295000);
            }
        };
    }

    @Bean
    public NewTopic topic() {
        return TopicBuilder.name("so63411124").partitions(1).replicas(1).build();
    }

}
logging.level.org.springframework.kafka=debug
spring.kafka.consumer.auto-offset-reset=earliest
spring.kafka.consumer.properties.max.poll.interval.ms=600000

如果您可以提供一个这样的小例子来展示您描述的行为,那么我将看看有什么问题。

相关问答

错误1:Request method ‘DELETE‘ not supported 错误还原:...
错误1:启动docker镜像时报错:Error response from daemon:...
错误1:private field ‘xxx‘ is never assigned 按Alt...
报错如下,通过源不能下载,最后警告pip需升级版本 Requirem...