问题描述
@pytest.fixture(scope="module")
def event_loop(request):
loop = asyncio.get_event_loop_policy().new_event_loop()
yield loop
loop.close()
@pytest.fixture(scope="module")
async def some_fixture():
return await make_fixture()
@toolz.curry
def throttle(limit,f):
semaphore = asyncio.Semaphore(limit)
@functools.wraps(f)
async def wrapped(*args,**kwargs):
async with semaphore:
return await f(*args,**kwargs)
return wrapped
@throttle(10)
def f():
...
现在从多个测试文件中调用f
,但我收到一个异常消息,告诉我不能使用来自不同事件循环的信号灯。
我尝试使用会话级事件循环夹具:
@pytest.fixture(scope="session",autouse=True)
def event_loop(request):
loop = asyncio.get_event_loop_policy().new_event_loop()
yield loop
loop.close()
但这只给了我
ScopeMismatch:您尝试使用涉及工厂的“模块”范围的请求对象访问“功能”范围的固定装置“ event_loop”
甚至有可能让xdist +异步夹具+信号灯一起工作吗?
解决方法
最终使用以下conftest.py
使它起作用:
import asyncio
import pytest
@pytest.fixture(scope="session")
def event_loop():
return asyncio.get_event_loop()