使用 pytest 同时运行 celery 任务

问题描述

我正在尝试在我的 Django 应用程序中集成测试并发 celery 任务。我希望该任务在 pytest 集成测试期间实际在一个工作器上同时运行,但我无法完成这项工作。

假设我有以下基本的芹菜任务:

@shared_task
def sleep_task(secs):
    print(f"Sleeping for {secs} seconds...")
    for i in range(secs):
        time.sleep(i)
        print(f"\t{i + 1}")
    return "DONE"

在脚本中同时运行任务工作正常:

# script
sleep_task.delay(3)
sleep_task.delay(3)

# output
Sleeping for 3 seconds...
Sleeping for 3 seconds...
1
1
2
2
3
3

但是,我无法在单元测试中复制这种异步行为。在 conftest.py 中,我设置了代理和结果后端:

import pytest

pytest_plugins = ("celery.contrib.pytest",)

@pytest.fixture(scope="session")
def celery_config():
    return {"broker_url": "memory://","result_backend": "rpc://"}

这是我的单元测试。 celery_session_appcelery_session_worker 设置用于测试的 celery 应用程序和工作线程 (docs):

def test_concurrent_sleep_tasks(celery_session_app,celery_session_worker):
    sleep_task.delay(3)
    sleep_task.delay(3)

# output
Sleeping for 3 seconds...
1
2
3
Sleeping for 3 seconds...
1
2
3

看来任务是同步运行的。有没有办法异步运行任务?

解决方法

celery_session_worker 装置正在启动一个 celery worker,并发为 1。这是在 celery.contrib 中硬编码的,目前不可配置。由于并发为 1,因此一次只能处理 1 个任务。

另一种解决方案是编写自己的夹具来启动并发数为 2 或更多的 worker。