为Python中的每个Celery任务分配特定数量的工作程序

问题描述

我有三个芹菜任务:

  • 预测
  • 培训
  • 健康检查

我的机器上有4个cpu。我想分配3名工人进行预测和培训,并分配1名工人进行健康检查。最简单的实现方法是什么?请注意,我将安排这些任务,因此,我无法直接在apply_async()函数中指定工作者的数量

实际配置:

CELERY = Celery(
    CELERY_APP_NAME,backend=CELERY_BACKEND,broker=CELERY_broKER,include=["src.tasks"],)

CELERY.conf.update({"task_routes": {"src.tasks.*": {"queue": "input_queue"}},}
                   )


@CELERY.task
def prediction():
    pass

@CELERY.task
def training():
    pass

@CELERY.task
def healthcheck():
    pass

以及运行工作程序的命令:

celery --loglevel=INFO -A src.tasks worker -Q input_queue

解决方法

这就是我要这样做的方式,因为我永远无法真正理解celery multi

  • celery -A yourproject.yourapp -l info -c 3 -Q prediction
  • celery -A yourproject.yourapp -l info -c 1 -Q healthcheck

运行与以上类似的操作(使用正确的应用程序参数)后,最终将有两个工作线程订阅不同的队列。您的预测和训练任务将由queue=prediction以及其他命名参数触发,而运行状况检查任务应以类似方式发送到healthcheck队列。