问题描述
我对异步客户端测试有疑问。这是我关于测试的代码
class TestSocketClient:
@classmethod
def setup_class(cls):
# enable parallel testing by adding the pytest worker id number to the default port
worker_id = os.environ.get("PYTEST_XdisT_WORKER","gw0")
worker_number = int(worker_id[2:])
cls.mock_server = ServerMock()
cls.mock_server.port = ServerMock.port + worker_number
cls.username = os.environ.get("USERNAME","")
cls.password = os.environ.get("PASSWORD","")
cls.socket_client = SocketClient(username=cls.username,password=cls.password)
cls.socket_client.hostname = "0.0.0.0"
cls.socket_client.port = SocketClient.port + worker_number
@pytest.mark.asyncio
async def test_login(self):
await self.mock_server.start()
response = await self.socket_client.login()
assert response == actual_response
await self.socket_client.close()
@pytest.mark.asyncio
async def test_send_heartbeat(self):
await self.mock_server.start()
await self.socket_client.login()
await self.socket_client.send_heartbeat()
await self.socket_client.close()
我可以分别在TestSocketClient
下运行测试,它们将分别通过。但是,当我与pytest -n auto
一起运行测试套件时,后一个测试用例将引发error while attempting to bind on address ('0.0.0.0',2056): address already in use
。我的问题是如何在不解决分配问题的情况下使测试套件通过,以便它们可以在CI流程中成功运行。如果在编写异步测试中有一些更有价值的建议,我将不胜感激(例如,如果我只想测试客户端希望发送到服务器的请求,我应该断言什么?服务器端,或者仅在客户端写assert_called_once
之类的内容。预先感谢!
更新:
class TestSocketClient:
ports_taken = set()
@classmethod
def setup_class(cls):
cls.mock_server = ServerMock()
cls.username = os.environ.get("USERNAME",password=cls.password)
cls.socket_client.hostname = "0.0.0.0"
cls.socket_client.port = cls.mock_server.port
def bump(self):
if len(self.ports_taken) == 0:
self.ports_taken.add(self.mock_server.port)
new_port = max(self.ports_taken) + 1
self.mock_server.port = new_port
self.socket_client.port = new_port
self.ports_taken.add(self.mock_server.port)
async def start(self):
self.bump()
try:
await self.mock_server.start()
except:
self.bump()
await self.mock_server.start()
@pytest.mark.asyncio
async def test_login(self):
await self.start()
...
希望这会有所帮助!
解决方法
对于public function del(Request $request)
{
Guanyin::find($request->id)->delete();
Light::where('sx_ID','=',$request->sx_ID)
->update(['gylight' => 0]);
return response()->json();
}
,通过以下方式检查当前正在运行的进程
address already in use
在此,您将注意到该进程已经存在。请使用ps -ax | grep <your app/app port>
,现在重新启动服务。我不确定异步测试。