使用不同的线程使用 reactor-kafka 从 Kafka 中的消费者组读取

问题描述

我需要从具有数百万数据的 Kafka 主题中进行消费。从主题中读取后,我需要将其转换并写入另一个主题。我能够使用来自主题的消息,通过多个线程处理数据并写入另一个主题。 我按照这里的例子https://projectreactor.io/docs/kafka/1.3.5-SNAPSHOT/reference/index.html#concurrent-ordered

这是我的代码

public Flux<?> flux() {
            KafkaSender<Integer,Person> sender = sender(senderOptions());
            return KafkaReceiver.create(receiverOptions(Collections.singleton(sourcetopic)))
                                .receive()
                                .map(m -> SenderRecord.create(transform(m.value()),m.receiverOffset()))
                                .as(sender::send)
                                .doOnNext(m -> m.correlationMetadata().ackNowledge())
                                .doOnCancel(() -> close());
        }
            

我有多个消费者可供读取,并且由于数据量的原因,我正在考虑添加不同的阅读器线程来读取主题。然而,reactor-kafka documentation 提到 KafkaReceiver 不是线程安全的,因为底层的 KafkaConsumer 不能被多个线程同时访问。

我正在寻找关于同时阅读某个主题的建议。

解决方法

所以基本上你正在寻找的消费者组,你可以运行的最大并行消费受到你的主题所拥有的分区数量的限制。

Kafka Consumer Group 机制允许您将消费一个主题的工作分开到属于同一组的不同“读者”,工作将被划分为该组中的每个消费者将单独负责一个分区(1或更多,基于组中的消费者数量,以及主题的分区数量)