问题描述
我正在尝试在我的 Django 应用程序中集成测试并发 celery 任务。我希望该任务在 pytest 集成测试期间实际在一个工作器上同时运行,但我无法完成这项工作。
假设我有以下基本的芹菜任务:
@shared_task
def sleep_task(secs):
print(f"Sleeping for {secs} seconds...")
for i in range(secs):
time.sleep(i)
print(f"\t{i + 1}")
return "DONE"
在脚本中同时运行任务工作正常:
# script
sleep_task.delay(3)
sleep_task.delay(3)
# output
Sleeping for 3 seconds...
Sleeping for 3 seconds...
1
1
2
2
3
3
但是,我无法在单元测试中复制这种异步行为。在 conftest.py
中,我设置了代理和结果后端:
import pytest
pytest_plugins = ("celery.contrib.pytest",)
@pytest.fixture(scope="session")
def celery_config():
return {"broker_url": "memory://","result_backend": "rpc://"}
这是我的单元测试。 celery_session_app
和 celery_session_worker
设置用于测试的 celery 应用程序和工作线程 (docs):
def test_concurrent_sleep_tasks(celery_session_app,celery_session_worker):
sleep_task.delay(3)
sleep_task.delay(3)
# output
Sleeping for 3 seconds...
1
2
3
Sleeping for 3 seconds...
1
2
3
看来任务是同步运行的。有没有办法异步运行任务?
解决方法
celery_session_worker 装置正在启动一个 celery worker,并发为 1。这是在 celery.contrib 中硬编码的,目前不可配置。由于并发为 1,因此一次只能处理 1 个任务。
另一种解决方案是编写自己的夹具来启动并发数为 2 或更多的 worker。