Problem description
This is the route in my FastAPI project that calls the Celery task:
@router.get("/")
def read_bhav(
    db: Session = Depends(deps.get_db),
    skip: int = 0,
    limit: int = 100,
    current_user: models.User = Depends(deps.get_current_active_user),
) -> Any:
    """
    Retrieve bhavs.
    """
    today = datetime.now()  # datetime has no Now(); the method is now()
    y, m, d = today.year, today.month, today.day - 3  # note: day - 3 underflows at the start of a month
    if date(y, m, d).weekday() < 5:
        # celery_app.send_task("app.worker.bhav_copy", (date(y, m, d), db))
        bhav_copy.delay(date(y, m, d), db)  # passing db here is what triggers the pickle error below
        print(db)
        return {"TEST": "Successfully added"}
    else:
        return {"FAIL": "No Stock data for Weekends"}
This is my celery_app.py file, where I configure my Celery application:
from celery import Celery

celery_app = Celery("worker", broker="amqp://guest@queue//")

# Use the modern lowercase setting names (the old CELERY_* names are deprecated).
celery_app.conf.task_serializer = "pickle"
celery_app.conf.accept_content = ["pickle"]
celery_app.conf.result_serializer = "pickle"

# The two separate CELERY_ROUTES assignments overwrote each other;
# both routes must live in a single task_routes dict.
celery_app.conf.task_routes = {
    "app.worker.test_celery": {"queue": "main-queue"},
    "app.worker.bhav_copy": {"queue": "main-queue"},
}
As you can see, I have tried several ways to serialize it as JSON or something similar.
@celery_app.task(acks_late=True)
def bhav_copy(date: date, db) -> str:
    prices = get_price_list(dt=date)
    prices.columns = map(str.lower, prices.columns)
    price_in = []
    for i in range(1, prices.shape[0] + 1):
        price = prices[i - 1:i]
        v = price.values.reshape(13)
        price_in.append(dict(zip(price.columns, v)))
    crud.bhav.insert_bulk(db=db, bhav_in=price_in)
    return "inserted stocks data"
I would greatly appreciate a solution to this problem. With the current code I get the following error:
backend_1 | File "/usr/local/lib/python3.7/site-packages/kombu/serialization.py", line 350, in pickle_dumps
backend_1 | return dumper(obj, protocol=pickle_protocol)
backend_1 | kombu.exceptions.EncodeError: Can't pickle <class 'sqlalchemy.orm.session.Session'>: it's not the same object as sqlalchemy.orm.session.Session
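For context (my own addition, not from the original post): pickle serializes classes by reference, looking the class up by module and name at dump time; when that lookup finds a different object than the instance's actual class, pickle raises exactly this error. A minimal stdlib reproduction of the failure mode, with a stand-in class instead of SQLAlchemy's Session:

```python
import pickle

class Session:  # stand-in for sqlalchemy.orm.session.Session
    pass

obj = Session()

# Rebind the module-level name, so pickle's by-name lookup no longer
# finds the same class object that created the instance.
Session = type("Session", (), {})

try:
    pickle.dumps(obj)
except pickle.PicklingError as exc:
    print(exc)  # Can't pickle <class ...Session'>: it's not the same object as ...
```

In the real project the mismatch typically appears because the worker and the web process import (or monkeypatch) the Session class along different paths; but even when pickling succeeds, shipping a live DB session over a broker is not meaningful.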
Workaround
No effective solution to this problem has been posted yet.
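One commonly used pattern (my own suggestion, not from the post, so treat it as a hedged sketch): never pass the Session, or any object wrapping a live connection, through the broker. Send only JSON-serializable values, such as an ISO date string, and open a fresh session inside the worker. `SessionLocal` below is a hypothetical session factory name; the commented lines stand in for the project's real code:

```python
from datetime import date

def bhav_copy(date_str: str) -> str:
    # In the real project this function would carry @celery_app.task(acks_late=True).
    d = date.fromisoformat(date_str)  # rebuild the date object inside the worker
    # db = SessionLocal()             # hypothetical: open a fresh Session here,
    # try:                            # not in the FastAPI view
    #     ...                         # get_price_list / crud.bhav.insert_bulk as before
    # finally:
    #     db.close()
    return f"inserted stocks data for {d.isoformat()}"

# The FastAPI route would then call:
#   bhav_copy.delay(date(y, m, d).isoformat())
print(bhav_copy("2021-01-04"))  # → inserted stocks data for 2021-01-04
```

With only strings and numbers crossing the broker, the pickle serializer (and its security risks) can be dropped entirely in favor of Celery's default JSON serializer.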