在 Flask 应用程序中运行 APScheduler 作为后台函数而不干扰其他线程

问题描述

下面是我的 init.py 函数,我想在其中安排一个后台任务在每周一上午 10:30 运行。

代码没有显示错误,但也不起作用。

from flask import Flask,render_template
from flask_sqlalchemy import sqlAlchemy
from flask_migrate import Migrate
from flask_login import LoginManager
from sqlalchemy import event
from sqlalchemy.engine import Engine
from sqlite3 import Connection as sqlite3Connection
from apscheduler.schedulers.background import BackgroundScheduler
from Scripts.File_merge import File_merge_auto

db = sqlAlchemy()
migrate = Migrate()
login_manager = LoginManager()

sched = BackgroundScheduler(daemon=True)
sched.add_job(File_merge_auto.sirmerge(),'cron',day_of_week='mon',hour=10,minute=30)
sched.start()

def create_app():
    """Application-factory pattern"""
    # Initialize our core application
    app = Flask(__name__)
    if app.config['ENV'] == 'development':
        app.config.from_object('config.DevelopmentConfig')
    else:
        app.config.from_object('config.ProductionConfig')

    # Error Handling
    @app.errorhandler(404)
    def page_not_found(error):
        return render_template('error_pages/404.html'),404

    @app.errorhandler(500)
    def internal_error(error):
        return render_template('error_pages/500.html'),500

    @app.errorhandler(400)
    def bad_request(error):
        return render_template('error_pages/400.html'),400

    @app.errorhandler(401)
    def unauthorized(error):
        return render_template('error_pages/401.html'),401

    @app.errorhandler(403)
    def forbidden(error):
        return render_template('error_pages/403.html'),403

    @app.errorhandler(405)
    def method_not_allowed(error):
        return render_template('error_pages/405.html'),405

    @app.errorhandler(503)
    def service_unavailable(error):
        return render_template('error_pages/503.html'),503

    

    with app.app_context():
        # Import Routes
        from .views.login import login_bp
        from .views.home import home_bp
        from .views.admin import admin_bp
        from .views.flash_dashboard import flash_dashboard_bp
        from .views.blank_dashboard1 import blank_dashboard1_bp
        from .views.blank_dashboard2 import blank_dashboard2_bp
        
        # Register Blueprints
        app.register_blueprint(login_bp)
        app.register_blueprint(home_bp)
        app.register_blueprint(admin_bp)
        app.register_blueprint(flash_dashboard_bp)
        app.register_blueprint(blank_dashboard1_bp)
        app.register_blueprint(blank_dashboard2_bp)
        
        # Initialize extensions
        db.init_app(app)
        migrate.init_app(app,db)
        login_manager.init_app(app)
        
        return app

谁能告诉我我哪里做错了?

我基本上需要运行一个后台任务,以便合并驱动器中的几个文件,然后在交互时根据需要由我的其他应用程序使用。

解决方法

您正在调用 File_merge_auto.sirmerge() 并将其返回值传递给 sched.add_job()。因此,您的代码等效于:

value = File_merge_auto.sirmerge()
sched.add_job(value,'cron',day_of_week='mon',hour=10,minute=30)

这可能不是您想要的,因此您需要将函数(而不是其返回值)传递给 add_job()

sched.add_job(File_merge_auto.sirmerge,minute=30)

如果您有任何其他问题,请参阅文档的 troubleshooting section

相关问答

Selenium Web驱动程序和Java。元素在(x,y)点处不可单击。其...
Python-如何使用点“。” 访问字典成员?
Java 字符串是不可变的。到底是什么意思?
Java中的“ final”关键字如何工作?(我仍然可以修改对象。...
“loop:”在Java代码中。这是什么,为什么要编译?
java.lang.ClassNotFoundException:sun.jdbc.odbc.JdbcOdbc...