问题描述
我想在我的反应式 kafka 消费者中精确控制消息消耗。我想从单个主题的每个分区同时读取特定数量的消息。我在这里阅读了 reactor kafka 项目的文档:
https://projectreactor.io/docs/kafka/release/reference/#concurrent-ordered
现在我有这样的代码,不幸的是它不起作用。它只读取一个分区的 10 条消息。我想读取每个分区的 10 条消息,然后在该点结束消息流。
receiver.receive()
.groupBy { it.receiverOffset().topicPartition() }
.flatMap { topicPartitionFlux ->
topicPartitionFlux.publishOn(Schedulers.elastic())
.map { Message(it.key(),it.timestamp(),it.partition(),it.offset(),it.value()) }
.take(10L)
}.subscribe{ println(it)}
对此的任何提示,非常感谢。
解决方法
暂无找到可以解决该程序问题的有效方法,小编努力寻找整理中!
如果你已经找到好的解决方法,欢迎将解决方案带上本链接一起发送给小编。
小编邮箱:dio#foxmail.com (将#修改为@)