问题描述
我正在使用具有 spring webflux SSE 流的 reactor-kafka 库从 kafka 主题中使用数据。当来自主题的所有消息都被消耗时,我需要返回一个特殊的 ServerSentEvent,即最大主题偏移量等于从 0 偏移量订阅时消耗的当前偏移量。以便客户端知道 kafka 主题中没有更多消息。
是否可以使用 Web Flux 来实现这样的目标?即,如果我说在从任何有限元素列表中每消耗 100 个元素并通过 SSE 流作为 ServerSentEvent 发送后,该 SSE 流应该再获得一个事件作为 SeverSentEvent 并带有注释“consumed”。
解决方法
我可能错了,但是,这对我来说没有任何意义。
Kafka 是事件流(始终开启)。
我的意思是,kafka 中没有最后一条消息的概念。
你可以在topic消费者端写一些代码,所以,当某个时间没有事件到达时,触发一个动作。
但是,这并不准确,而且确实很有意义。
我不知道这是不是你想要的。
@GetMapping(path = "/sse/endpoint",produces = MediaType.TEXT_EVENT_STREAM_VALUE)
public Flux<Event> sse() {
return new KafkaConsumer().receive();
}
public class KafkaConsumer {
private final Sinks.Many<Event> sseEventSender = Sinks.many().multicast()
.onBackpressureBuffer();
public Flux<Event> receive() {
return sseEventSender.asFlux();
}
public void consume() {
if(...) {
sseEventSender.tryEmitNext(event);
}
}
}