项目反应堆-长期运行的反应式卡夫卡消费者

问题描述

有人有一个长期运行的反应式卡夫卡消费者的例子吗? https://github.com/reactor/reactor-kafka.git中的SampleConsumer会在订阅退出,这是预期的。但是我需要不断地从主题中接收消息,如果出现故障(尤其是连接问题),我应该重新连接到该消息。

我确实通过https://github.com/CollaborationInEncapsulation/s1p-reactor-netty-kafka-twitter.git作为示例的帮助,但不是很成功。帮助表示赞赏

解决方法

将SampleConsumer转换为LongRunningSampleConsumer很容易,更改是添加一个blockLast来无限期地阻塞。我知道阻塞并不是实现目标的最佳方法,我很乐意替代解决方案。

当前代码在下面的git中。

https://github.com/schengalath/reactor-kafka/blob/master/reactor-kafka-samples/src/main/java/reactor/kafka/samples/LongRunningSampleConsumer.java