Kafka 反应器 - 如何禁用自动启动的 KAFKA 消费者?

问题描述

下面是我的 KAFKA 消费者

@Bean("kafkaConfluentInboundReceiver")
@ConditionalOnProperty(value = "com.demo.kafka.core.inbound.confluent.topic-name",matchIfMissing = false)
public KafkaReceiver<String,Object> kafkaInboundReceiver() {
    ReceiverOptions<String,Object> receiverOptions = ReceiverOptions.create(inboundConsumerConfigs());
    receiverOptions.schedulersupplier(() -> Schedulers
            .fromExecutorService(applicationContext.getBean("inboundKafkaExecutorService",ExecutorService.class)));
    receiverOptions.maxCommitAttempts(kafkaProperties.getKafka().getCore().getMaxCommitAttempts());
    return KafkaReceiver.create(receiverOptions.addAssignListener(Collection::iterator)
            .subscription(Collections.singleton(
                    kafkaProperties.getKafka()
                            .getCore().getInbound().getConfluent()
                            .getTopicName()))
            .commitInterval(Duration.ZERO).commitBatchSize(0));
}

我的 KAFKA 消费者正在自动启动。但是我想禁用自动启动的 KAFKA 使用者。

我知道,在春季卡夫卡我们可以做这样的事情

factory.setAutoStartup(start);

但是,我不确定如何在 Kafka 反应器中实现(控制自动启动/停止行为)。我想要像下面这样的东西

引入一个属性来处理自动启动/停止行为

@Value("${consumer.autostart:true}")
private boolean start;

使用上述属性,我应该能够在 Kafka reactor 中设置 KAFKA Auto-Start 标志,就像这样

return KafkaReceiver.create(receiverOptions.addAssignListener(Collection::iterator)
        .subscription(Collections.singleton(
                kafkaProperties.getKafka()
                        .getCore().getInbound().getConfluent()
                        .getTopicName()))
        .commitInterval(Duration.ZERO).commitBatchSize(0)).setAutoStart(start);

注意:.setAutoStart(start);

这在 Kafka reactor 中是否可行,如果可以,我该怎么做?

更新:

protected void inboundEventHubListener(String topicName,List<String> allowedValues) {
    Scheduler scheduler = Schedulers.fromExecutorService(kafkaExecutorService);
    kafkaEventHubInboundReceiver
            .receive()
            .publishOn(scheduler)
            .groupBy(receiverRecord -> {
                try {
                    return receiverRecord.receiverOffset().topicPartition();
                } catch (Throwable throwable) {
                    log.error("exception in groupby",throwable);
                    return Flux.empty();
                }
            }).flatMap(partitionFlux -> partitionFlux.publishOn(scheduler)
            .map(record -> {
                processMessage(record,topicName,allowedValues).block(
                        Duration.ofSeconds(60L));//This subscribe is to trigger processing of a message
                return record;
            }).concatMap(message -> {
                log.info("Received message after processing offset: {} partition: {} ",message.offset(),message.partition());
                return message.receiverOffset()
                        .commit()
                        .onErrorContinue((t,o) -> log.error(
                                String.format("exception raised while commit offset %s",o),t)
                        );
            })).onErrorContinue((t,o) -> {
        try {
            if (null != o) {
                ReceiverRecord<String,Object> record = (ReceiverRecord<String,Object>) o;
                ReceiverOffset offset = record.receiverOffset();
                log.debug("Failed to process message: {} partition: {} and message: {} ",offset.offset(),record.partition(),record.value());
            }
            log.error(String.format("exception raised while processing message %s",t);
        } catch (Throwable inner) {
            log.error("encountered error in onErrorContinue",inner);
        }
    }).subscribeOn(scheduler).subscribe();

我可以做这样的事情吗?

kafkaEventHubInboundReceiverObj = kafkaEventHubInboundReceiver.....subscribeOn(scheduler);
if(consumer.autostart) {
kafkaEventHubInboundReceiverObj.subscribe();
}

解决方法

reactor-kafka 没有“自动启动”的概念;您完全可以控制。

在您订阅从 Flux 返回的 receiver.receive() 之前,消费者不会“启动”。

只需延迟 flux.subscribe(),直到您准备好使用数据。