当我使用 SimpleForwardingServerCallListener 时,gRPC 上下文返回 null

问题描述

我想覆盖 sendMessage 中的 sendHeadersonMessageonHalfCloseServerInterceptor 方法(使用 Context):

val context = Context.current().withValue(TestConstants.CONTEXT_KEY,"testValue1")
val delegatedCall = object : SimpleForwardingServerCall<ReqT,RespT>(call) {

    override fun sendMessage(message: RespT) {
        this@SimpleServerInterceptor.sendMessage(message)
        super.sendMessage(message)
    }

    override fun sendHeaders(headers: Metadata) {
        this@SimpleServerInterceptor.sendHeaders(headers)
        super.sendHeaders(headers)
    }

    override fun close(status: Status,trailers: Metadata) {
        this@SimpleServerInterceptor.close(status,trailers)
        super.close(status,trailers)
    }
}
val delegatedListener: ServerCall.Listener<ReqT> =
    if (context === null)
        next.startCall(delegatedCall,headers)
    else
        Contexts.interceptCall(context,delegatedCall,headers,next)

return object : ForwardingServerCallListener.SimpleForwardingServerCallListener<ReqT>(delegatedListener) {

    override fun onMessage(message: ReqT) {
        this@SimpleServerInterceptor.onMessage(message,headers)
        super.onMessage(message)
    }

    override fun onHalfClose() {
        this@SimpleServerInterceptor.onHalfClose(headers)
        super.onHalfClose()
    }

    override fun onCancel() {
        this@SimpleServerInterceptor.onCancel(headers)
        super.onCancel()
    }

    override fun onComplete() {
        this@SimpleServerInterceptor.onComplete(headers)
        super.onComplete()
    }

    override fun onReady() {
        this@SimpleServerInterceptor.onReady(headers)
        super.onReady()
    }
}

输出如下:

>>>>intercept1
>>>>intercept2: testValue1
>>>>onReady1: null
>>>>onMessage1: null
>>>>onHalfClose1: null
HelloService3.hello: testValue1
>>>>sendHeaders2: testValue1
>>>>sendHeaders1: testValue1
>>>>sendMessage2: testValue1
>>>>sendMessage1: testValue1

您可以看到 ContextdelegatedCall(sendMessage,sendHeaders) 有效但对 delegatedListener(onMessage,{{ 1}}).

为什么以及如何解决这个问题?

解决方法

我认为问题在于那个特定的 SimpleForwardingServerCallListener 没有看到更改,因为它是在 Contexts.interceptCall() 内的侦听器之前执行的。 Listener 用于回调,因此它们由 gRPC 调用。 gRPC 将调用返回的侦听器,该侦听器最终将调用 delegatedListener,而 delegatedListener 是进行上下文调整的侦听器。 Contexts.interceptCall() 根本不会改变 call,因此调用看到上下文这一事实意味着侦听器必须正常工作。

我建议创建一个 ServerCallHandler 来执行对 SimpleServerInterceptor 的所有调用,并将该处理程序传递给 Contexts.interceptCall()

(只是草图,因为我不熟悉 Kotlin)

val context = Context.current().withValue(TestConstants.CONTEXT_KEY,"testValue1")
if (context === null) // Unclear how this will be null
    return next.startCall(delegatedCall,headers)
else
    return Contexts.interceptCall(context,delegatedCall,headers,new SimpleServerInterceptorHandler(next))

...

// Within SimpleServerInterceptorHandler
val delegatedCall = object : SimpleForwardingServerCall<ReqT,RespT>(call) {
  ...
}
val delegatedListener: ServerCall.Listener<ReqT> =
        next.startCall(delegatedCall,headers)
return object : ForwardingServerCallListener.SimpleForwardingServerCallListener<ReqT>(delegatedListener) {
  ...
}