问题描述
我正在尝试使用from_orm方法将结果字典从sqlalchemy自动获取到Fastapi的Pydantic输出中,以使用from_orm方法映射到地图,但是我总是遇到验证错误。
pydantic.main.BaseModel.from_orm中的文件“ pydantic \ main.py”,第508行 pydantic.error_wrappers.ValidationError:2个类别的验证错误 名称 必填字段(类型= value_error.missing) ID 必填字段(类型= value_error.missing)
如果我自己使用Pydantic模式创建对象并将其添加到列表中,则该方法有效。 为了from_orm工作,我需要更改什么? 我可能会错过文档中的某些内容吗? https://pydantic-docs.helpmanual.io/usage/models/#orm-mode-aka-arbitrary-class-instances https://fastapi.tiangolo.com/tutorial/sql-databases/#use-pydantics-orm_mode
或者还有另一种/更好的方法可以将ResultProxy转换为具有Pydantic功能的输出?
我从数据库方法获得的输出如下:
[{'id': 1,'name': 'games','parentid': None},{'id': 2,'name': 'computer',{'id': 3,'name': 'household',{'id': 10,'name': 'test','parentid': None}]]
Models.py
from sqlalchemy import BigInteger,Column,DateTime,ForeignKey,Integer,Numeric,String,Text,text,Table
from sqlalchemy.orm import relationship,mapper
from sqlalchemy.ext.declarative import declarative_base
Base = declarative_base()
metadata = Base.metadata
category = Table('category',metadata,Column('id',primary_key=True),Column('name',String(200)),Column('parentid',Integer),)
class Category(object):
def __init__(self,cat_id,name,parentid):
self.id = cat_id
self.name = name
self.parentid = parentid
mapper(Category,category)
Schemas.py
from pydantic import BaseModel,Field
class Category(BaseModel):
name: str
parentid: int = None
id: int
class Config:
orm_mode = True
main.py
def result_proxy_to_Dict(results: ResultProxy):
d,a = {},[]
for rowproxy in results:
# rowproxy.items() returns an array like [(key0,value0),(key1,value1)]
for column,value in rowproxy.items():
# build up the dictionary
d = {**d,**{column: value}}
a.append(d)
return a
def crud_read_cat(db: Session) -> dict:
# records = db.query(models.Category).all()
#query = db.query(models.Category).filter(models.Category.parentid == None)
s = select([models.Category]). \
where(models.Category.parentid == None)
result = db.execute(s)
#print(type(result))
#print(result_proxy_to_Dict(result))
#results = db.execute(query)
# result_set = db.execute("SELECT id,parentid FROM public.category;")
# rint(type(result_set))
# for r in result_set:
# print(r)
# return [{column: value for column,value in rowproxy.items()} for rowproxy in result_set]
# return await databasehelper.database.fetch_all(query)
return result_proxy_to_Dict(result)
#return results
@router.get("/category/",response_model=List[schemas.Category],tags=["category"])
async def read_all_category(db: Session = Depends(get_db)):
categories = crud_read_cat(db)
context = []
print(categories)
co_model = schemas.Category.from_orm(categories)
# print(co_model)
for row in categories:
print(row)
print(row.get("id",None))
print(row.get("name",None))
print(row.get("parentid",None))
tempcat = schemas.Category(id=row.get("id",None),name=row.get("name",parentid=row.get("parentid",None))
context.append(tempcat)
#for dic in [dict(r) for r in categories]:
# print(dic)
# print(dic.get("category_id",None))
# print(dic.get("category_name",None))
# print(dic.get("category_parentid",None))
# tempcat = schemas.Category(id=dic.get("category_id",name=dic.get("category_name",# parentid=dic.get("category_parentid",None))
# context.append(tempcat)
return context
解决方法
暂无找到可以解决该程序问题的有效方法,小编努力寻找整理中!
如果你已经找到好的解决方法,欢迎将解决方案带上本链接一起发送给小编。
小编邮箱:dio#foxmail.com (将#修改为@)