芹菜任务中的套接字发射时,芹菜和Flask_socketio消息丢失

问题描述

我遇到的问题是我想向前端发送一条消息,但是该消息从celery任务和主线程中丢失了。打印语句仅用于检查任务是否正在运行。调用celery任务是来自uploadDatafile。

def make_celery(app):
    celery = Celery(
        app.name,backend=app.config['CELERY_RESULT_BACKEND'],broker=app.config['CELERY_broKER_URL']
    )
    celery.conf.update(app.config)

    class ContextTask(celery.Task):
        def __call__(self,*args,**kwargs):
            with app.app_context():
                return self.run(*args,**kwargs)

    celery.Task = ContextTask
    return celery


app.config.update(
    CELERY_broKER_URL = 'amqp://',CELERY_RESULT_BACKEND='amqp://'
)

socketio = SocketIO(app,cors_allowed_origins="*",message_queue='amqp://')
CORS(app)

@celery.task(name='app.add_together')
def add_together():
    socketio = SocketIO(message_queue='amqp://')    
    print(50*'*')
    time.sleep(5)       
    socketio.emit('traindata',{'data':'12','time': '14'},namespace='/test') 
    print(50*'*')

@app.route("/uploadDataFile",methods=["GET","POST"])
def upload_file():
    print(celery.tasks.keys())
    if request.method == "POST":
        # check if the post request has the file part
        if "file" not in request.files:
            response = make_response("No File Attached",404)
            return response
        file = request.files["file"]
        # if user does not select file,browser also
        # submit an empty part without filename
        if file.filename == "":
            response = make_response("No File Selected",404)
            return response
        if file and allowed_file(file.filename):
            filename = secure_filename(file.filename)
            file.save(os.path.join(app.config["UPLOAD_FOLDER"],filename))
            add_together.apply_async()
            response = make_response("Uploaded File".format(filename))
            return response
        else:
            response = make_response(
                "Allowed file types ({}) only".format(".ndjson or .json"),404
            )
            return response
    return ""
if __name__ == "__main__":    
    sess = Session()
    sess.init_app(app)             
    socketio.run(app,port=5000,debug=True)

解决方法

暂无找到可以解决该程序问题的有效方法,小编努力寻找整理中!

如果你已经找到好的解决方法,欢迎将解决方案带上本链接一起发送给小编。

小编邮箱:dio#foxmail.com (将#修改为@)