Problem description
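I am running a Kafka consumer service with spring-kafka. I have set enable.auto.commit to false and the AckMode to MANUAL_IMMEDIATE. I also use a ConcurrentKafkaListenerContainerFactory with concurrency set to 2 (the number of partitions in my topic).
I am now listening to my topic with the application running in debug mode. As soon as the first message arrives in my KafkaListener method, I force-stop the application before Acknowledgment.acknowledge() is called.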
KafkaConfig class code:
/**
 * Create the consumer factory for the core user event consumer.
 */
@Bean
public ConsumerFactory<String, String> userConsumerFactory() {
    Map<String, Object> props = consumerConfigs();
    props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, eventStreamingProperties.getEventhubUsersEventBootstrapServers());
    props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
    props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
    props.put(ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG, "org.apache.kafka.clients.consumer.RoundRobinAssignor");
    // Offsets are committed only when the listener acknowledges a record.
    props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
    // Applies only when the group has no committed offset for a partition.
    props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
    props.put(ConsumerConfig.GROUP_ID_CONFIG, "$Default");
    if (eventStreamingProperties.isEventhubUsersEventSecurityEnabled()) {
        props.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, "SASL_SSL");
        props.put("sasl.mechanism", "PLAIN");
        props.put("sasl.jaas.config", "org.apache.kafka.common.security.plain.PlainLoginModule required "
                + "username=\"$ConnectionString\" password=\"" + connectionStringForUserEvents + "\";");
    }
    return new DefaultKafkaConsumerFactory<>(props);
}
/**
 * Create the Kafka listener container factory for the core user event consumer.
 */
@Bean
public ConcurrentKafkaListenerContainerFactory<String, String> kafkaUserListenerContainerFactory() {
    ConcurrentKafkaListenerContainerFactory<String, String> factory =
            new ConcurrentKafkaListenerContainerFactory<>();
    factory.setConsumerFactory(userConsumerFactory());
    factory.setAutoStartup(true);
    factory.getContainerProperties().setSyncCommits(true);
    // Commit the offset as soon as the listener calls Acknowledgment.acknowledge().
    factory.getContainerProperties().setAckMode(ContainerProperties.AckMode.MANUAL_IMMEDIATE);
    // No-op recoverer: nothing is done with a record once its retries are exhausted.
    factory.setErrorHandler(new SeekToCurrentErrorHandler((record, exception) -> {
    }, new FixedBackOff(retryInterval, retryMaxAttempts)));
    return factory;
}
Listener code:
@KafkaListener(
        topics = "${events.user.topic}",
        concurrency = "${events.user.concurrency}",
        containerFactory = "kafkaUserListenerContainerFactory")
public void processEvent(ConsumerRecord<String, String> record, Acknowledgment acknowledgment,
        @Header("kafka_offset") long offset) { // Kafka offsets are 64-bit values
    log.info(String.format("In UserEventListener: processing the Event for user "
            + "migration Key : %s, Partition : %s, Topic : %s"
            + ", Headers : %s", record.key(), record.partition(), record.topic(), record.headers().toString()));
    coreUserEventConsumer.validateAndConsumeEvent(record);
    // Commit the offset only after the event has been processed.
    acknowledgment.acknowledge();
}
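I stopped the application, which was running in debug mode, before acknowledgment.acknowledge() was called. When I restart the application, it does not read the message again. What should I do here? Please help.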
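With auto-commit disabled and manual acknowledgment, an offset is committed only when acknowledge() is called, so a record that was delivered but never acknowledged would normally be expected to come back after a restart of a consumer in the same group. The sketch below is a simplified in-memory model of that expectation, not spring-kafka code: the class ManualAckModel and all of its methods are hypothetical names introduced for illustration, and it assumes a single partition and a single consumer group.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical in-memory model of one partition and one group's committed offset. */
final class ManualAckModel {
    private final List<String> log = new ArrayList<>(); // records stored in the partition
    private long committedOffset = 0; // offset the group resumes from after a restart
    private long position = 0;        // in-memory read position of the running consumer

    /** Appends a record to the end of the partition. */
    void append(String value) {
        log.add(value);
    }

    /** Returns the next record, or null when the consumer has reached the end of the log. */
    String poll() {
        if (position >= log.size()) {
            return null;
        }
        return log.get((int) position++);
    }

    /** Models a manual acknowledgment: commit the offset that follows the last polled record. */
    void acknowledge() {
        committedOffset = position;
    }

    /** Models a stop and restart: the in-memory position is lost and reset to the committed offset. */
    void restart() {
        position = committedOffset;
    }

    long committedOffset() {
        return committedOffset;
    }

    public static void main(String[] args) {
        ManualAckModel partition = new ManualAckModel();
        partition.append("user-1");
        partition.append("user-2");

        String first = partition.poll(); // delivered to the listener
        partition.restart();             // application stopped before acknowledge()
        String again = partition.poll(); // the unacknowledged record is delivered again
        System.out.println(first + " / " + again);

        partition.acknowledge();         // this time the record is acknowledged
        partition.restart();
        System.out.println(partition.poll()); // consumption resumes at the next record
    }
}
```

If the real application does not behave like this model, one thing worth checking under these assumptions is whether the group already has a committed offset beyond the record, since auto.offset.reset is consulted only when no committed offset exists.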