防止芹菜任务开始,直到完成具有相似参数的不同任务

问题描述

假设我有一个 celery 任务,它需要两个参数:X(a,b)

我需要使用以下两条规则来实现自定义并发逻辑:

  1. 如果 X 的值不同,a 的实例可以同时运行。也就是说,如果X(a=1,b=10)在加入队列时X(a=2,b=20)正在运行,则后者从队列中拉出并立即执行。

  2. 如果 X 的值相同,则 a 的实例不能同时运行。也就是说,如果当 X(a=1,b=10) 加入队列时 X(a=1,b=20) 正在运行,那么后者必须在队列中等待,直到前者完成。

规则 #1 通过设置 worker_concurrency>1 (docs) 与 celery 一起开箱即用。第 2 条规则很棘手。

分布式任务锁定,如 docsthis blog 中所述,是一种使我接近所需的方法。甚至还有一些库可以为您实现它(celery-singleton)。然而,回顾规则#2,这种方法似乎可以防止第二个任务排队,直到第一个任务完成。我需要它排队,只是在第一个任务完成之前不在工作人员上执行。

有没有办法实现这个? This SO question 提出了类似的问题,但目前没有答案。

解决方法

这似乎是使用 redis 和绑定 celery 任务的好例子。如果您还没有这样做,您还可以将 redis 用作您的 celery 代理,如果您需要,还可以将其用作缓存层。这真的是一把瑞士军刀。 Deploying redis 也很简单。我强烈鼓励任何人更熟悉它。这是一个很好的工具,可以放在一个人的工具箱中。

我会稍微修改一下例子,因为我总是被单字符函数和变量搞糊涂。

# Think of this as X(a,b) from the question
@task
def add(num1,num2):
    return num1 + num2

然后我们可以升级 add 使其看起来更像这样:

# "bind" the task so we have access to all the Task base class functionality
# via "self".
# https://docs.celeryproject.org/en/latest/reference/celery.app.task.html#celery.app.task.Task.retry
@task(bind=True)
def add(self,num1,num2):
    if does_running_task_exist_with(num1):
        # requeue. Please visit the docs for "retry" mentioned above.
        # There are also max_retries and some other nice things.
        # Try again in 10s
        self.retry(countdown=10)
        return
    return num1 + num2

然后我们的 does_running_task_exist_with 辅助函数将使用一个 redis 集。与所有 Set 实现一样,它们保证唯一性并且检查成员是否存在很快。

# Using https://github.com/andymccurdy/redis-py
import redis

def does_running_task_exist_with(some_number):
    # Connect to redis.
    # Using database number 2. You might be using db 0 for celery brokerage,# and db 1 for celery result storage. Using a separate DB is just nice
    # for isolation. Redis has up to 16.
    # Connects to localhost by default.
    redis_conn = redis.StrictRedis(db=2)
    # we try adding this number to the Set of currently processing numbers
    # https://redis.io/commands/sadd
    # Return value: the number of elements that were added to the set,# not including all the elements already present into the set.
    members_added = redis_conn.sadd("manager_task_args",str(some_number))
    # Or shortcut it as "return members_added == 0". This here is 
    # more expressive though
    if members_added == 0:
        return True
    return False

好的。现在跟踪和决策已经到位。缺少的一件重要事情是:一旦 add 任务完成,我们需要从 redis 集中删除 num1。让我们稍微调整一下功能。

import redis

@task(bind=True)
def add(self,num2):
    if does_running_task_exist_with(num1):
        self.retry(countdown=10)
        return
    # Do actual work…
    result = num1 + num2
    # Cleanup
    redis_conn = redis.StrictRedis(db=2)
    redis_conn.srem("manager_task_args",str(num1))
    return result

但是如果出现问题怎么办?如果添加失败怎么办?然后我们的 num1 永远不会从 Set 中删除,我们的队列开始变得越来越长。我们不想那样。您可以在此处做两件事:创建 a class-based task with an on_failure method,或将其包装在 try-except-finally 中。我们将采用 try-finally 路线,因为在这种情况下更容易遵循:

import redis

@task(bind=True)
def add(self,num2):
    if does_running_task_exist_with(num1):
        self.retry(countdown=10)
        return
    try:
        result = num1 + num2
    finally:
        redis_conn = redis.StrictRedis(db=2)
        redis_conn.srem("manager_task_args",str(num1))
    return result

应该可以。请注意,如果您有大量任务,您可能还需要查看 redis connection pooling