Problem description
I am trying to use the idle time between two polls mentioned here to reduce the consumption rate, and I also set max.poll.interval.ms to double the idle time between polls, but it always triggers a partition rebalance. Any idea what the problem is?

[EDIT] I have 5 hosts and I set the concurrency level to 1.

[EDIT 2] I set the idle time between polls to 5 minutes and max.poll.interval.ms to 10 minutes. I also noticed this log: "About to close the idle connection from 105 due to being idle for 540012 millis". When I reduced the idle time between polls to 10 seconds, the problem disappeared. Any idea why?
private ConsumerFactory<String, GenericRecord> dlqConsumerFactory() {
    Map<String, Object> configurationProperties = commonConfigs();
    DlqConfiguration dlqConfiguration = kafkaProperties.getConsumer().getDlq();
    final Integer idleBetweenPollInterval = dlqConfiguration.getIdleBetweenPollInterval()
            .orElse(DLQ_POLL_INTERVAL);
    final Integer maxPollInterval = idleBetweenPollInterval * 2; // twice the idle between polls, to prevent rebalancing
    logger.info("Setting max poll interval to {} for DLQ", maxPollInterval);
    overrideIfRequired(DQL_CONSUMER_CONFIGURATION, configurationProperties,
            ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, maxPollInterval);
    dlqConfiguration.getMaxPollRecords().ifPresent(maxPollRecords ->
            overrideIfRequired(DQL_CONSUMER_CONFIGURATION, configurationProperties,
                    ConsumerConfig.MAX_POLL_RECORDS_CONFIG, maxPollRecords)
    );
    return new DefaultKafkaConsumerFactory<>(configurationProperties);
}
Solution
<time to process last polled records> + <idle between polls> must be less than max.poll.interval.ms.
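The constraint above can be sketched as a simple check (a minimal illustration; the class and method names and the sample durations are hypothetical, not part of the Spring Kafka API):

```java
// Sketch: the consumer must call poll() again before max.poll.interval.ms
// elapses, so processing time plus the configured idle must stay below it.
public class PollTimingCheck {

    // Returns true when the timing configuration will not trigger a rebalance.
    static boolean isPollTimingSafe(long processingMs, long idleBetweenPollsMs,
                                    long maxPollIntervalMs) {
        return processingMs + idleBetweenPollsMs < maxPollIntervalMs;
    }

    public static void main(String[] args) {
        // 5-minute idle with a 10-minute max.poll.interval.ms is safe only
        // while processing the previous batch takes less than 5 minutes.
        System.out.println(isPollTimingSafe(60_000, 300_000, 600_000));  // true
        System.out.println(isPollTimingSafe(301_000, 300_000, 600_000)); // false
    }
}
```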
EDIT
There is logic in the container to ensure we never exceed the max poll interval:
idleBetweenPolls = Math.min(idleBetweenPolls, this.maxPollInterval
        - (System.currentTimeMillis() - this.lastPoll)
        - 5000); // NOSONAR - less by five seconds to avoid race condition with rebalance
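With the numbers from the question (5-minute idle, 10-minute max.poll.interval.ms), that clamp works out as follows (a standalone sketch of the same arithmetic; the class and variable names are illustrative, not from the container code):

```java
public class IdleClampSketch {

    // Mirrors the container's cap: never idle past max.poll.interval.ms,
    // minus a five-second safety margin against a rebalance race.
    static long clampIdle(long requestedIdleMs, long maxPollIntervalMs,
                          long millisSinceLastPoll) {
        return Math.min(requestedIdleMs,
                maxPollIntervalMs - millisSinceLastPoll - 5_000);
    }

    public static void main(String[] args) {
        // Processing took 1 minute since the last poll: the full 5-minute
        // idle still fits under the 10-minute max poll interval.
        System.out.println(clampIdle(300_000, 600_000, 60_000));  // 300000
        // Processing took 9.5 minutes: the idle is clamped down to 25s.
        System.out.println(clampIdle(300_000, 600_000, 570_000)); // 25000
    }
}
```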
I was unable to reproduce the problem with this...
@SpringBootApplication
public class So63411124Application {

    public static void main(String[] args) {
        SpringApplication.run(So63411124Application.class, args);
    }

    @KafkaListener(id = "so63411124", topics = "so63411124")
    public void listen(String in) {
        System.out.println(in);
    }

    @Bean
    public ApplicationRunner runner(ConcurrentKafkaListenerContainerFactory<?, ?> factory,
            KafkaTemplate<String, String> template) {
        factory.getContainerProperties().setIdleBetweenPolls(300000L);
        return args -> {
            while (true) {
                template.send("so63411124", "foo");
                Thread.sleep(295000);
            }
        };
    }

    @Bean
    public NewTopic topic() {
        return TopicBuilder.name("so63411124").partitions(1).replicas(1).build();
    }
}
logging.level.org.springframework.kafka=debug
spring.kafka.consumer.auto-offset-reset=earliest
spring.kafka.consumer.properties.max.poll.interval.ms=600000
If you can provide a small example like this that exhibits the behavior you describe, I will take a look to see what's wrong.