问题描述
我需要轮询 kafka 并批量处理事件。在 Reactor kafka 中,由于它是一个流媒体 API,我将事件作为流获取。有没有办法组合并获得固定的最大事件大小。
这就是我目前正在做的事情。
final Flux<Flux<ConsumerRecord<String,String>>> receive = KafkaReceiver.create(eventReceiverOptions)
.receiveAutoAck();
receive
.concatMap(r -> r)
.doOnEach(listSignal -> log.info("got one message"))
.map(consumerRecords -> consumerRecords.value())
.collectList()
.flatMap(strings -> {
log.info("Read messages of size {}",strings.size());
return processBulkMessage(strings)
.doOnSuccess(aBoolean -> log.info("Processed records"))
.thenReturn(strings);
}).subscribe();
但是代码只是在 collectList 之后挂起,永远不会去到最后一个 flatMap。
提前致谢。
解决方法
您只需对普通 .concatMap(r -> r)
进行“扁平化”,因此您完全消除了最初由该 receiveAutoAck()
构建的批处理。要让您的 processBulkMessage()
处理一系列列表,请考虑将所有批处理逻辑移到该 concatMap()
中:
.concatMap(batch -> batch
.doOnEach(listSignal -> log.info("got one message"))
.map(ConsumerRecord::value)
.collectList())
.flatMap(strings -> {