Django Channels Redis: Exception inside application: Lock is not acquired

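Problem description

A multi-tenant Django application, fully loaded with about 1000 WebSockets through daphne/Channels, ran fine for months. Then suddenly tenants were all calling the support line because the application was running slowly or hanging outright. HTTP REST API hits were fast and error-free, so we narrowed the problem down to WebSockets.

Neither the application logs nor the operating system logs indicated a problem, so the only thing to go on is the exception shown below. It happened over and over across two days.

I'm not expecting any in-depth debugging help, just some off-the-cuff suggestions about possibilities.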

AWS Linux 1
Python 3.6.4
Elasticache Redis 5.0
channels==2.4.0
channels-redis==2.4.2
daphne==2.5.0
Django==2.2.13

Split configuration behind Nginx: HTTP is served by uwsgi, ASGI is served by daphne.

May 10 08:08:16 prod-b-web1: [pid 15053] [version 119.5.10.5086] [tenant_id -] [domain_name -] [pathname /opt/releases/r119.5.10.5086/env/lib/python3.6/site-packages/daphne/server.py] [lineno 288] [priority ERROR] [funcname application_checker] [request_path -] [request_method -] [request_data -] [request_user -] [request_stack -] Exception inside application: Lock is not acquired.
Traceback (most recent call last):
  File "/opt/releases/r119.5.10.5086/env/lib/python3.6/site-packages/channels_redis/core.py", line 435, in receive
    real_channel
  File "/opt/releases/r119.5.10.5086/env/lib/python3.6/site-packages/channels_redis/core.py", line 484, in receive_single
    await self.receive_clean_locks.acquire(channel_key)
  File "/opt/releases/r119.5.10.5086/env/lib/python3.6/site-packages/channels_redis/core.py", line 152, in acquire
    return await self.locks[channel].acquire()
  File "/opt/python3.6/lib/python3.6/asyncio/locks.py", line 176, in acquire
    yield from fut
concurrent.futures._base.CancelledError

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "/opt/releases/r119.5.10.5086/env/lib/python3.6/site-packages/channels/sessions.py", line 183, in __call__
    return await self.inner(receive, self.send)
  File "/opt/releases/r119.5.10.5086/env/lib/python3.6/site-packages/channels/middleware.py", line 41, in coroutine_call
    await inner_instance(receive, send)
  File "/opt/releases/r119.5.10.5086/env/lib/python3.6/site-packages/channels/consumer.py", line 59, in __call__
    [receive, self.channel_receive], self.dispatch
  File "/opt/releases/r119.5.10.5086/env/lib/python3.6/site-packages/channels/utils.py", line 58, in await_many_dispatch
    await task
  File "/opt/releases/r119.5.10.5086/env/lib/python3.6/site-packages/channels_redis/core.py", line 447, in receive
    self.receive_lock.release()
  File "/opt/python3.6/lib/python3.6/asyncio/locks.py", line 201, in release
    raise RuntimeError('Lock is not acquired.')
RuntimeError: Lock is not acquired.

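Solution

First, let's look at where the RuntimeError: Lock is not acquired. error comes from. As the traceback shows, it is raised by the release() method in /opt/python3.6/lib/python3.6/asyncio/locks.py, which is defined as follows: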

    def release(self):
        """Release a lock.

        When the lock is locked, reset it to unlocked, and return.
        If any other coroutines are blocked waiting for the lock to become
        unlocked, allow exactly one of them to proceed.

        When invoked on an unlocked lock, a RuntimeError is raised.

        There is no return value.
        """
        if self._locked:
            self._locked = False
            self._wake_up_first()
        else:
            raise RuntimeError('Lock is not acquired.')

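A primitive lock is a synchronization primitive that is not owned by a particular thread (or, in the case of asyncio.Lock, a particular task) when locked, so release() does not check who acquired it.

Calling release() on an unlocked lock raises RuntimeError, because the method may only be called while the lock is locked. When it is called in the locked state, the state changes to unlocked.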
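The behaviour described above can be checked in isolation. The following is a minimal sketch, not code from the application in question; the function name release_unlocked_lock is made up for illustration, and asyncio.run assumes Python 3.7 or newer (on Python 3.6 you would use loop.run_until_complete instead).

```python
import asyncio


async def release_unlocked_lock():
    # Hypothetical helper: create a fresh lock and release it
    # without ever acquiring it.
    lock = asyncio.Lock()
    try:
        lock.release()
    except RuntimeError as exc:
        # asyncio.Lock.release() refuses to release an unlocked lock.
        return str(exc)
    return None


message = asyncio.run(release_unlocked_lock())
print(message)
```

The message captured here should be the same text that appears on the last line of the traceback in the question.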

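Now for the earlier error, which was raised in the acquire() method of the same file. acquire() is defined as follows (this listing comes from a newer Python release than the one in the question; in Python 3.6 the wait is written as yield from fut, as the traceback shows):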

    async def acquire(self):
        """Acquire a lock.

        This method blocks until the lock is unlocked, then sets it to
        locked and returns True.
        """
        if (not self._locked and (self._waiters is None or
                all(w.cancelled() for w in self._waiters))):
            self._locked = True
            return True

        if self._waiters is None:
            self._waiters = collections.deque()
        fut = self._loop.create_future()
        self._waiters.append(fut)

        # Finally block should be called before the CancelledError
        # handling as we don't want CancelledError to call
        # _wake_up_first() and attempt to wake up itself.
        try:
            try:
                await fut
            finally:
                self._waiters.remove(fut)
        except exceptions.CancelledError:
            if not self._locked:
                self._wake_up_first()
            raise

        self._locked = True
        return True

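So for your concurrent.futures._base.CancelledError to be raised, the await fut line must be where it happened: the task was cancelled while it was still waiting for the lock, so acquire() never returned and that task never took the lock.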
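To make the chain of the two exceptions concrete, here is a simplified reproduction. It is only a sketch under assumptions: the receive function below is a hypothetical stand-in that releases the lock in a finally block, and it is not the channels_redis code path. It also assumes Python 3.7 or newer for asyncio.run.

```python
import asyncio


async def receive(lock):
    # Hypothetical stand-in: releases the lock in `finally` whether or
    # not acquire() actually completed.
    try:
        await lock.acquire()
    finally:
        lock.release()


async def reproduce():
    lock = asyncio.Lock()
    await lock.acquire()           # another party currently holds the lock
    task = asyncio.ensure_future(receive(lock))
    await asyncio.sleep(0)         # let the task start and block in acquire()
    lock.release()                 # lock becomes unlocked, waiter is woken
    task.cancel()                  # cancellation lands before the waiter resumes
    try:
        await task
    except RuntimeError as exc:
        # The CancelledError from acquire() is kept as the implicit context.
        return str(exc), type(exc.__context__).__name__
    return None


result = asyncio.run(reproduce())
print(result)
```

In this sketch the cancelled acquire() never sets the lock to locked, so the release() in finally fails with "Lock is not acquired." while a CancelledError is being handled, which matches the shape of the traceback in the question. Using async with lock: instead of a manual acquire/release pair avoids this particular failure, because the release only runs after the acquire has succeeded.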

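To fix it, have a look at "Awaiting an asyncio.Future raises concurrent.futures._base.CancelledError instead of waiting for a value/exception to be set".

Basically, you may have an awaitable somewhere in your code that you never await. By not awaiting it, you never hand control back to the event loop or store the awaitable, so it is cleaned up immediately, which cancels it completely (along with every awaitable it controls).

Just make sure you await the result of every awaitable in your code, and track down any you missed.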
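As a rough illustration of that advice, the sketch below keeps a reference to a task and awaits it, so it runs to completion instead of being dropped. The compute coroutine is a hypothetical placeholder for whatever awaitable your own code creates, and asyncio.run assumes Python 3.7 or newer.

```python
import asyncio


async def compute():
    # Hypothetical awaitable standing in for any coroutine in your code.
    await asyncio.sleep(0)
    return 42


async def main():
    # Keep a reference to the task and await it, so it cannot be
    # dropped and cleaned up before it has finished.
    task = asyncio.ensure_future(compute())
    value = await task
    return task.done(), task.cancelled(), value


outcome = asyncio.run(main())
print(outcome)
```

Here the task finishes normally and is not cancelled, because main holds on to it and awaits its result.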
