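celery multi: celery.exceptions.TimeoutError: The operation timed out

Problem description

I am following the official Celery guide. Everything works when I start the worker with celery -A proj worker -l info, but if I run the worker in the background with celery multi restart w1 -A proj -l info --logfile=./celery.log, I get: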

>>> from proj.tasks import add
>>> add.delay().get(timeout=19)
Traceback (most recent call last):
  File "<stdin>", line 1, in <module>
  File "/usr/local/lib/python3.8/dist-packages/celery/result.py", line 230, in get
    return self.backend.wait_for_pending(
  File "/usr/local/lib/python3.8/dist-packages/celery/backends/base.py", line 660, in wait_for_pending
    meta = self.wait_for(
  File "/usr/local/lib/python3.8/dist-packages/celery/backends/base.py", line 696, in wait_for
    raise TimeoutError('The operation timed out.')
celery.exceptions.TimeoutError: The operation timed out.
>>>

What is going wrong?

Test environment:

  • Python 3.8.2
  • celery multi v4.4.7
  • rabbitmqctl version: 3.8.2

Project layout (code files):

proj/__init__.py
    /celery.py
    /tasks.py

celery.py

from __future__ import absolute_import, unicode_literals

from celery import Celery

app = Celery('proj',
             broker='amqp://localhost',
             # backend='db+sqlite:///results.sqlite',
             # The backend argument: if you don't need results, it's better to disable them.
             # Results can also be disabled for individual tasks by setting the
             # @task(ignore_result=True) option.
             # The include argument is a list of modules to import when the worker starts.
             # The tasks module is listed here so that the worker is able to find our tasks.
             include=['proj.tasks'])
app.conf.CELERY_RESULT_BACKEND = 'db+sqlite:///results.sqlite'

if __name__ == '__main__':
    app.start()

tasks.py

from __future__ import absolute_import, unicode_literals
from .celery import app
import datetime


@app.task
def add():
    return datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S")

Below is the log output. I searched Google for it but did not find a working solution.

[2020-09-15 22:10:03,229: INFO/MainProcess] Connected to amqp://guest:**@127.0.0.1:5672//
[2020-09-15 22:10:03,238: INFO/MainProcess] mingle: searching for neighbors
[2020-09-15 22:10:04,260: INFO/MainProcess] mingle: all alone
[2020-09-15 22:10:04,273: INFO/MainProcess] w1@iZwz962a07bhoio77q4ewxZ ready.
[2020-09-15 22:10:10,625: ERROR/MainProcess] Received unregistered task of type 'proj.tasks.add'.
The message has been ignored and discarded.

Did you remember to import the module containing this task?
Or maybe you're using relative imports?

Please see
http://docs.celeryq.org/en/latest/internals/protocol.html
for more information.

The full contents of the message body was:
'[[], {}, {"callbacks": null, "errbacks": null, "chain": null, "chord": null}]' (77b)
Traceback (most recent call last):
  File "/usr/local/lib/python3.8/dist-packages/celery/worker/consumer/consumer.py", line 562, in on_task_received
    strategy = strategies[type_]
KeyError: 'proj.tasks.add'


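In the end I switched to https://github.com/jarekwg/django-apscheduler, which I find elegant and easy to use.

Solution

The error log indicates that your relative import is the likely cause of the problem.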

This is the import at the top of your tasks.py:

from .celery import app

And this is what your log says:

Did you remember to import the module containing this task?
Or maybe you're using relative imports?
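
One way to see why the import style can matter: Celery derives a task's default name from the module it is defined in, and the worker looks up incoming tasks by that name. The sketch below is a simplified, standard-library-only model of that lookup, not Celery's implementation. The names registry, task and dispatch are hypothetical, and it is an assumption that the worker in the question imported the module under a different name than the client did.

```python
# Simplified model of name-based task lookup. This is NOT Celery's code;
# `registry`, `task` and `dispatch` are hypothetical names for illustration.
registry = {}  # maps task name -> function, like a worker's task table


def task(module_name):
    """Register a function under '<module_name>.<function name>'."""
    def decorator(func):
        registry[f"{module_name}.{func.__name__}"] = func
        return func
    return decorator


# Assumed scenario: the worker imported the module as plain 'tasks',
# so the task is registered as 'tasks.add' instead of 'proj.tasks.add'.
@task("tasks")
def add():
    return "done"


def dispatch(name):
    """Look up a task by the name carried in the message, then run it."""
    try:
        return registry[name]()
    except KeyError:
        return f"Received unregistered task of type {name!r}"


worker_view = dispatch("tasks.add")       # the name the worker registered
client_view = dispatch("proj.tasks.add")  # the name the client sent
```

If a mismatch like this is what happens, importing the app with an absolute import (from proj.celery import app) or pinning the name explicitly with @app.task(name='proj.tasks.add') keeps the client and the worker on the same task name. After changing the import, the background worker has to be restarted so that it picks up the new code.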
