问题描述
我从这里 (https://github.com/Playtika/feign-reactive) 使用 Reactive Feign 实现,但即使我尝试使用 Spring WebClient 我也遇到同样的问题。
当我使用 POST 请求调用 URL 并传递 JSON 正文时,流从未完成。线程保持冻结等待完成并且永远不会发生。
fun post(request: PostRequest): Mono<PostResponse> {
val response = myClient.post(request)
.onErrorResume {
log.error("Error sending request",it)
Mono.error<PostResponse>(it)
}
.doOnSuccess {
log.info("Success with response $it")
it
}
response.block()
return response
}
但是,当我调用 subscribe()
时什么也没有发生并且请求保持冻结状态。当我调用 block()
方法时,它抛出以下异常:
java.lang.IllegalStateException: block()/blockFirst()/blockLast() are blocking,which is not supported in thread reactor-http-nio-4
运行调试,我得到了这个日志:
2021-01-06 11:57:42.047 DEBUG 13624 --- [ctor-http-nio-4] r.client.log.DefaultReactiveLogger : [MyClient#post]--->POST http://localhost:8081/post HTTP/1.1
2021-01-06 11:57:42.048 DEBUG 13624 --- [ctor-http-nio-4] r.n.resources.PooledConnectionProvider : [id: 0x77f83c34,L:/10.7.8.88:62005 - R:localhost/127.0.0.1:80] Channel acquired,Now 1 active connections and 0 inactive connections
2021-01-06 11:57:42.048 DEBUG 13624 --- [ctor-http-nio-4] reactor.netty.ReactorNetty : [id: 0x77f83c34,L:/10.7.8.88:62005 - R:localhost/127.0.0.1:80] Added decoder [ReadTimeoutHandler] at the end of the user pipeline,full pipeline: [reactor.left.httpCodec,ReadTimeoutHandler,reactor.right.reactiveBridge,DefaultChannelPipeline$TailContext#0]
2021-01-06 11:57:42.048 DEBUG 13624 --- [ctor-http-nio-4] reactor.netty.ReactorNetty : [id: 0x77f83c34,L:/10.7.8.88:62005 - R:localhost/127.0.0.1:80] Added decoder [WriteTimeoutHandler] at the end of the user pipeline,WriteTimeoutHandler,DefaultChannelPipeline$TailContext#0]
2021-01-06 11:57:42.048 DEBUG 13624 --- [ctor-http-nio-4] r.netty.http.client.HttpClientConnect : [id: 0x77f83c34,L:/10.7.8.88:62005 - R:localhost/127.0.0.1:80] Handler is being applied: {uri=http://localhost:8081/post,method=POST}
2021-01-06 11:57:42.049 DEBUG 13624 --- [ctor-http-nio-4] r.n.resources.PooledConnectionProvider : [id: 0x77f83c34,L:/10.7.8.88:62005 - R:localhost/127.0.0.1:80] onStateChange(POST{uri=/post,connection=PooledConnection{channel=[id: 0x77f83c34,L:/10.7.8.88:62005 - R:localhost/127.0.0.1:80]}},[request_prepared])
2021-01-06 11:57:42.074 DEBUG 13624 --- [ctor-http-nio-4] r.n.resources.PooledConnectionProvider : [id: 0x77f83c34,[request_sent])
有没有办法提出这个请求?
解决方法
以这种方式用 .map() 替换 .block() 调用:
fun post(request: PostRequest): Mono<PostResponse> =
myClient.post(request)
.onErrorResume {
log.error("Error sending request",it)
Mono.error<PostResponse>(it)
}
.map {
log.info("Success with response $it")
ok(it)
}
.map() 是同步的,所以它的作用与 subscribe() 相同。应该从客户端调用订阅,而不是从非阻塞管道上的服务器端调用。使用 .map() 管道变得同步,但它不会运行直到客户端 .subscribes()