为什么 RSocket 连接重试每次都使用多个不同的线程

问题描述

我有以下程序连接到在 localhost:7999 上运行的 Spring boot rsocket 服务器。 我已经配置了连接器 Retry.fixedDelay(Integer.MAX_VALUE,Duration.ofSeconds(5)) 如您所见,RSocketRequester 是 Mono,因此它应该保持单个连接。 当连接失败并开始重试时,我看到每次重试都是从不同的线程进行的,即如下 parallel-1---parallel-8。我可以知道这背后的原因吗?

12:08:24.463550|parallel-1|WARN |RSocketRefDataReceiver          |doAfterRetry===>attempt #1 (1 in a row),last failure={io.netty.channel.AbstractChannel$AnnotatedConnectException: Connection refused: no further information: localhost/127.0.0.1:7999}
12:08:30.470593|parallel-2|WARN |RSocketRefDataReceiver          |doAfterRetry===>attempt #2 (2 in a row),last failure={io.netty.channel.AbstractChannel$AnnotatedConnectException: Connection refused: no further information: localhost/127.0.0.1:7999}
12:08:36.475666|parallel-3|WARN |RSocketRefDataReceiver          |doAfterRetry===>attempt #3 (3 in a row),last failure={io.netty.channel.AbstractChannel$AnnotatedConnectException: Connection refused: no further information: localhost/127.0.0.1:7999}
12:08:42.494801|parallel-4|WARN |RSocketRefDataReceiver          |doAfterRetry===>attempt #4 (4 in a row),last failure={io.netty.channel.AbstractChannel$AnnotatedConnectException: Connection refused: no further information: localhost/127.0.0.1:7999}
12:08:48.499084|parallel-5|WARN |RSocketRefDataReceiver          |doAfterRetry===>attempt #5 (5 in a row),last failure={io.netty.channel.AbstractChannel$AnnotatedConnectException: Connection refused: no further information: localhost/127.0.0.1:7999}
12:08:54.503385|parallel-6|WARN |RSocketRefDataReceiver          |doAfterRetry===>attempt #6 (6 in a row),last failure={io.netty.channel.AbstractChannel$AnnotatedConnectException: Connection refused: no further information: localhost/127.0.0.1:7999}
12:09:00.509830|parallel-7|WARN |RSocketRefDataReceiver          |doAfterRetry===>attempt #7 (7 in a row),last failure={io.netty.channel.AbstractChannel$AnnotatedConnectException: Connection refused: no further information: localhost/127.0.0.1:7999}
12:09:06.545815|parallel-8|WARN |RSocketRefDataReceiver          |doAfterRetry===>attempt #8 (8 in a row),last failure={io.netty.channel.AbstractChannel$AnnotatedConnectException: Connection refused: no further information: localhost/127.0.0.1:7999}
12:09:12.553582|parallel-1|WARN |RSocketRefDataReceiver          |doAfterRetry===>attempt #9 (9 in a row),last failure={io.netty.channel.AbstractChannel$AnnotatedConnectException: Connection refused: no further information: localhost/127.0.0.1:7999}

我的程序如下:

RSocketStrategies strategies = RSocketStrategies.builder()
    .encoders(e -> e.add(new Jackson2CborEncoder()))
    .decoders(e -> e.add(new Jackson2CborDecoder()))
    .build();

Mono<RSocketRequester> requester = Mono.just(RSocketRequester.builder()
     .rsocketConnector(connector -> {
           connector.reconnect(
                     Retry.fixedDelay(Integer.MAX_VALUE,Duration.ofSeconds(5))
                     .doAfterRetry(e -> LOG.warn("doAfterRetry===>{}",e)))
             .acceptor(RSocketMessageHandler.responder(strategies,this))
             .payloadDecoder(PayloadDecoder.ZERO_copY);
            })
      .dataMimeType(MediaType.APPLICATION_CBOR)
      .setupRoute("test")
      .setupData("test-123")
      .rsocketStrategies(strategies)
      .tcp("localhost",7999));

解决方法

这篇文章(Flight of the Flux 3)很好地介绍了 Spring Reactor 线程模型。 Reactor 是在 rsocket-java 中提供 Rx 功能实现的基础库。

关键句是

Schedulers.parallel() 适用于 CPU 密集型但寿命短的任务。 它可以并行执行 N 个这样的任务(默认 N == 数量 CPU)

另请阅读https://projectreactor.io/docs/core/release/api/reactor/core/scheduler/Schedulers.html

如果保证所有操作都在单个线程上进行,则很可能会导致嘈杂的延迟,因为最初碰巧获得相同线程的两个不同客户端将在程序的整个生命周期中竞争该线程。因此,它比在有限的线程池之间均匀分布的一般工作要好。

,

感谢@Yuri @bruto @OlegDokuka 以及您的建议和答案。我已将我的程序更改如下以强制重试在单线程上运行。

connector.reconnect(
        Retry.fixedDelay(Integer.MAX_VALUE,Duration.ofSeconds(5))
        .scheduler(Schedulers.single()) // <---- This enforces retry to run on a single thread
        .doAfterRetry(e -> LOG.warn("doAfterRetry===>{}",e)))
        .acceptor(RSocketMessageHandler.responder(strategies,this))
        .payloadDecoder(PayloadDecoder.ZERO_COPY);
      })