Rsocket在尝试重新连接期间引发异常

问题描述

我在基本的Spring Boot 2.3.0上创建了应用程序服务器和客户端。该节点之间的通信正确,但是重新启动服务器后重新连接会出现问题。

  • 服务器:

application.properties:`

spring.rsocket.server.port=7010
spring.rsocket.server.transport=websocket

Main.class:

    @Bean
    public RSocketServerCustomizer rSocketResume() {
        return rSocketServer -> rSocketServer.resume( new Resume()
                .sessionDuration(Duration.ofDays(10))).payloadDecoder(PayloadDecoder.ZERO_copY);
    }
  • 客户:
  RSocketConnector.create().acceptor(RSocketMessageHandler.responder(rSocketStrategies,receiverController))
                .resume(new Resume()
                        .sessionDuration(Duration.ofDays(10))
                        .retry(Retry.fixedDelay(Long.MAX_VALUE,Duration.ofSeconds(2))
                                .dobeforeRetry(s -> System.out.println("disconnected. I'am trying to reconnect")
                                )))
                .payloadDecoder(PayloadDecoder.ZERO_copY)
                .acceptor(RSocketMessageHandler.responder(rSocketStrategies,receiverController))
                .connect(() -> WebsocketClientTransport.create(apiIp,apiPort).connect()).block();

在客户端关闭服务器后,我看到

"disconnected. I'am trying to reconnect"
"disconnected. I'am trying to reconnect"
"disconnected. I'am trying to reconnect"
...

服务器再次启动时,客户端引发异常:

2020-10-27 21:04:06.519 ERROR 174236 --- [or-http-epoll-3] r.n.channel.ChannelOperationsHandler     : [id: 0x8f2443a2,L:/127.0.0.1:59484 ! R:127.0.0.1/127.0.0.1:7000] Error was received while reading the incoming data. The connection will be closed.

reactor.core.Exceptions$ErrorCallbackNotImplemented: java.lang.IllegalStateException: SETUP or LEASE frame must be received before any others.
Caused by: java.lang.IllegalStateException: SETUP or LEASE frame must be received before any others.
    at io.rsocket.internal.ClientServerInputMultiplexer.lambda$new$0(ClientServerInputMultiplexer.java:109) ~[rsocket-core-1.0.2.jar:na]
    at reactor.core.publisher.FluxGroupBy$GroupByMain.onNext(FluxGroupBy.java:180) ~[reactor-core-3.3.9.RELEASE.jar:3.3.9.RELEASE]
    at reactor.core.publisher.FluxHandle$HandleSubscriber.onNext(FluxHandle.java:112) ~[reactor-core-3.3.9.RELEASE.jar:3.3.9.RELEASE]
    at reactor.core.publisher.FluxMap$MapConditionalSubscriber.onNext(FluxMap.java:213) ~[reactor-core-3.3.9.RELEASE.jar:3.3.9.RELEASE]
    at reactor.core.publisher.FluxMap$MapConditionalSubscriber.onNext(FluxMap.java:213) ~[reactor-core-3.3.9.RELEASE.jar:3.3.9.RELEASE]
    at reactor.netty.channel.FluxReceive.drainReceiver(FluxReceive.java:260) ~[reactor-netty-0.9.11.RELEASE.jar:0.9.11.RELEASE]
    at reactor.netty.channel.FluxReceive.onInboundNext(FluxReceive.java:366) ~[reactor-netty-0.9.11.RELEASE.jar:0.9.11.RELEASE]
    at reactor.netty.channel.ChannelOperations.onInboundNext(ChannelOperations.java:358) ~[reactor-netty-0.9.11.RELEASE.jar:0.9.11.RELEASE]
    at reactor.netty.http.client.HttpClientOperations.onInboundNext(HttpClientOperations.java:667) ~[reactor-netty-0.9.11.RELEASE.jar:0.9.11.RELEASE]
    at reactor.netty.http.client.WebsocketClientOperations.onInboundNext(WebsocketClientOperations.java:156) ~[reactor-netty-0.9.11.RELEASE.jar:0.9.11.RELEASE]
    at reactor.netty.channel.ChannelOperationsHandler.channelRead(ChannelOperationsHandler.java:96) ~[reactor-netty-0.9.11.RELEASE.jar:0.9.11.RELEASE]
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379) ~[netty-transport-4.1.51.Final.jar:4.1.51.Final]
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365) ~[netty-transport-4.1.51.Final.jar:4.1.51.Final]
    at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357) ~[netty-transport-4.1.51.Final.jar:4.1.51.Final]
    at io.netty.handler.codec.BytetoMessageDecoder.fireChannelRead(BytetoMessageDecoder.java:324) ~[netty-codec-4.1.51.Final.jar:4.1.51.Final]
    at io.netty.handler.codec.BytetoMessageDecoder.channelRead(BytetoMessageDecoder.java:296) ~[netty-codec-4.1.51.Final.jar:4.1.51.Final]
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379) ~[netty-transport-4.1.51.Final.jar:4.1.51.Final]
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365) ~[netty-transport-4.1.51.Final.jar:4.1.51.Final]
    at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357) ~[netty-transport-4.1.51.Final.jar:4.1.51.Final]
    at io.netty.channel.DefaultChannelPipeline$HeadContext.channelRead(DefaultChannelPipeline.java:1410) ~[netty-transport-4.1.51.Final.jar:4.1.51.Final]
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379) ~[netty-transport-4.1.51.Final.jar:4.1.51.Final]
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365) ~[netty-transport-4.1.51.Final.jar:4.1.51.Final]
    at io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:919) ~[netty-transport-4.1.51.Final.jar:4.1.51.Final]
    at io.netty.channel.epoll.AbstractEpollStreamChannel$EpollStreamUnsafe.epollInReady(AbstractEpollStreamChannel.java:792) ~[netty-transport-native-epoll-4.1.51.Final-linux-x86_64.jar:4.1.51.Final]
    at io.netty.channel.epoll.EpollEventLoop.processReady(EpollEventLoop.java:475) ~[netty-transport-native-epoll-4.1.51.Final-linux-x86_64.jar:4.1.51.Final]
    at io.netty.channel.epoll.EpollEventLoop.run(EpollEventLoop.java:378) ~[netty-transport-native-epoll-4.1.51.Final-linux-x86_64.jar:4.1.51.Final]
    at io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:989) ~[netty-common-4.1.51.Final.jar:4.1.51.Final]
    at io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74) ~[netty-common-4.1.51.Final.jar:4.1.51.Final]
    at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30) ~[netty-common-4.1.51.Final.jar:4.1.51.Final]
    at java.base/java.lang.Thread.run(Thread.java:834) ~[na:na]

2020-10-27 21:04:06.520  WARN 174236 --- [or-http-epoll-3] reactor.netty.channel.FluxReceive        : [id: 0x8f2443a2,L:/127.0.0.1:59484 ! R:127.0.0.1/127.0.0.1:7000] An exception has been observed post termination,use DEBUG level to see the full stack: reactor.core.Exceptions$ErrorCallbackNotImplemented: java.lang.IllegalStateException: SETUP or LEASE frame must be received before any others.
disconnected. I'am trying to reconnect
2020-10-27 21:04:06.529 ERROR 174236 --- [or-http-epoll-2] r.n.channel.ChannelOperationsHandler     : [id: 0x6b644dde,L:/127.0.0.1:59482 ! R:127.0.0.1/127.0.0.1:7000] Error was received while reading the incoming data. The connection will be closed.

reactor.core.Exceptions$ErrorCallbackNotImplemented: java.lang.IllegalStateException: SETUP or LEASE frame must be received before any others.
Caused by: java.lang.IllegalStateException: SETUP or LEASE frame must be received before any others.
    at io.rsocket.internal.ClientServerInputMultiplexer.lambda$new$0(ClientServerInputMultiplexer.java:109) ~[rsocket-core-1.0.2.jar:na]
    at reactor.core.publisher.FluxGroupBy$GroupByMain.onNext(FluxGroupBy.java:180) ~[reactor-core-3.3.9.RELEASE.jar:3.3.9.RELEASE]
    at reactor.core.publisher.FluxHandle$HandleSubscriber.onNext(FluxHandle.java:112) ~[reactor-core-3.3.9.RELEASE.jar:3.3.9.RELEASE]
    at reactor.core.publisher.FluxMap$MapConditionalSubscriber.onNext(FluxMap.java:213) ~[reactor-core-3.3.9.RELEASE.jar:3.3.9.RELEASE]
    at reactor.core.publisher.FluxMap$MapConditionalSubscriber.onNext(FluxMap.java:213) ~[reactor-core-3.3.9.RELEASE.jar:3.3.9.RELEASE]
    at reactor.netty.channel.FluxReceive.drainReceiver(FluxReceive.java:260) ~[reactor-netty-0.9.11.RELEASE.jar:0.9.11.RELEASE]
    at reactor.netty.channel.FluxReceive.onInboundNext(FluxReceive.java:366) ~[reactor-netty-0.9.11.RELEASE.jar:0.9.11.RELEASE]
    at reactor.netty.channel.ChannelOperations.onInboundNext(ChannelOperations.java:358) ~[reactor-netty-0.9.11.RELEASE.jar:0.9.11.RELEASE]
    at reactor.netty.http.client.HttpClientOperations.onInboundNext(HttpClientOperations.java:667) ~[reactor-netty-0.9.11.RELEASE.jar:0.9.11.RELEASE]
    at reactor.netty.http.client.WebsocketClientOperations.onInboundNext(WebsocketClientOperations.java:156) ~[reactor-netty-0.9.11.RELEASE.jar:0.9.11.RELEASE]
    at reactor.netty.channel.ChannelOperationsHandler.channelRead(ChannelOperationsHandler.java:96) ~[reactor-netty-0.9.11.RELEASE.jar:0.9.11.RELEASE]
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379) ~[netty-transport-4.1.51.Final.jar:4.1.51.Final]
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365) ~[netty-transport-4.1.51.Final.jar:4.1.51.Final]
    at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357) ~[netty-transport-4.1.51.Final.jar:4.1.51.Final]
    at io.netty.handler.codec.BytetoMessageDecoder.fireChannelRead(BytetoMessageDecoder.java:324) ~[netty-codec-4.1.51.Final.jar:4.1.51.Final]
    at io.netty.handler.codec.BytetoMessageDecoder.channelRead(BytetoMessageDecoder.java:296) ~[netty-codec-4.1.51.Final.jar:4.1.51.Final]
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379) ~[netty-transport-4.1.51.Final.jar:4.1.51.Final]
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365) ~[netty-transport-4.1.51.Final.jar:4.1.51.Final]
    at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357) ~[netty-transport-4.1.51.Final.jar:4.1.51.Final]
    at io.netty.channel.DefaultChannelPipeline$HeadContext.channelRead(DefaultChannelPipeline.java:1410) ~[netty-transport-4.1.51.Final.jar:4.1.51.Final]
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379) ~[netty-transport-4.1.51.Final.jar:4.1.51.Final]
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365) ~[netty-transport-4.1.51.Final.jar:4.1.51.Final]
    at io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:919) ~[netty-transport-4.1.51.Final.jar:4.1.51.Final]
    at io.netty.channel.epoll.AbstractEpollStreamChannel$EpollStreamUnsafe.epollInReady(AbstractEpollStreamChannel.java:792) ~[netty-transport-native-epoll-4.1.51.Final-linux-x86_64.jar:4.1.51.Final]
    at io.netty.channel.epoll.EpollEventLoop.processReady(EpollEventLoop.java:475) ~[netty-transport-native-epoll-4.1.51.Final-linux-x86_64.jar:4.1.51.Final]
    at io.netty.channel.epoll.EpollEventLoop.run(EpollEventLoop.java:378) ~[netty-transport-native-epoll-4.1.51.Final-linux-x86_64.jar:4.1.51.Final]
    at io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:989) ~[netty-common-4.1.51.Final.jar:4.1.51.Final]
    at io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74) ~[netty-common-4.1.51.Final.jar:4.1.51.Final]
    at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30) ~[netty-common-4.1.51.Final.jar:4.1.51.Final]
    at java.base/java.lang.Thread.run(Thread.java:834) ~[na:na]

有人有类似的问题吗?

解决方法

暂无找到可以解决该程序问题的有效方法,小编努力寻找整理中!

如果你已经找到好的解决方法,欢迎将解决方案带上本链接一起发送给小编。

小编邮箱:dio#foxmail.com (将#修改为@)