Celery Beat 确认一个任务,运行它但不执行它

问题描述

问题是我得到以下日志:

celery_1  | [2021-03-15 19:00:00,124: INFO/MainProcess] Scheduler: Sending due task read_dof (api.tasks.read_dof)
celery_1  | [2021-03-15 19:00:00,140: INFO/MainProcess] Scheduler: Sending due task read_bdm (api.tasks.read_bdm)
celery_1  | [2021-03-15 19:00:00,141: INFO/MainProcess] Scheduler: Sending due task read_fixer (api.tasks.read_fixer)

我对 celery 有以下配置。 Exchange 是我的 django 项目的名称,即“celery.py”所在的位置,而 api 是我的 django 应用程序的名称,即我的“tasks.py”所在的位置:

from __future__ import absolute_import,unicode_literals
import os
from celery import Celery
from celery.schedules import crontab

os.environ.setdefault("DJANGO_SETTINGS_MODULE","exchange.settings")

app = Celery("exchange")

app.config_from_object("django.conf:settings",namespace="CELERY")

app.autodiscover_tasks()

app.conf.beat_schedule = {
    'read_bdm': {
        'task': 'api.tasks.read_bdm','schedule': crontab(hour=19,minute=0),},'read_dof': {
        'task': 'api.tasks.read_dof','read_fixer': {
        'task': 'api.tasks.read_fixer',}

这是我的tasks.py:

from celery import shared_task
from .models import BdmExch,DofExch,FixerExch
from .helpers.bdmcrawler import parse_bdm
from .helpers.dofcrawler import parse_dof
from .helpers.fixercrawler import parse_fixer

@shared_task(name='read_bdm')
def read_bdm():
    attempts=0
    while attempts <3:
        try:
            result = parse_bdm()
            print(result)
            BdmExch.objects.create(time=result["date"],exch=result["exc"])
            return
        except:
            attempts += 1
            print("Parsing error on read_bdm")
    print("--------------- Parsing error on read_bdm -----------")
    return    

@shared_task(name='read_dof')
def read_dof():
    attempts=0
    while attempts < 3:
        try:
            result = parse_dof()
            DofExch.objects.create(time=result["date"],exch=result["exc"])
            return
        except:
            attempts += 1
            print("Parsing error on read_dof")

    print("--------------- Parsing error on read_dof -----------")
    return

@shared_task(name='read_fixer')
def read_fixer():
    attempts=0
    while attempts < 3:
        try:
            result = parse_bdm()
            FixerExch.objects.create(time=result["date"],exch=result["exc"])
            return
        except:
            attempts += 1
            print("Parsing error on read_fixer")
    print("--------------- Parsing error on read_fixer -----------")
    return

正如我在 api django 应用程序中所说的,parse_bdm、parse_dof 和 parse_fixer 函数是请求和 beautifulsoup 或简单字典的简单实现,以便从不同来源读取数据。当我简单地运行任务函数时不会出现问题,就好像它们是简单的函数一样,所以这让我相信我的 celery.py 中存在我似乎无法确定的问题。

非常感谢任何帮助。

非常感谢!

解决方法

您是否尝试过为 Django 应用程序添加 init 文件?

exchange/__init__.py

from __future__ import absolute_import,unicode_literals

# This will make sure the app is always imported when
# Django starts so that shared_task will use this app.
from .celery import app as celery_app

__all__ = ('celery_app',)

据我所知,文档有时会产生误导:

@shared_task 装饰器让你无需任何 具体的应用实例:

相关问答

Selenium Web驱动程序和Java。元素在(x,y)点处不可单击。其...
Python-如何使用点“。” 访问字典成员?
Java 字符串是不可变的。到底是什么意思?
Java中的“ final”关键字如何工作?(我仍然可以修改对象。...
“loop:”在Java代码中。这是什么,为什么要编译?
java.lang.ClassNotFoundException:sun.jdbc.odbc.JdbcOdbc...