问题描述
假设我有一个 celery 任务,它需要两个参数:X(a,b)
我需要使用以下两条规则来实现自定义并发逻辑:
-
如果
X
的值不同,a
的实例可以同时运行。也就是说,如果X(a=1,b=10)
在加入队列时X(a=2,b=20)
正在运行,则后者从队列中拉出并立即执行。 -
如果
X
的值相同,则a
的实例不能同时运行。也就是说,如果当X(a=1,b=10)
加入队列时X(a=1,b=20)
正在运行,那么后者必须在队列中等待,直到前者完成。
规则 #1 通过设置 worker_concurrency>1
(docs) 与 celery 一起开箱即用。第 2 条规则很棘手。
分布式任务锁定,如 docs 和 this blog 中所述,是一种使我接近所需的方法。甚至还有一些库可以为您实现它(celery-singleton)。然而,回顾规则#2,这种方法似乎可以防止第二个任务排队,直到第一个任务完成。我需要它排队,只是在第一个任务完成之前不在工作人员上执行。
有没有办法实现这个? This SO question 提出了类似的问题,但目前没有答案。
解决方法
这似乎是使用 redis 和绑定 celery 任务的好例子。如果您还没有这样做,您还可以将 redis 用作您的 celery 代理,如果您需要,还可以将其用作缓存层。这真的是一把瑞士军刀。 Deploying redis 也很简单。我强烈鼓励任何人更熟悉它。这是一个很好的工具,可以放在一个人的工具箱中。
我会稍微修改一下例子,因为我总是被单字符函数和变量搞糊涂。
# Think of this as X(a,b) from the question
@task
def add(num1,num2):
return num1 + num2
然后我们可以升级 add
使其看起来更像这样:
# "bind" the task so we have access to all the Task base class functionality
# via "self".
# https://docs.celeryproject.org/en/latest/reference/celery.app.task.html#celery.app.task.Task.retry
@task(bind=True)
def add(self,num1,num2):
if does_running_task_exist_with(num1):
# requeue. Please visit the docs for "retry" mentioned above.
# There are also max_retries and some other nice things.
# Try again in 10s
self.retry(countdown=10)
return
return num1 + num2
然后我们的 does_running_task_exist_with
辅助函数将使用一个 redis 集。与所有 Set 实现一样,它们保证唯一性并且检查成员是否存在很快。
# Using https://github.com/andymccurdy/redis-py
import redis
def does_running_task_exist_with(some_number):
# Connect to redis.
# Using database number 2. You might be using db 0 for celery brokerage,# and db 1 for celery result storage. Using a separate DB is just nice
# for isolation. Redis has up to 16.
# Connects to localhost by default.
redis_conn = redis.StrictRedis(db=2)
# we try adding this number to the Set of currently processing numbers
# https://redis.io/commands/sadd
# Return value: the number of elements that were added to the set,# not including all the elements already present into the set.
members_added = redis_conn.sadd("manager_task_args",str(some_number))
# Or shortcut it as "return members_added == 0". This here is
# more expressive though
if members_added == 0:
return True
return False
好的。现在跟踪和决策已经到位。缺少的一件重要事情是:一旦 add
任务完成,我们需要从 redis 集中删除 num1
。让我们稍微调整一下功能。
import redis
@task(bind=True)
def add(self,num2):
if does_running_task_exist_with(num1):
self.retry(countdown=10)
return
# Do actual work…
result = num1 + num2
# Cleanup
redis_conn = redis.StrictRedis(db=2)
redis_conn.srem("manager_task_args",str(num1))
return result
但是如果出现问题怎么办?如果添加失败怎么办?然后我们的 num1
永远不会从 Set 中删除,我们的队列开始变得越来越长。我们不想那样。您可以在此处做两件事:创建 a class-based task with an on_failure
method,或将其包装在 try-except-finally 中。我们将采用 try-finally 路线,因为在这种情况下更容易遵循:
import redis
@task(bind=True)
def add(self,num2):
if does_running_task_exist_with(num1):
self.retry(countdown=10)
return
try:
result = num1 + num2
finally:
redis_conn = redis.StrictRedis(db=2)
redis_conn.srem("manager_task_args",str(num1))
return result
应该可以。请注意,如果您有大量任务,您可能还需要查看 redis connection pooling。