问题描述
Celery-底线:我想通过使用任务ID(我没有任务对象)来获取任务名称
res = chain(add.s(4,5),add.s(10)).delay()
cache.save_task_id(res.task_id)
然后在其他地方:
task_id = cache.get_task_ids()[0]
task_name = get_task_name_by_id(task_id) #how?
print(f'Some information about the task status of: {task_name}')
我知道如果我有一个任务对象,就可以得到任务名称,例如:celery: get function name by task id?。 但是我没有任务对象(也许可以通过task_id或其他方式创建它?我在文档中没有看到与此相关的任何东西)。
此外,我不想将任务名称保存在缓存中。 (假设我有一个很长的链条/其他芹菜原语,我不想保存它们的所有名称/ task_id。仅使用最后一个task_id就足以使用.parents等获取有关所有任务的所有信息)
我研究了AsyncResult和AsyncResult.Backend对象的所有相关方法。似乎唯一相关的是backend.get_task_Meta(task_id),但其中不包含任务名称。 预先感谢
PS:AsyncResult.name始终返回None:
result = AsyncResult(task_id,app=celery_app)
result.name #Returns None
result.args #Also returns None
解决方法
终于找到了答案。
对于任何想知道的人:
您可以通过在celery配置中启用result_extended = True
来解决此问题。
然后:
result = AsyncResult(task_id,app=celery_app)
result.task_name #tasks.add
,
类似下面的(伪代码)应该足够了:
app = Celery("myapp") # add your parameters here
task_id = "6dc5f968-3554-49c9-9e00-df8aaf9e7eb5"
aresult = app.AsyncResult(task_id)
task_name = aresult.name
task_args = aresult.args
print(task_name,task_args)
不幸的是,它不起作用(我会说这是Celery中的错误),因此我们必须找到替代方法。我想到的第一件事是Celery CLI具有inspect query_task
功能,这提示我可以通过使用inspect API查找任务名称,我是对的。这是代码:
# Since the expected way does not work we need to use the inspect API:
insp = app.control.inspect()
task_ids = [task_id]
inspect_result = insp.query_task(*task_ids)
# print(inspect_result)
for node_name in inspect_result:
val = inspect_result[node_name]
if val:
# we found node that executes the task
arr = val[task_id]
state = arr[0]
meta = arr[1]
task_name = meta["name"]
task_args = meta["args"]
print(task_name,task_args)
此方法的问题在于它仅在任务运行时才起作用。完成后,您将无法使用上面的代码。
,从celery.result.AsyncResult的文档来看,这不是很清楚,但是除非您按照configuration docs启用result_extended = True
,否则并非所有属性都已填充:
result_extended
默认:False
启用扩展任务结果属性(名称,args,kwargs,worker,重试,队列,delivery_info)以写入后端。
然后以下将起作用:
result = AsyncResult(task_id)
result.name = 'project.tasks.my_task'
result.args = [2,3]
result.kwargs = {'a': 'b'}
还请注意,rpc://后端不存储此数据,您将需要Redis或类似的东西。如果您使用的是rpc,则即使使用result_extended = True
,也仍然会返回None
。
我在这个code snippet中找到了一个很好的答案。
如果您有 AsyncResult
的实例,则不需要 task_id,您可以简单地执行以下操作:
result # instance of AsyncResult
result_meta = result._get_task_meta()
task_name = result_meta.get("task_name")
当然这依赖于一个私有方法,所以它有点hacky。我希望 celery 引入一种更简单的方法来检索它——它对测试特别有用。