问题描述
我正在使用 Kafka 和 Redis 构建一个反应式管道。现有服务使用来自 Apache Kafka 的事件,应用业务逻辑并最终更新 Redis 集。我正在重构服务以使用反应式 API。
这是使用响应式 API 的示例代码。
var options = ReceiverOptions.create(consumerProps)
.subscription(Collections.singleton("topic_name"))
.addAssignListener(receiverPartitions -> {
System.out.println("Receiver partitions::" + receiverPartitions);
receiverPartitions.forEach(ReceiverPartition::seekToBeginning);
})
.addRevokeListener(receiverPartitions -> System.out.println("Revoke listeners::" + receiverPartitions));
KafkaReceiver.create(options)
.receive()
.log()
.concatMap(record -> processRecord(dateFormat,record))
.subscribe();
以及过程记录
private Mono<ReceiverRecord<Object,Object>> processRecord(SimpleDateFormat dateFormat,ReceiverRecord<Object,Object> record) {
ReceiverOffset offset = record.receiverOffset();
System.out.printf(Thread.currentThread() + "::Received message: topic-partition=%s offset=%d timestamp=%s key=%d value=%s\n",offset.topicPartition(),offset.offset(),dateFormat.format(new Date(record.timestamp())),record.key(),record.value());
Object value = record.value();
var publisher = reactiveRedistemplate
.opsForZSet()
.add("recent::users" + UUID.randomUUID(),value.toString(),(double) System.currentTimeMillis())
.log()
.flatMap(new Function<Boolean,Mono<ReceiverRecord<Object,Object>>>() {
@Override
public Mono<ReceiverRecord<Object,Object>> apply(Boolean aBoolean) {
System.out.println(Thread.currentThread() + "Got the response::" + aBoolean);
if (aBoolean ==null) {
return Mono.empty();
}
record.receiverOffset().ackNowledge();
System.out.println(Thread.currentThread()+ ":: Ack done");
return Mono.just(record);
}
});
System.out.println(Thread.currentThread() + "::Done..");
return publisher;
}
执行后的日志。
Thread[lettuce-nioEventLoop-4-1,5,main]Got the response::true
Thread[lettuce-nioEventLoop-4-1,main]:: Ack done
Thread[lettuce-nioEventLoop-4-1,main]::Received message: topic-partition=topic_name-101 offset=108 timestamp=17:10:54:509 IST 28 Jun 2021 key=null value={"log":{"log_date":1618255865000,"empId":1234}}
Thread[lettuce-nioEventLoop-4-1,main]::Done..
2021-06-28 20:11:14.158 INFO 21262 --- [ioEventLoop-4-1] reactor.Mono.Next.2369 : onSubscribe(MonoNext.NextSubscriber)
2021-06-28 20:11:14.158 INFO 21262 --- [ioEventLoop-4-1] reactor.Mono.Next.2369 : request(unbounded)
2021-06-28 20:11:14.158 INFO 21262 --- [ioEventLoop-4-1] reactor.Mono.Next.2361 : onComplete()
2021-06-28 20:11:14.158 INFO 21262 --- [ioEventLoop-4-1] reactor.Mono.Next.2362 : onNext(true)
Thread[lettuce-nioEventLoop-4-1,main]:: Ack done
我观察到 lettuce-nioEventLoop 线程甚至用于处理 kafka 事件和 lettuce 回调。我不明白这种行为。有人可以照亮吗?。
解决方法
暂无找到可以解决该程序问题的有效方法,小编努力寻找整理中!
如果你已经找到好的解决方法,欢迎将解决方案带上本链接一起发送给小编。
小编邮箱:dio#foxmail.com (将#修改为@)