python异步客户端测试中已经使用的地址

问题描述

我对异步客户端测试有疑问。这是我关于测试的代码

class TestSocketClient:
    @classmethod
    def setup_class(cls):
        # enable parallel testing by adding the pytest worker id number to the default port
        worker_id = os.environ.get("PYTEST_XdisT_WORKER","gw0")
        worker_number = int(worker_id[2:])

        cls.mock_server = ServerMock()
        cls.mock_server.port = ServerMock.port + worker_number

        cls.username = os.environ.get("USERNAME","")
        cls.password = os.environ.get("PASSWORD","")
        cls.socket_client = SocketClient(username=cls.username,password=cls.password)
        cls.socket_client.hostname = "0.0.0.0"
        cls.socket_client.port = SocketClient.port + worker_number

    @pytest.mark.asyncio
    async def test_login(self):
        await self.mock_server.start()
        response = await self.socket_client.login()
        assert response == actual_response
        await self.socket_client.close()

    @pytest.mark.asyncio
    async def test_send_heartbeat(self):
        await self.mock_server.start()
        await self.socket_client.login()
        await self.socket_client.send_heartbeat()
        await self.socket_client.close()

我可以分别在TestSocketClient下运行测试,它们将分别通过。但是,当我与pytest -n auto一起运行测试套件时,后一个测试用例将引发error while attempting to bind on address ('0.0.0.0',2056): address already in use。我的问题是如何在不解决分配问题的情况下使测试套件通过,以便它们可以在CI流程中成功运行。如果在编写异步测试中有一些更有价值的建议,我将不胜感激(例如,如果我只想测试客户端希望发送到服务器的请求,我应该断言什么?服务器端,或者仅在客户端写assert_called_once之类的内容。预先感谢!

更新:

我终于在如下所示的不同测试中解决了端口增加的问题

class TestSocketClient:
    ports_taken = set()

    @classmethod
    def setup_class(cls):

        cls.mock_server = ServerMock()

        cls.username = os.environ.get("USERNAME",password=cls.password)
        cls.socket_client.hostname = "0.0.0.0"
        cls.socket_client.port = cls.mock_server.port
     
     def bump(self):
        if len(self.ports_taken) == 0:
           self.ports_taken.add(self.mock_server.port)
        new_port = max(self.ports_taken) + 1
        self.mock_server.port = new_port
        self.socket_client.port = new_port
        self.ports_taken.add(self.mock_server.port)
     
     async def start(self):
        self.bump()
        try:
           await self.mock_server.start()
        except:
           self.bump()
           await self.mock_server.start()
     
     @pytest.mark.asyncio
     async def test_login(self):
         await self.start()
         ...

希望这会有所帮助!

解决方法

对于public function del(Request $request) { Guanyin::find($request->id)->delete(); Light::where('sx_ID','=',$request->sx_ID) ->update(['gylight' => 0]); return response()->json(); } ,通过以下方式检查当前正在运行的进程

address already in use

在此,您将注意到该进程已经存在。请使用ps -ax | grep <your app/app port>

终止该进程

,现在重新启动服务。我不确定异步测试。

相关问答

Selenium Web驱动程序和Java。元素在(x,y)点处不可单击。其...
Python-如何使用点“。” 访问字典成员?
Java 字符串是不可变的。到底是什么意思?
Java中的“ final”关键字如何工作?(我仍然可以修改对象。...
“loop:”在Java代码中。这是什么,为什么要编译?
java.lang.ClassNotFoundException:sun.jdbc.odbc.JdbcOdbc...