从kafka获取数据并发送给RSocket

问题描述

我正在尝试从 kafka 获取消息并使用 Spring 将其发送到 RSocket。在 Spring Java 和客户端使用 React 发布服务器端

@Configuration
@EnableConfigurationProperties(RsocketConsumerProperties.class)
public class RsocketConsumerConfiguration {

    @Bean
    public Function<Integer,Mono<Integer>> rsocketConsumer(RSocketRequester.Builder builder,RsocketConsumerProperties rsocketConsumerProperties) {
        RSocketRequester rSocketRequester = builder.websocket(URI.create("ws://localhost:7000/"));
        return input -> rSocketRequester.route(rsocketConsumerProperties.getRoute()).data(input).retrieveMono(Integer.class);
    }
}
@EnableBinding(Sink.class)
public class Listener {

    @Autowired
    private Function<Integer,Mono<Integer>> rsocketConsumer;


    @StreamListener(Sink.INPUT)
    public void fireAndForget(Integer val) {
        System.out.println(val);
        rsocketConsumer.apply(val).subscribe();
    }
}
@Controller
public class ServerController {

    @MessageMapping("data")
    public Mono<Integer> hello(Integer integer) {
        return Mono.just(integer);
    }

}

我在服务器端做错了什么,因为我的客户端已连接但无法获取新消息

  client.connect().subscribe({
    onComplete: socket => {
        socket.fireAndForget({
          data: { message: "hello from javascript!" },Metadata: null
        });
      },onError: error => {
        console.log("got error");
        console.error(error);
      },onSubscribe: cancel => {
        /* call cancel() to abort */
        console.log("subscribe!");
        console.log(cancel);
        // cancel.cancel();
      }
    });
   

解决方法

你这样做requester.route("input").data("Welcome to Rsocket").send();我们有这个:

   /**
     * Perform a {@link RSocket#fireAndForget fireAndForget} sending the
     * provided data and metadata.
     * @return a completion that indicates if the payload was sent
     * successfully or not. Note,however that is a one-way send and there
     * is no indication of whether or how the event was handled on the
     * remote end.
     */
    Mono<Void> send();

你看到 - Mono?这意味着它必须被订阅以启动响应式流处理。有关详细信息,请参阅 Reactor 项目:https://projectreactor.io/

另一方面,在你的情况下,不清楚什么是服务器,什么是客户端...... 你这样做:

    /**
     * Build an {@link RSocketRequester} with an
     * {@link io.rsocket.core.RSocketClient} that connects over WebSocket to
     * the given URL. The requester can be used to make requests
     * concurrently. Requests are made over a shared connection that is also
     * re-established as needed when further requests are made.
     * @param uri the URL to connect to
     * @return the created {@code RSocketRequester}
     * @since 5.3
     */
    RSocketRequester websocket(URI uri);

而且我会说这意味着您展示的代码中的 client。服务器位于为 7000 协议打开 ws:// 端口的另一端。因此,请确保您正确理解和配置所有部件。例如,我不明白为什么您的 @RestController 类中需要 Listener...