发送从反应堆 kafka 主题消耗的前 100 条消息后,如何有条件地发布额外的 SSE Web 流量事件

问题描述

我正在使用具有 spring webflux SSE 流的 reactor-kafka 库从 kafka 主题中使用数据。当来自主题的所有消息都被消耗时,我需要返回一个特殊的 ServerSentEvent,即最大主题偏移量等于从 0 偏移量订阅时消耗的当前偏移量。以便客户端知道 kafka 主题中没有更多消息。

是否可以使用 Web Flux 来实现这样的目标?即,如果我说在从任何有限元素列表中每消耗 100 个元素并通过 SSE 流作为 ServerSentEvent 发送后,该 SSE 流应该再获得一个事件作为 SeverSentEvent 并带有注释“consumed”。

解决方法

我可能错了,但是,这对我来说没有任何意义。
Kafka 是事件流(始终开启)。
我的意思是,kafka 中没有最后一条消息的概念。

你可以在topic消费者端写一些代码,所以,当某个时间没有事件到达时,触发一个动作。
但是,这并不准确,而且确实很有意义。

,

我不知道这是不是你想要的。

    @GetMapping(path = "/sse/endpoint",produces = MediaType.TEXT_EVENT_STREAM_VALUE)
    public Flux<Event> sse() {
        return new KafkaConsumer().receive();
    }

    public class KafkaConsumer {

        private final Sinks.Many<Event> sseEventSender = Sinks.many().multicast()
                .onBackpressureBuffer();

        public Flux<Event> receive() {
            return sseEventSender.asFlux();
        }

        public void consume() {
            if(...) {
                sseEventSender.tryEmitNext(event);
            }
        }
    }