使用 Reactor Kafka 和 Reactive Redis 构建反应式管道

问题描述

我正在使用 Kafka 和 Redis 构建一个反应式管道。现有服务使用来自 Apache Kafka 的事件,应用业务逻辑并最终更新 Redis 集。我正在重构服务以使用反应式 API。

这是使用响应式 API 的示例代码

var options = ReceiverOptions.create(consumerProps)
                .subscription(Collections.singleton("topic_name"))
                .addAssignListener(receiverPartitions -> {
                    System.out.println("Receiver partitions::" + receiverPartitions);
                    receiverPartitions.forEach(ReceiverPartition::seekToBeginning);
                })
                .addRevokeListener(receiverPartitions -> System.out.println("Revoke listeners::" + receiverPartitions));

KafkaReceiver.create(options)
                        .receive()
                        .log()
                        .concatMap(record -> processRecord(dateFormat,record))
                        .subscribe();

以及过程记录

private Mono<ReceiverRecord<Object,Object>> processRecord(SimpleDateFormat dateFormat,ReceiverRecord<Object,Object> record) {
        ReceiverOffset offset = record.receiverOffset();
        System.out.printf(Thread.currentThread() + "::Received message: topic-partition=%s offset=%d timestamp=%s key=%d value=%s\n",offset.topicPartition(),offset.offset(),dateFormat.format(new Date(record.timestamp())),record.key(),record.value());
        Object value = record.value();
        var publisher = reactiveRedistemplate
                .opsForZSet()
                .add("recent::users" + UUID.randomUUID(),value.toString(),(double) System.currentTimeMillis())
                .log()
                .flatMap(new Function<Boolean,Mono<ReceiverRecord<Object,Object>>>() {
                    @Override
                    public Mono<ReceiverRecord<Object,Object>> apply(Boolean aBoolean) {
                        System.out.println(Thread.currentThread() + "Got the response::"  + aBoolean);
                        if (aBoolean ==null) {
                            return Mono.empty();
                        }
                        record.receiverOffset().ackNowledge();
                        System.out.println(Thread.currentThread()+ ":: Ack done");
                        return Mono.just(record);
                    }
                });
        System.out.println(Thread.currentThread() + "::Done..");
        return publisher;
    }

执行后的日志。

Thread[lettuce-nioEventLoop-4-1,5,main]Got the response::true
Thread[lettuce-nioEventLoop-4-1,main]:: Ack done
Thread[lettuce-nioEventLoop-4-1,main]::Received message: topic-partition=topic_name-101 offset=108 timestamp=17:10:54:509 IST 28 Jun 2021 key=null value={"log":{"log_date":1618255865000,"empId":1234}}
Thread[lettuce-nioEventLoop-4-1,main]::Done..
2021-06-28 20:11:14.158  INFO 21262 --- [ioEventLoop-4-1] reactor.Mono.Next.2369                   : onSubscribe(MonoNext.NextSubscriber)
2021-06-28 20:11:14.158  INFO 21262 --- [ioEventLoop-4-1] reactor.Mono.Next.2369                   : request(unbounded)
2021-06-28 20:11:14.158  INFO 21262 --- [ioEventLoop-4-1] reactor.Mono.Next.2361                   : onComplete()
2021-06-28 20:11:14.158  INFO 21262 --- [ioEventLoop-4-1] reactor.Mono.Next.2362                   : onNext(true)
Thread[lettuce-nioEventLoop-4-1,main]:: Ack done

我观察到 lettuce-nioEventLoop 线程甚至用于处理 kafka 事件和 lettuce 回调。我不明白这种行为。有人可以照亮吗?。

解决方法

暂无找到可以解决该程序问题的有效方法,小编努力寻找整理中!

如果你已经找到好的解决方法,欢迎将解决方案带上本链接一起发送给小编。

小编邮箱:dio#foxmail.com (将#修改为@)

相关问答

Selenium Web驱动程序和Java。元素在(x,y)点处不可单击。其...
Python-如何使用点“。” 访问字典成员?
Java 字符串是不可变的。到底是什么意思?
Java中的“ final”关键字如何工作?(我仍然可以修改对象。...
“loop:”在Java代码中。这是什么,为什么要编译?
java.lang.ClassNotFoundException:sun.jdbc.odbc.JdbcOdbc...