解决通道/异步测试用例上的 django 测试运行时错误

问题描述

我正在尝试为我的 django (3.2) 渠道消费者编写一个测试用例来实现 websocket 通信。我已经设法让测试用例通过了,但现在似乎在尝试清理测试数据库时测试用例拆卸失败了。

System check identified no issues (0 silenced).
.
----------------------------------------------------------------------
Ran 1 test in 0.130s

OK
Destroying test database for alias 'default'...

...

psycopg2.errors.ObjectInUse: database "test_myproject" is being accessed by other users
DETAIL:  There is 1 other session using the database.

...
  File ".../python3.8/site-packages/django/core/management/commands/test.py",line 55,in handle
    failures = test_runner.run_tests(test_labels)
  File ".../python3.8/site-packages/django/test/runner.py",line 736,in run_tests
    self.teardown_databases(old_config)
  File ".../python3.8/site-packages/django/test/runner.py",line 674,in teardown_databases
    _teardown_databases(
  File ".../python3.8/site-packages/django/test/utils.py",line 313,in teardown_databases
    connection.creation.destroy_test_db(old_name,verbosity,keepdb)
  File ".../python3.8/site-packages/django/db/backends/base/creation.py",line 282,in destroy_test_db
    self._destroy_test_db(test_database_name,verbosity)
  File ".../python3.8/site-packages/django/db/backends/base/creation.py",line 298,in _destroy_test_db
    cursor.execute("DROP DATABASE %s"
  File ".../python3.8/contextlib.py",line 162,in __exit__
    raise RuntimeError("generator didn't stop after throw()")
RuntimeError: generator didn't stop after throw()


消费者是使用 async 编写的,似乎我在 async 如何与测试用例集成方面遇到了麻烦。我还需要做什么才能使测试用例正常运行。

我创建并配置了一个夹具来将数据加载到数据库,并怀疑这与那里的测试用例数据库交互有关...

我尝试使用标准 django“TestCase”代替“TransactionTestCase”,但显然配置的夹具没有按预期加载到数据库中。

class ConsumerTestCase(TransactionTestCase):
    fixtures = ("basic-data",)

    async def test_alertconsumer__receive__valid(self):
        # defined in fixture
        username = "test-username"
        alert_id = 1

        url = "/alerts/ws/"
        communicator = WebsocketCommunicator(AlertConsumer.as_asgi(),url)
        connected,subprotocol = await communicator.connect()
        assert connected

        message = {"username": "me","alert_id": 1 }
        await communicator.send_to(text_data=json.dumps(message))
        response = await communicator.receive_from()
        expected = {
            "message": "ACCEPTED"
        }
        self.assertDictEqual(actual,expected)

解决方法

暂无找到可以解决该程序问题的有效方法,小编努力寻找整理中!

如果你已经找到好的解决方法,欢迎将解决方案带上本链接一起发送给小编。

小编邮箱:dio#foxmail.com (将#修改为@)