Celery - 检查工人是否收到 SIGTERM

问题描述

我有一个很长的 Celery 任务。超过几分钟。

有时,由于各种原因,一个工人被标记为终止,而另一个工人开始。如果需要更换运行它的机器,或者正在部署新的代码版本,就会发生这种情况。在这种情况下,worker 会收到 SIGTERM 信号。

我想知道任务本身是否可以定期检查此工作人员是否已收到 SIGTERM 并且正在等待终止,在这种情况下,只需将任务放回队列中并终止即可。 (该任务将在另一个 worker 上启动,并将继续执行其工作)

编辑:澄清 - 是否可以在任务中检查它是否在等待终止的工作线程上执行。像这样:

# Some long task that can take even a few hours.
def some_task(...):
    for i in range(...):
        do_some_work()
        # That's the missing function:
        if did_this_worker_received_SIGTERM_and_waiting_to_be_terminated():
             # stop the task in the middle,and it will be executed again later

解决方法

当 Celery worker 收到 SIGTERM 时,它会启动热关机。这意味着它将自己从所有队列中取消订阅,预取的任务(如果有)将返回到它们的队列中,而 worker 本身将在关闭之前开始等待当前正在运行的任务完成。如果您害怕的话,不会丢失任何任务。

所有这些事件都可以处理(见Worker Signals)。

如果您仍然坚持在处理工作状态的任务中使用一些额外的逻辑,那么最简单的解决方案可能是实现工作关闭处理程序(如我上面提到的文档部分所述),使其在 Redis 中存储一个标志或其他一些分布式 K/V 存储),并重构需要它的任务,以便它们访问此标志并执行您需要它们执行的任何操作。

,

请问你为什么要做这样的事情?您是否启用了 task_acks_late?这样,如果任务没有按时完成并且工作器停机,任务将在新工作器上重新运行。

这是documentation。还有 task_reject_on_worker_lost 我没试过,但也许它也能帮到你:

将此设置为 true 允许消息重新排队,因此 任务将由同一个工作人员或另一个工作人员再次执行 工人。