如何在fastapi中将db:Session对象传递给celery任务

问题描述

这是我在 fastapi 项目中调用 celery 任务的路径

@router.get("/")
    def read_bhav(
    db: Session = Depends(deps.get_db),skip: int = 0,limit: int = 100,current_user: models.User = Depends(deps.get_current_active_user),) -> Any:

    """
    Retrieve bhavs.
    """
    y,d,m = (datetime.Now().year,datetime.Now().day - 3,datetime.Now().month)
    if date(y,m,d).weekday() < 5:
        # celery_app.send_task("app.worker.bhav_copy",(date(y,d),db))
        bhav_copy.delay(date(y,db)
        print(db)
        return {'TEST': "Successfully added"}
    else:
        return {"FAIL": "No Stock data for Weekends"}

这是我的 celery_app.py 文件,我正在配置我的 celery 应用程序

from celery import Celery

celery_app = Celery("worker",broker="amqp://guest@queue//")

celery_app.conf["CELERY_TASK_SERIALIZER"] = "pickle" 
celery_app.conf["CELERY_ACCEPT_CONTENT"] = ["pickle"]  
celery_app.conf["CELERY_RESULT_SERIALIZER"] = "pickle"

celery_app.conf['CELERY_ROUTES'] = {"app.worker.test_celery": "main-queue"}
celery_app.conf['CELERY_ROUTES'] = {"app.worker.bhav_copy": "main-queue"}

你可以看到我尝试了多种方法将其序列化为 json 或类似的东西

@celery_app.task(acks_late=True)
def bhav_copy(date: date,db) -> str:
    prices = get_price_list(dt=date)
    prices.columns = map(str.lower,prices.columns)

    price_in = []
    for i in range(1,prices.shape[0] + 1):
        price = prices[i - 1:i]
        v = price.values.reshape(13)
        price_in.append(dict(zip(price.columns,v)))
    crud.bhav.insert_bulk(db=db,bhav_in=price_in)
    return f"inserted stocks data"

非常感谢能够解决此问题的解决方案,目前使用此代码我从下面收到以下错误

backend_1       |   File "/usr/local/lib/python3.7/site-packages/kombu/serialization.py",line 350,in pickle_dumps
backend_1       |     return dumper(obj,protocol=pickle_protocol)
backend_1       | kombu.exceptions.EncodeError: Can't pickle <class 'sqlalchemy.orm.session.Session'>: it's not the same object as sqlalchemy.orm.session.Session

解决方法

暂无找到可以解决该程序问题的有效方法,小编努力寻找整理中!

如果你已经找到好的解决方法,欢迎将解决方案带上本链接一起发送给小编。

小编邮箱:dio#foxmail.com (将#修改为@)