如何在Spring Boot中延迟加载Kafka Consumer?

问题描述

我想通过命令行参数提供组ID,但尝试此操作时出现以下错误

Failed to start bean 'org.springframework.kafka.config.internalKafkaListenerEndpointRegistry'; nested exception is java.lang.IllegalStateException: No group.id found in consumer config,container properties,or @KafkaListener annotation; a group.id is required when group management is used.

这意味着在加载kafkalistener时,它需要group_id。如果我在consumerConfig文件中提供了groupId,则它可以正常工作。

有什么办法可以让我通过命令行给组ID以及kafka侦听器的延迟加载,以便我在程序启动时不需要。

我的ConsumerConfig

@Configuration
class KafkaConsumerConfig {

    @Value("${kafka.bootstrap-servers}")
    private String bootstrapServers;

    @Autowired
    private ArgumentModel argumentModel;

    private Logger logger = LoggerFactory.getLogger(KafkaConsumerConfig.class);

    @Bean
    public Map<String,Object> consumerConfigs() {
        Map<String,Object> props = new HashMap<>();
        logger.info("bootstrapServers : {}",bootstrapServers);
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONfig,bootstrapServers);
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONfig,StringDeserializer.class);
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONfig,StringDeserializer.class);
        props.put(ConsumerConfig.GROUP_ID_CONfig,argumentModel.getKafkaGroupId());
        return props;
    }

    @Bean
    public ConsumerFactory<String,String> consumerFactory() {
        return new DefaultKafkaConsumerFactory<>(consumerConfigs());
    }

    @Bean
    public KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<String,String>> kafkaListenerContainerFactory() {
        ConcurrentKafkaListenerContainerFactory<String,String> factory = new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory());
        return factory;
    }
}

解决方法

@KafkaListener(... groupId = "${group.id}")

然后在命令行中传递-Dgroup.id=myGroup