从 Kafka 获取消息,发送到 Rsocket 并从 React 客户端接收它

问题描述

我正在尝试使用 Spring 云流将数据从 kafka 发送到 Rsocket,然后在 React 上表示数据

这是我的配置。

@Configuration
public class RsocketConsumerConfiguration {
    
    @Bean
    public Sinks.Many<Data> sender(){
        return Sinks.many().multicast().directBestEffort();
    }
    

}

@控制器 公共类 ServerController {

@Autowired
private Sinks.Many<Data> integer;

@MessageMapping("integer")
public Flux<Data> integer() {
    return  integer.asFlux();
}
@EnableBinding(IClientProcessor.class)
public class Listener {

    @Autowired
    private Sinks.Many<Data> integer;

    @StreamListener(IClientProcessor.INTEGER)
    public void integer(Data val) {
        System.out.println(val);
        integer.tryEmitNext(val);
    }

}

   let  client = new RSocketClient({
    transport: new RSocketWebSocketClient(
        {
            url: 'ws://localhost:7000/ws',wsCreator: (url) => new WebSocket(url),debug: true,},BufferEncoders,),setup: {
        dataMimeType: "application/json",MetadataMimeType: MESSAGE_RSOCKET_COMPOSITE_MetaDATA.string,keepAlive: 5000,lifetime: 60000,});

  client
            .then(rsocket => {
                console.log("Connected to rsocket");
                rsocket.requestStream({
                    Metadata: Buffer.from(encodeCompositeMetadata([
                        [MESSAGE_RSOCKET_ROUTING,encodeRoute("integer")],])),})
                    .subscribe({
                        onSubscribe: s => {
                            s.request(2147483647)
                        },onNext: (p) => {
                            let newData = {
                                time: new Date(JSON.parse(p.data).time).getUTCSeconds(),integer: JSON.parse(p.data).integer
                            }
                           newData.integer >100?setInteger(currentData => [newData,...currentData]):setInt(currentData => [newData,...currentData])
                           console.log(newData)
                        },onError: (e) => console.error(e),onComplete: () => console.log("Done")
                    });

spring.cloud.stream.bindings.integer.destination=整数 无法在反应应用程序中看到它。请指教。我做错了什么?

解决方法

鉴于数据似乎直接从 Kafka(通过 Spring)传输到客户端,也许 stream Kafka messages via an internet-messaging broker to Internet-facing clients over WebSockets 更有意义。

披露:我不是那篇文章的作者,但在作者工作的那家公司工作。我们经常看到这个用例,所以希望这种方法可能有用。