注册芹菜定期任务以在django管理员中查看它 celery.py settings.py tasks.py

问题描述

我正在尝试在Django和Celery中编写一些定期后台任务

我的任务工作正常,当我用Beat Scheduler启动我的芹菜工人时,我设法定期启动这些任务:
celery -A parifex worker -B -l info
Discovered tasks by the worker

如上图所示,我的任务很好地发现。但是,在IN​​STALLED_APPS上的自动发现无法正常工作,因此,我不得不包括要在 strong>。

要求

pip freeze |grep "celery\|Django"
celery==4.4.1
Django==3.1.2
django-celery==3.3.1

进入设置所在的基本应用程序:

celery.py

from __future__ import absolute_import,unicode_literals

import os

from celery import Celery

from django.conf import settings

# set the default Django settings module for the 'celery' program.
os.environ.setdefault('DJANGO_SETTINGS_MODULE','parifex.settings')

# Define the Celery configuration and include <app>.tasks to discover it
app = Celery('parifex',include=['device.tasks'])

# Using a string here means the worker will not have to
# pickle the object when using Windows.
app.config_from_object('django.conf:settings',namespace='CELERY')
app.autodiscover_tasks(lambda: settings.INSTALLED_APPS,force=True)


@app.task(bind=True)
def debug_task(self):
    print(f'Request: {self.request!r}')

settings.py

djcelery.setup_loader()
CELERYD_HIJACK_ROOT_LOGGER = False

# http://celery.readthedocs.org/en/latest/configuration.html#celery-redirect-stdouts-level
CELERY_REDIRECT_STDOUTS = True  # par défaut
CELERY_REDIRECT_STDOUTS_LEVEL = 'DEBUG'

CELERY_broKER_URL = 'redis://:Redi$DB_withCelery@127.0.0.1:6234'
# store schedule in the DB:
CELERYBEAT_SCHEDULER = 'djcelery.schedulers.DatabaseScheduler'
CELERY_RESULT_BACKEND = 'redis://:Redi$DB_withCelery@127.0.0.1:6234'

CELERY_TASK_SERIALIZER = 'json'
CELERY_ACCEPT_CONTENT = ['json']  # Ignore other content
CELERY_RESULT_SERIALIZER = 'json'
CELERY_TIMEZONE = 'Europe/Paris'
CELERY_ENABLE_UTC = True

CELERY_TRACK_STARTED = True
CELERYD_POOL_RESTARTS = True

进入名为Device的模块应用

tasks.py

from celery import shared_task
from celery.schedules import crontab
from celery.task import periodic_task
from django.core.serializers import serialize
from djcelery.schedulers import DatabaseScheduler,ModelEntry

from .models import Device
from .api.tools import TestDeviceApi


@shared_task
def count_devices():
    return Device.objects.count()


@shared_task(serializer='json',bind=True)
def get_devices_state():
    device_status = {}
    print(serialize('json',Device.objects.all()))
    for device in Device.objects.all():
        status = TestDeviceApi(device).ping()
        device_status[device.id] = (serialize('json',[device]),status)
    return device_status


@periodic_task(run_every=(crontab(minute='*/1')))
def print_random():
    print(15)

好吧,正如我所说,我的工作人员发现任务很好,但是我在Django中的管理页面看不到它。

一个问题::在按页面创建定期任务时,如何注册我的任务以便在管理页面中看到它?还是没用? Registered Tasks my admin page can see

第二个问题:是否可以在管理页面中看到硬编码的定期任务而无需安装任何其他模块?是否有“内置”方式来做到这一点?或者,也许是每次调用任务时我都必须尝试自己保存它吗?

我的目标是让用户能够自行更改具有认行为的任务的计划。

在此先感谢您的帮助!我希望我很清楚。

解决方法

我遇到了同样的问题。

我通过将这些行添加到 proj/proj/__init__.py 来修复它

from __future__ import absolute_import,unicode_literals

from .celery import app as celery_app

__all__ = ('celery_app',)

不确定这是最好的解决方案还是只是一个变通办法,但显然这将确保在 Django 启动以使用 shared_task 时导入应用程序。

相关问答

Selenium Web驱动程序和Java。元素在(x,y)点处不可单击。其...
Python-如何使用点“。” 访问字典成员?
Java 字符串是不可变的。到底是什么意思?
Java中的“ final”关键字如何工作?(我仍然可以修改对象。...
“loop:”在Java代码中。这是什么,为什么要编译?
java.lang.ClassNotFoundException:sun.jdbc.odbc.JdbcOdbc...