在 Reactor Kafka 中使用基于分区的排序进行并发处理

问题描述

我正在开发一个示例应用程序,该应用程序将从 Kafka 主题的不同分区读取,并发处理基于分区排序的记录并将记录写入另一个主题的不同分区。 这是我写的示例代码

    public class MetricsTransposer {
    private static final String BOOTSTRAP_SERVERS = "localhost:9092";

    static abstract class SetKafkaProperties {
        final String SOURCE_TOPIC;
        final String DESTINATION_TOPIC;
        final Map<String,Object> consumerProps;
        final Map<String,Object> producerProps;

        SetKafkaProperties(Map<String,Object> consumerPropsOverride,Map<String,Object> producerPropsOverride,String bootstrapServers,String sourcetopic,String destTopic) {
            SOURCE_TOPIC = sourcetopic;
            DESTINATION_TOPIC = destTopic;

            consumerProps = new HashMap<String,Object>();
            consumerProps.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONfig,bootstrapServers);
            consumerProps.put(ConsumerConfig.GROUP_ID_CONfig,"reactive-group-" + System.currentTimeMillis());
            consumerProps.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONfig,"false");
            consumerProps.put(ConsumerConfig.AUTO_OFFSET_RESET_CONfig,"latest");
            consumerProps.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONfig,IntegerDeserializer.class);
            consumerProps.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONfig,StringDeserializer.class);
            consumerProps.put(ConsumerConfig.FETCH_MAX_WAIT_MS_CONfig,"0"); 
            if(consumerPropsOverride != null) {
                consumerProps.putAll(consumerPropsOverride);
            }

            producerProps = new HashMap<String,Object>();
            producerProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONfig,bootstrapServers);
            producerProps.put(ProducerConfig.LINGER_MS_CONfig,"0");
            producerProps.put(ProducerConfig.MAX_BLOCK_MS_CONfig,String.valueOf(Long.MAX_VALUE));
            producerProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONfig,IntegerDeserializer.class);
            producerProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONfig,StringDeserializer.class);
            if(producerPropsOverride != null) {
                producerProps.putAll(producerPropsOverride);
            }
        }
    }

    static class ReactiveTranspose extends SetKafkaProperties {

        SenderOptions<Integer,String> senderOptions =
            SenderOptions.<Integer,String>create(producerProps)
                .maxInFlight(1024);

        KafkaSender<Integer,String> sender = KafkaSender.create(senderOptions);

        ReceiverOptions<Integer,String> receiverOptions =
            ReceiverOptions.<Integer,String>create(consumerProps)
                .subscription(Collections.singleton(SOURCE_TOPIC));


        ReactiveTranspose(Map<String,String destTopic) {
            super(consumerPropsOverride,producerPropsOverride,bootstrapServers,sourcetopic,destTopic);
        }

        public disposable ReadProcessWriteRecords() {
            Scheduler scheduler = Schedulers.newBoundedElastic(60,60,"writerThreads");
           return KafkaReceiver.create(receiverOptions)
                .receive()
                .doOnNext( r -> System.out.printf("Record received: {0}",r.value()))
                .groupBy(m -> m.receiverOffset().topicPartition())
                .flatMap(partitionFlux ->
                    partitionFlux.publishOn(scheduler)
                        .map(r -> processRecord(partitionFlux.key(),r))
                        .sample(Duration.ofMillis(5000))
                        .concatMap(offset -> offset.commit()))
                .subscribe();
        }

        private ReceiverOffset processRecord(TopicPartition topicPartition,ReceiverRecord<Integer,String> message) {
            System.out.printf("Processing record {} from partition {} in thread{}",message.value(),topicPartition,Thread.currentThread().getName());
            return message.receiverOffset();
        }
    }

    public static void RunReactiveTranformProcess(String sourcetopic,String destinationTopic) {
        ReactiveTranspose transpose = new ReactiveTranspose(null,null,BOOTSTRAP_SERVERS,destinationTopic);
        transpose.ReadProcessWriteRecords();
    }

    public static void main(String[] args) throws Exception {
        String sourcetopic = "metrics";
        String destinationTopic = "cleanmetrics";

        RunReactiveTranformProcess(sourcetopic,destinationTopic);

    }
}

当我运行应用程序时,我没有在控制台中看到打印语句。我确实有要在主题中使用的数据。所以我想知道代码是否完全连接主题。我正在寻求帮助,以确定如何检查它是否连接到主题并阅读消息或此处可能存在什么问题。

我是 Java、响应式编程和 Kafka 的新手。这是一个自学项目,我很可能遗漏了一些简单而明显的东西。

更多信息: 这是我的日志的快照。我有一个名为 metrics 的主题,有 3 个分区

enter image description here

更新:我没有看到我的打印语句,因为我的主题中有数据但 auto.offset.reset 设置为最新。将其更改为最早消耗现有数据。

解决方法

您的问题在这里:

public void ReadProcessWriteRecords() {
    Scheduler scheduler = Schedulers.newBoundedElastic(60,60,"writerThreads");

    // Here you are ignoring the return
    // Nothing happens until you subscribe
    // So this is merly a statement not a execution.
    KafkaReceiver.create(receiverOptions)
                 .receive()
                 .doOnNext( r -> System.out.printf("Record received: {0}",r.value()))
                 .groupBy(m -> m.receiverOffset().topicPartition())
                 .flatMap(partitionFlux ->
                     partitionFlux.publishOn(scheduler)
                         .map(r -> processRecord(partitionFlux.key(),r))
                         .sample(Duration.ofMillis(5000))
                         .concatMap(offset -> offset.commit()));
}

反应式文档在 nothing happens until you subscribe 的入门部分中对此进行了介绍。在上面的代码中,您正在创建一个响应式流,但没有人订阅它。

由于您的应用程序是流的使用者,您应该在某处添加一个 subscribe 语句。

我个人不会返回 void(您通常会在反应式编程中尝试避免 void 函数,因为它们通常会导致副作用并且难以测试),我会将 producer 一直返回到主函数以便代码可以进行单元测试。

这样生成的主函数就会看起来像这样。

public static void main(String[] args) throws Exception {
    String sourceTopic = "metrics";
    String destinationTopic = "cleanmetrics";

    RunReactiveTranformProcess(sourceTopic,destinationTopic).subscribe();

}