“接收”芹菜任务是什么意思?当所有芹菜工人都被封锁时,那些没有“收到”的新任务又会发生什么?

问题描述

我正在开发一个新的监视系统,该系统可以测量Celery队列的吞吐量,并在备份队列时帮助提醒团队。在我的工作过程中,我遇到了一些我不了解的特殊行为(并且在Celery规范中没有充分记录)。

出于测试目的,我设置了一个端点,该端点将用16个长时间运行的任务填充队列,这些任务可用于模拟备份队列。框架是Flask,队列代理是Redis。将Celery配置为每个工人最多可以并行执行4个任务,而我有2个工人在运行。

api / health.py

def health():
    health = Blueprint("health",__name__)

    @health.route("/api/debug/create-long-queue",methods=["GET"])
    def long_queue():
        for i in range(16):
            sleepy_job.delay()

        return make_response({},200)

    return health

jobs.py

@celery.task(priority=HIGH_PRIORITY)
def sleepy_job(*args,**kwargs):
    time.sleep(30)

这是我要模拟的备份生产队列的操作:

  1. 我致电/api/debug/create-long-queue以模拟队列中的备份。基于以上数学,工人应忙于每分钟睡觉1分钟(共同,他们一次可以同时处理8个任务。每个任务仅睡30秒,总共有16个任务。)
  2. 我(handle_incoming_message。

这是我看到的使用花检查队列的内容

  • 尽管所有工作人员都被前8个sleepy_job任务阻止,但即使可以肯定handle_incoming_message被称为“ {第二次API调用的结果。
  • 完成前8个handle_incoming_message.delay()任务(约30秒)后,我在队列上看到状态为sleepy_job的新handle_incoming_message
  • 在第二个(也是最后一个)8个RECIEVED任务完成之后,我现在看到sleepy_job的状态为handle_incoming_message(随着UI用新数据更新,我可以确认这一点在该任务中收到并处理了。)

问题

因此,很明显,当工人在处理前8个STARTED任务后暂时不受阻碍时,他们正在做某事标记/确认新的sleepy_job任务花可见的一种方式。 但这留下了几个未解决的问题:

  • 当工作人员被阻止时,新的handle_incoming_message任务的状态是什么?
  • 工人不受阻拦后会发生什么变化,从而使花朵现在可以看到新的handle_incoming_message任务了?
  • “已接收”状态实际上是什么意思?
  • (奖金:如何查看工作人员被阻止时排队的任务?)

解决方法

  1. 当所有工作程序都被阻止时,由于预取,某些任务可能处于接收状态(请参阅文档中的内容)。因此,您的任务很可能只是排在队列中,等待Celery员工接收(协调流程-这些不是实际的员工流程)。

  2. Flower是一项基于Celery功能(称为“任务事件”)的简单服务。简单来说,它(花)将自己预订为所有事件的接收者(已接收,成功,开始,失败等),然后将其可视化地呈现给Web客户端。 More about it here。因此,当芹菜工人接收到任务时,将发送“任务已接收”事件。 Flower获取此事件,并在仪表板上更改该任务的状态。

  3. 当“接收到”任务时,这意味着特定的Celery工作人员将该任务从队列中移出,可以立即执行(如果有免费的工作进程来执行该任务),否则Celery工作人员将等待使工作进程准备好运行任务。我已经提到过预取-Celery工作者通常会执行比可用工作者进程更多的任务。

  4. Celery没有为用户提供列出特定队列中内容的方法。因此,您会看到许多类似的问题-包括this one which offers answers。您将在其中看到我的简短回答。简而言之,这取决于您选择的经纪人。如果是Redis,则只需浏览对象列表。如果是RabbitMQ,则可以使用其工具检查队列。我认为不提供此决定是一个好决定,因为此信息永远都不可靠。在您列出特定队列中的所有任务时,可能有成千上万的新任务...