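Problem description
I am following the official Celery guide. When I start the worker with celery -A proj worker -l info
everything works fine. However, if I run the worker in the background with celery multi restart w1 -A proj -l info --logfile=./celery.log
I get the following: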
>>> from proj.tasks import add
>>> add.delay().get(timeout=19)
Traceback (most recent call last):
  File "<stdin>", line 1, in <module>
  File "/usr/local/lib/python3.8/dist-packages/celery/result.py", line 230, in get
    return self.backend.wait_for_pending(
  File "/usr/local/lib/python3.8/dist-packages/celery/backends/base.py", line 660, in wait_for_pending
    meta = self.wait_for(
  File "/usr/local/lib/python3.8/dist-packages/celery/backends/base.py", line 696, in wait_for
    raise TimeoutError('The operation timed out.')
celery.exceptions.TimeoutError: The operation timed out.
>>>
What is going wrong?
Test environment:
- Python 3.8.2
- celery multi v4.4.7
- rabbitmqctl version: 3.8.2
Project layout (code files):
proj/__init__.py
proj/celery.py
proj/tasks.py
celery.py
from __future__ import absolute_import, unicode_literals
from celery import Celery

app = Celery('proj',
             broker='amqp://localhost',
             # backend='db+sqlite:///results.sqlite',
             # The backend argument: if you don't need results, it's better to disable them.
             # Results can also be disabled for individual tasks by setting the @task(ignore_result=True) option.
             include=['proj.tasks'])  # Modules to import when the worker starts, so that the worker can find our tasks.
app.conf.CELERY_RESULT_BACKEND = 'db+sqlite:///results.sqlite'

if __name__ == '__main__':
    app.start()
tasks.py
from __future__ import absolute_import, unicode_literals
from .celery import app
import datetime


@app.task
def add():
    # Return the current local time as a formatted string.
    return datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S")
Below is the log output. I searched Google for it but did not find a working solution.
[2020-09-15 22:10:03,229: INFO/MainProcess] Connected to amqp://guest:**@127.0.0.1:5672//
[2020-09-15 22:10:03,238: INFO/MainProcess] mingle: searching for neighbors
[2020-09-15 22:10:04,260: INFO/MainProcess] mingle: all alone
[2020-09-15 22:10:04,273: INFO/MainProcess] w1@iZwz962a07bhoio77q4ewxZ ready.
[2020-09-15 22:10:10,625: ERROR/MainProcess] Received unregistered task of type 'proj.tasks.add'.
The message has been ignored and discarded.
Did you remember to import the module containing this task?
Or maybe you're using relative imports?
Please see
http://docs.celeryq.org/en/latest/internals/protocol.html
for more information.
The full contents of the message body was:
'[[],{},{"callbacks": null,"errbacks": null,"chain": null,"chord": null}]' (77b)
Traceback (most recent call last):
  File "/usr/local/lib/python3.8/dist-packages/celery/worker/consumer/consumer.py", line 562, in on_task_received
    strategy = strategies[type_]
KeyError: 'proj.tasks.add'
In the end I switched to https://github.com/jarekwg/django-apscheduler, which I find elegant and easy to use.
Solution
The error log suggests that your relative import is the cause of the problem.
This is the import at the top of your tasks.py:
from .celery import app
And this is the hint from your log:
Did you remember to import the module containing this task?
Or maybe you're using relative imports?
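
To illustrate why the import path can matter, here is a minimal sketch of name-based task lookup. It is a simplified model, not Celery's actual implementation: the helpers task_name, build_registry and dispatch are hypothetical, and the only assumption taken from Celery is that an automatically generated task name is built from the module name plus the function name. The KeyError in the log above is the same kind of failed lookup.

```python
# Simplified model of task-name lookup; NOT Celery's actual implementation.
# All helper names below (task_name, build_registry, dispatch) are hypothetical.


def task_name(module_name, func_name):
    """Build an automatic task name as '<module name>.<function name>'."""
    return "{}.{}".format(module_name, func_name)


def build_registry(module_name, funcs):
    """Register each function under a name derived from how its module was imported."""
    return {task_name(module_name, f.__name__): f for f in funcs}


def dispatch(registry, name):
    """Look a task up by name, similar in spirit to strategies[type_] in the log."""
    try:
        return registry[name]()
    except KeyError:
        return "unregistered task: {}".format(name)


def add():
    return "ok"


# The client imported the module as 'proj.tasks', so this is the name it sends.
sent_name = task_name("proj.tasks", "add")

# A worker that imported the same file under a different module name ('tasks').
mismatched_worker = build_registry("tasks", [add])
# A worker that imported it under the same module name as the client.
matching_worker = build_registry("proj.tasks", [add])

result_mismatched = dispatch(mismatched_worker, sent_name)
result_matching = dispatch(matching_worker, sent_name)
print(result_mismatched)
print(result_matching)
```

If the names really do differ in your setup, two things you could try are switching tasks.py to the absolute import from proj.celery import app, or pinning the name explicitly with @app.task(name='proj.tasks.add'). Running celery -A proj inspect registered against the background worker should show which task names it actually knows about.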