Celery-如何通过任务ID获取任务名称?

问题描述

Celery-底线:我想通过使用任务ID(我没有任务对象)来获取任务名称

假设我有以下代码

res = chain(add.s(4,5),add.s(10)).delay()
cache.save_task_id(res.task_id)

然后在其他地方:

task_id = cache.get_task_ids()[0]
task_name = get_task_name_by_id(task_id) #how?
print(f'Some information about the task status of: {task_name}')

我知道如果我有一个任务对象,就可以得到任务名称,例如:celery: get function name by task id?。 但是我没有任务对象(也许可以通过task_id或其他方式创建它?我在文档中没有看到与此相关的任何东西)。

此外,我不想将任务名称保存在缓存中。 (假设我有一个很长的链条/其他芹菜原语,我不想保存它们的所有名称/ task_id。仅使用最后一个task_id就足以使用.parents等获取有关所有任务的所有信息)

我研究了AsyncResult和AsyncResult.Backend对象的所有相关方法。似乎唯一相关的是backend.get_task_Meta(task_id),但其中不包含任务名称。 预先感谢

PS:AsyncResult.name始终返回None:

result = AsyncResult(task_id,app=celery_app)
result.name #Returns None
result.args #Also returns None

解决方法

终于找到了答案。 对于任何想知道的人: 您可以通过在celery配置中启用result_extended = True来解决此问题。 然后:

result = AsyncResult(task_id,app=celery_app)
result.task_name #tasks.add
,

类似下面的(伪代码)应该足够了:

app = Celery("myapp")  # add your parameters here
task_id = "6dc5f968-3554-49c9-9e00-df8aaf9e7eb5"
aresult = app.AsyncResult(task_id)
task_name = aresult.name
task_args = aresult.args
print(task_name,task_args)

不幸的是,它不起作用(我会说这是Celery中的错误),因此我们必须找到替代方法。我想到的第一件事是Celery CLI具有inspect query_task功能,这提示我可以通过使用inspect API查找任务名称,我是对的。这是代码:

# Since the expected way does not work we need to use the inspect API:
insp = app.control.inspect()
task_ids = [task_id]
inspect_result = insp.query_task(*task_ids)
# print(inspect_result)
for node_name in inspect_result:
    val = inspect_result[node_name]
    if val:
        # we found node that executes the task
        arr = val[task_id]
        state = arr[0]
        meta = arr[1]
        task_name = meta["name"]
        task_args = meta["args"]
        print(task_name,task_args)

此方法的问题在于它仅在任务运行时才起作用。完成后,您将无法使用上面的代码。

,

celery.result.AsyncResult的文档来看,这不是很清楚,但是除非您按照configuration docs启用result_extended = True,否则并非所有属性都已填充:

result_extended

默认:False

启用扩展任务结果属性(名称,args,kwargs,worker,重试,队列,delivery_info)以写入后端。

然后以下将起作用:

result = AsyncResult(task_id)
result.name = 'project.tasks.my_task'
result.args = [2,3]
result.kwargs = {'a': 'b'}

还请注意,rpc://后端不存储此数据,您将需要Redis或类似的东西。如果您使用的是rpc,则即使使用result_extended = True,也仍然会返回None

,

我在这个code snippet中找到了一个很好的答案。

如果您有 AsyncResult 的实例,则不需要 task_id,您可以简单地执行以下操作:


result # instance of AsyncResult
result_meta = result._get_task_meta()
task_name = result_meta.get("task_name")

当然这依赖于一个私有方法,所以它有点hacky。我希望 celery 引入一种更简单的方法来检索它——它对测试特别有用。