celery: RuntimeError: RPC 后端缺少 task_id 的任务请求

问题描述

我试图跟踪任务何时开始,但 task_track_started 对我不起作用。我尝试了 this post here 的答案,但遇到了错误我有以下代码来更新任务启动后的状态。这是在一个 django 模块中,然后该模块作为应用程序安装在另一个运行它的 django 项目中。

from celery import current_app
from celery.signals import after_task_publish

@after_task_publish.connect
def update_sent_state(sender=None,headers=None,**kwargs):

    task = current_app.tasks.get(sender)
    backend = task.backend if task else current_app.backend

    backend.store_result(headers['id'],None,"SENT")

信号被触发,但在最后一行出现以下错误

Traceback (most recent call last):
  File "/home/dd_env/lib/python3.8/site-packages/celery/backends/rpc.py",line 175,in destination_for
    request = request or current_task.request
  File "/home/dd_env/lib/python3.8/site-packages/celery/local.py",line 143,in __getattr__
    return getattr(self._get_current_object(),name)
AttributeError: 'nonetype' object has no attribute 'request'

During handling of the above exception,another exception occurred:

Traceback (most recent call last):
  File "/home/dd_env/lib/python3.8/site-packages/celery/utils/dispatch/signal.py",line 276,in send
    response = receiver(signal=self,sender=sender,**named)
  File "/home/config_module/spinner/signals.py",line 319,in update_sent_state
    backend.store_result(headers['id'],"SENT")
  File "/home/dd_env/lib/python3.8/site-packages/celery/backends/rpc.py",line 198,in store_result
    routing_key,correlation_id = self.destination_for(task_id,request)
  File "/home/dd_env/lib/python3.8/site-packages/celery/backends/rpc.py",line 177,in destination_for
    raise RuntimeError(
RuntimeError: RPC backend missing task request for '4cb351b0-3643-4f53-a238-bad84c18042d'

在信号内部,调用 backend.get_state(headers['id'])backend.get_result(headers['id'])方法会返回预期的输出。任务正在成功执行并返回结果,但我无法设置它的状态。 backend.mark_as_started(headers['id']) 也会返回相同的错误

这是我的任务定义的样子:

from celery import shared_task

@shared_task
def update_keywords_task(pk: int):
    <Random CRUD operations>

这是我的芹菜设置:

app = Celery('<app_name>',backend='rpc://',broker='pyamqp://')
app.config_from_object('django.conf:settings',namespace='CELERY')
app.autodiscover_tasks()
app.conf.broker_transport_options = {
    'max_retries': 3,'interval_start': 0,'interval_step': 0.2,'interval_max': 0.2,}

为什么找不到我的任务请求?

解决方法

暂无找到可以解决该程序问题的有效方法,小编努力寻找整理中!

如果你已经找到好的解决方法,欢迎将解决方案带上本链接一起发送给小编。

小编邮箱:dio#foxmail.com (将#修改为@)

相关问答

Selenium Web驱动程序和Java。元素在(x,y)点处不可单击。其...
Python-如何使用点“。” 访问字典成员?
Java 字符串是不可变的。到底是什么意思?
Java中的“ final”关键字如何工作?(我仍然可以修改对象。...
“loop:”在Java代码中。这是什么,为什么要编译?
java.lang.ClassNotFoundException:sun.jdbc.odbc.JdbcOdbc...