How to handle inbound stream cancellation in Spring Boot RSocket Reactive

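Problem description

Goal

I want an RSocket channel endpoint in my Spring Boot application where I can handle cancellation of the inbound, client-driven stream in order to do some server-side cleanup.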

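Setup

Relevant dependencies:

  • Spring Boot 2.4.2
  • Kotlin 1.4.21
  • Kotlinx Coroutines 1.4.2
  • RSocket Core 1.1.0

I have tried to achieve my goal using both Kotlin coroutine Flows and Reactor Fluxes. The two client/server pairs below should do the same thing: establish an RSocket channel, send two "ping" payloads from the client, have the server respond to each with a "pong" payload, and then have the client close the connection.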

Flow server side:

    @MessageMapping("testFlow")
    fun testPingFlow(input: Flow<String>): Flow<String> {
        val cs = CoroutineScope(EmptyCoroutineContext)
        val output = MutableSharedFlow<String>(10)

        cs.launch {
            try {
                input
                    .catch { e ->
                        logger.error("Rsocket server input error",e)
                    }
                    .onCompletion { exception ->
                        logger.debug("Rsocket server input completed")
                        if (exception != null) {
                            logger.error("Exception received while processing Rsocket server input flow",exception)
                        }
                    }
                    // normal .collect complains about being internal-only
                    .collectIndexed { _,message ->
                        logger.debug("Rsocket server input received $message")
                        output.emit("pong ${System.currentTimeMillis()}")
                    }
            } catch (e: Throwable) {
                logger.error("Rsocket server input connection exception caught",e)
            }
        }
        return output
    }

Flow client test:

    @Test
    fun testPingFlow() {
        val outToServer = MutableSharedFlow<String>(10)

        runBlocking {
            val socketFlow = rSocketRequester
                .route("testFlow")
                .data(outToServer.asFlux())
                .retrieveFlow<String>()
                .take(2)

            outToServer.emit("Ping ${System.currentTimeMillis()}")
            outToServer.emit("Ping ${System.currentTimeMillis()}")

            socketFlow
                .onCompletion { exception ->
                    logger.debug("Rsocket client output completed")
                    if (exception != null) {
                        logger.error("Exception received while processing Rsocket client output flow",exception)
                    }
                }
                .collect { message ->
                    logger.debug("Received pong from server $message")
                }
        }
    }

Flux server side:

    @MessageMapping("testFlux")
    fun testPingFlux(input: Flux<String>): Flux<String> {
        val output = Sinks.many().unicast().onBackpressureBuffer<String>()
        try {
            input
                .doOnNext { message ->
                    logger.debug("Rsocket server input message received $message")
                }
                .doOnError { e ->
                    logger.error("Rsocket server input connection error",e)
                }
                .doOnCancel {
                    logger.debug("Rsocket server input cancelled")
                }
                .doOnComplete {
                    logger.debug("Rsocket server input completed")
                }
                .subscribe { message ->
                    output.tryEmitNext("pong ${System.currentTimeMillis()}")
                }
        } catch (e: Throwable) {
            logger.error("Rsocket server input connection exception caught",e)
        }
        return output.asFlux()
    }

Flux client test:

    @Test
    fun testPingFlux() {
        val outToServer = Sinks.many().unicast().onBackpressureBuffer<String>()

        rSocketRequester
            .route("testFlux")
            .data(outToServer.asFlux())
            .retrieveFlux<String>()
            .doOnCancel {
                logger.debug("Rsocket client output connection completed")
            }
            .doOnError { e ->
                logger.error("Exception received while processing Rsocket client output flow",e)
            }
            .take(2)
            .subscribe { message ->
                logger.debug("Received pong from server $message")
            }

        outToServer.tryEmitNext("Ping ${System.currentTimeMillis()}")
        outToServer.tryEmitNext("Ping ${System.currentTimeMillis()}")
    }

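Problem

Both client/server snippets above do send ping/pong payloads back and forth, but in neither case am I able to handle the client's cancellation of the connection on the server side. I get my own "Rsocket client output completed" log line from the client, followed by "Operator called default onErrorDropped" from Reactor and the following stack trace from RSocket: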

java.util.concurrent.CancellationException: Inbound has been canceled
    at io.rsocket.core.RequestChannelResponderSubscriber.tryTerminate(RequestChannelResponderSubscriber.java:357) ~[rsocket-core-1.1.0.jar:na]
    at io.rsocket.core.RequestChannelResponderSubscriber.handleCancel(RequestChannelResponderSubscriber.java:345) ~[rsocket-core-1.1.0.jar:na]
    at io.rsocket.core.RSocketResponder.handleFrame(RSocketResponder.java:217) ~[rsocket-core-1.1.0.jar:na]
    at reactor.core.publisher.LambdaSubscriber.onNext(LambdaSubscriber.java:160) ~[reactor-core-3.4.2.jar:3.4.2]
    at org.springframework.security.test.context.support.ReactorContextTestExecutionListener$DelegateTestExecutionListener$SecuritySubContext.onNext(ReactorContextTestExecutionListener.java:120) ~[spring-security-test-5.4.2.jar:5.4.2]
    at io.rsocket.core.ClientServerInputMultiplexer$InternalDuplexConnection.onNext(ClientServerInputMultiplexer.java:248) ~[rsocket-core-1.1.0.jar:na]
    at io.rsocket.core.ClientServerInputMultiplexer.onNext(ClientServerInputMultiplexer.java:129) ~[rsocket-core-1.1.0.jar:na]
    at io.rsocket.core.ClientServerInputMultiplexer.onNext(ClientServerInputMultiplexer.java:48) ~[rsocket-core-1.1.0.jar:na]
    at io.rsocket.core.SetupHandlingDuplexConnection.onNext(SetupHandlingDuplexConnection.java:118) ~[rsocket-core-1.1.0.jar:na]
    at io.rsocket.core.SetupHandlingDuplexConnection.onNext(SetupHandlingDuplexConnection.java:19) ~[rsocket-core-1.1.0.jar:na]
    at reactor.core.publisher.FluxMap$MapSubscriber.onNext(FluxMap.java:120) ~[reactor-core-3.4.2.jar:3.4.2]
    at reactor.core.publisher.FluxMap$MapSubscriber.onNext(FluxMap.java:120) ~[reactor-core-3.4.2.jar:3.4.2]
    at reactor.netty.channel.FluxReceive.drainReceiver(FluxReceive.java:267) ~[reactor-netty-core-1.0.3.jar:1.0.3]
    at reactor.netty.channel.FluxReceive.onInboundNext(FluxReceive.java:377) ~[reactor-netty-core-1.0.3.jar:1.0.3]
    at reactor.netty.channel.ChannelOperations.onInboundNext(ChannelOperations.java:381) ~[reactor-netty-core-1.0.3.jar:1.0.3]
    at reactor.netty.channel.ChannelOperationsHandler.channelRead(ChannelOperationsHandler.java:94) ~[reactor-netty-core-1.0.3.jar:1.0.3]
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379) ~[netty-transport-4.1.58.Final.jar:4.1.58.Final]
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365) ~[netty-transport-4.1.58.Final.jar:4.1.58.Final]
    at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357) ~[netty-transport-4.1.58.Final.jar:4.1.58.Final]
    at io.netty.handler.codec.ByteToMessageDecoder.fireChannelRead(ByteToMessageDecoder.java:324) ~[netty-codec-4.1.58.Final.jar:4.1.58.Final]
    at io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:296) ~[netty-codec-4.1.58.Final.jar:4.1.58.Final]
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379) ~[netty-transport-4.1.58.Final.jar:4.1.58.Final]
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365) ~[netty-transport-4.1.58.Final.jar:4.1.58.Final]
    at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357) ~[netty-transport-4.1.58.Final.jar:4.1.58.Final]
    at io.netty.channel.DefaultChannelPipeline$HeadContext.channelRead(DefaultChannelPipeline.java:1410) ~[netty-transport-4.1.58.Final.jar:4.1.58.Final]
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379) ~[netty-transport-4.1.58.Final.jar:4.1.58.Final]
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365) ~[netty-transport-4.1.58.Final.jar:4.1.58.Final]
    at io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:919) ~[netty-transport-4.1.58.Final.jar:4.1.58.Final]
    at io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:166) ~[netty-transport-4.1.58.Final.jar:4.1.58.Final]
    at io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:719) ~[netty-transport-4.1.58.Final.jar:4.1.58.Final]
    at io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:655) ~[netty-transport-4.1.58.Final.jar:4.1.58.Final]
    at io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:581) ~[netty-transport-4.1.58.Final.jar:4.1.58.Final]
    at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:493) ~[netty-transport-4.1.58.Final.jar:4.1.58.Final]
    at io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:989) ~[netty-common-4.1.58.Final.jar:4.1.58.Final]
    at io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74) ~[netty-common-4.1.58.Final.jar:4.1.58.Final]
    at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30) ~[netty-common-4.1.58.Final.jar:4.1.58.Final]
    at java.base/java.lang.Thread.run(Thread.java:832) ~[na:na]

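This is a problem because (beyond this toy example) my application needs to do server-side cleanup when the connection is closed.

Things I have tried without success

  • All the various ways of catching exceptions, cancellation, or completion on the Flows or Fluxes, many of which are shown in the examples above.
  • try/catch blocks inside the subscribe/collect lambdas.
  • Coupling the server's response Flux/Flow directly to the input Flux/Flow via mapping operators, instead of creating a separate output Flux/Flow.
  • Stepping through the framework code in a debugger, where I got lost fairly quickly. The best theory I came away with from that adventure is that the Flux/Flow receiving the cancel signal is somehow detached from the input Flux/Flow that my server method receives, but there are too many layers of abstraction for me to trace it.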
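
To make the theory in the last bullet concrete, here is a minimal Reactor-only sketch with no RSocket involved. It mimics the shape of the testPingFlux handler: the input is subscribed on its own and the output is a separate sink-backed Flux. The function name handle and the events list are hypothetical and exist only for this illustration. Whether RSocket 1.1.0 routes the client's cancel the same way is an assumption, not something this sketch proves.

```kotlin
import reactor.core.publisher.Flux
import reactor.core.publisher.Sinks

// Hypothetical stand-in for the server handler: the input is subscribed
// separately, and the output is a distinct Flux backed by a unicast sink.
fun handle(input: Flux<String>, events: MutableList<String>): Flux<String> {
    val output = Sinks.many().unicast().onBackpressureBuffer<String>()
    input
        .doOnCancel { events.add("input cancelled") }
        .subscribe { message -> output.tryEmitNext("pong for $message") }
    // The cancel hook is attached to the Flux that is returned to the caller.
    return output.asFlux()
        .doOnCancel { events.add("output cancelled") }
}

fun main() {
    val events = mutableListOf<String>()
    val pongs = mutableListOf<String>()
    val toServer = Sinks.many().unicast().onBackpressureBuffer<String>()

    // take(2) cancels its upstream (the returned output Flux) after two items.
    handle(toServer.asFlux(), events)
        .take(2)
        .subscribe { pong -> pongs.add(pong) }

    toServer.tryEmitNext("ping 1")
    toServer.tryEmitNext("ping 2")

    println(pongs)
    println(events)
}
```

In this plain Reactor setup, the cancel triggered by take(2) should reach only the doOnCancel on the returned output chain. The separately subscribed input chain is never cancelled, because nothing downstream of it issues a cancel. If RSocket behaves similarly for the outbound half of a channel, a cleanup hook on the returned Flux might be worth trying alongside the hooks on the input.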

Thanks in advance for your help.

Resolution

Bug filed; marking this question as answered. Thanks, everyone, for the quick responses.