对 python asyncio gRPC 客户端的多线程支持

问题描述

我有一个在多线程环境中使用的 asyncio gRPC 客户端。当多个线程同时通过客户端连接到服务时,我看到以下错误流:

2021-01-27 09:33:56,937 ERROR [asyncio] [thread_0] Exception in callback PollerCompletionQueue._handle_events()()
handle: )()>
Traceback (most recent call last):
  File "/usr/local/lib/python3.8/asyncio/events.py",line 81,in _run
    self._context.run(self._callback,*self._args)
  File "src/python/grpcio/grpc/_cython/_cygrpc/aio/completion_queue.pyx.pxi",line 147,in grpc._cython.cygrpc.PollerCompletionQueue._handle_events
BlockingIOError: [Errno 11] Resource temporarily unavailable
2021-01-27 09:33:56,937 ERROR [asyncio] [thread_1] Exception in callback PollerCompletionQueue._handle_events()()
handle: )()>
Traceback (most recent call last):
  File "/usr/local/lib/python3.8/asyncio/events.py",in grpc._cython.cygrpc.PollerCompletionQueue._handle_events
BlockingIOError: [Errno 11] Resource temporarily unavailable

请求看似成功完成,但消息充斥着我的日志,让我很紧张!

在我的测试中,每个线程创建自己的通道并提交自己的异步请求。无论服务负载如何,都会发生错误。如果客户端在不同的进程中运行,则不会发生这些错误

我的设置:

  • Python 版本:3.8.6
  • grpcio 版本:1.35.0

感谢任何见解!

解决方法

gRPC AsyncIO 使用 UDS 在 C 扩展和 Python 之间进行通信。从您的日志中可以看出,fd 访问存在竞争条件。 AsyncIO API 支持多线程,但这看起来像是一个新问题(有助于在 https://github.com/grpc/grpc/issues 上创建问题)。

竞争条件的修复可能很棘手,因为 AsyncIO 使用非线程安全的 AsyncIO 锁。如果我们使用线程安全锁保护 fd,它可能会阻塞 AsyncIO 循环。随时提出或贡献解决方案。

如果让所有客户端都在一个线程上运行,AsyncIO 的性能最佳。事件循环将在没有线程跳跃的情况下很好地处理协程的执行。如果目标是饱和机器上的所有计算能力,正如您提到的,最好使用多处理。

链接到基本的 gRPC AsyncIO 示例:https://github.com/grpc/grpc/blob/master/examples/python/helloworld/async_greeter_client.py