使用 Webflux 进行 Reactive Feign

问题描述

我从这里 (https://github.com/Playtika/feign-reactive) 使用 Reactive Feign 实现,但即使我尝试使用 Spring WebClient 我也遇到同样的问题。

当我使用 POST 请求调用 URL 并传递 JSON 正文时,流从未完成。线程保持冻结等待完成并且永远不会发生。

fun post(request: PostRequest): Mono<PostResponse> {   
    val response = myClient.post(request)
        .onErrorResume {
            log.error("Error sending request",it)
            Mono.error<PostResponse>(it)
        }
        .doOnSuccess {
            log.info("Success with response $it")
            it
        }
    response.block() 
    return response
}

但是,当我调用 subscribe() 时什么也没有发生并且请求保持冻结状态。当我调用 block() 方法时,它抛出以下异常:

java.lang.IllegalStateException: block()/blockFirst()/blockLast() are blocking,which is not supported in thread reactor-http-nio-4

运行调试,我得到了这个日志:

2021-01-06 11:57:42.047 DEBUG 13624 --- [ctor-http-nio-4] r.client.log.DefaultReactiveLogger       : [MyClient#post]--->POST http://localhost:8081/post HTTP/1.1
2021-01-06 11:57:42.048 DEBUG 13624 --- [ctor-http-nio-4] r.n.resources.PooledConnectionProvider   : [id: 0x77f83c34,L:/10.7.8.88:62005 - R:localhost/127.0.0.1:80] Channel acquired,Now 1 active connections and 0 inactive connections
2021-01-06 11:57:42.048 DEBUG 13624 --- [ctor-http-nio-4] reactor.netty.ReactorNetty               : [id: 0x77f83c34,L:/10.7.8.88:62005 - R:localhost/127.0.0.1:80] Added decoder [ReadTimeoutHandler] at the end of the user pipeline,full pipeline: [reactor.left.httpCodec,ReadTimeoutHandler,reactor.right.reactiveBridge,DefaultChannelPipeline$TailContext#0]
2021-01-06 11:57:42.048 DEBUG 13624 --- [ctor-http-nio-4] reactor.netty.ReactorNetty               : [id: 0x77f83c34,L:/10.7.8.88:62005 - R:localhost/127.0.0.1:80] Added decoder [WriteTimeoutHandler] at the end of the user pipeline,WriteTimeoutHandler,DefaultChannelPipeline$TailContext#0]
2021-01-06 11:57:42.048 DEBUG 13624 --- [ctor-http-nio-4] r.netty.http.client.HttpClientConnect    : [id: 0x77f83c34,L:/10.7.8.88:62005 - R:localhost/127.0.0.1:80] Handler is being applied: {uri=http://localhost:8081/post,method=POST}
2021-01-06 11:57:42.049 DEBUG 13624 --- [ctor-http-nio-4] r.n.resources.PooledConnectionProvider   : [id: 0x77f83c34,L:/10.7.8.88:62005 - R:localhost/127.0.0.1:80] onStateChange(POST{uri=/post,connection=PooledConnection{channel=[id: 0x77f83c34,L:/10.7.8.88:62005 - R:localhost/127.0.0.1:80]}},[request_prepared])
2021-01-06 11:57:42.074 DEBUG 13624 --- [ctor-http-nio-4] r.n.resources.PooledConnectionProvider   : [id: 0x77f83c34,[request_sent])

有没有办法提出这个请求?

解决方法

以这种方式用 .map() 替换 .block() 调用:

fun post(request: PostRequest): Mono<PostResponse> =
    myClient.post(request)
        .onErrorResume {
            log.error("Error sending request",it)
            Mono.error<PostResponse>(it)
        }
        .map {
          log.info("Success with response $it")
          ok(it)
        }

.map() 是同步的,所以它的作用与 subscribe() 相同。应该从客户端调用订阅,而不是从非阻塞管道上的服务器端调用。使用 .map() 管道变得同步,但它不会运行直到客户端 .subscribes()

相关问答

Selenium Web驱动程序和Java。元素在(x,y)点处不可单击。其...
Python-如何使用点“。” 访问字典成员?
Java 字符串是不可变的。到底是什么意思?
Java中的“ final”关键字如何工作?(我仍然可以修改对象。...
“loop:”在Java代码中。这是什么,为什么要编译?
java.lang.ClassNotFoundException:sun.jdbc.odbc.JdbcOdbc...