问题描述
我正在编写一种可与sql一起使用的REST api,并且不断发现自己处于这种情况下,在这种情况下,我需要通过查询表联接来返回对象列表以及每个对象内部的嵌套列表。
比方说,我与用户和群组之间存在多对多关系。我有一个用户表和一个组表,以及它们之间的联结表UserGroup。现在,我想编写一个REST端点,该端点返回一个用户列表,并为每个用户返回他们所注册的组。我想返回一个json,其格式如下:
[
{
"username": "test_user1",<other attributes ...>
"groups": [
{
"group_id": 2,<other attributes ...>
},{
"group_id": 3,<other attributes ...>
}
]
},{
"username": "test_user2",<other attributes ...>
"groups": [
{
"group_id": 1,{
"group_id": 2,etc ...
示例(使用python flask_sqlalchemy / flask_restx):
users = db.session.query(User).filter( ... )
for u in users:
groups = db.session.query(Group).join(UserGroup,UserGroup.group_id == Group.id) \
.filter(UserGroup.user.id == u.id)
retobj = api.marshal([{**u.__dict__,'groups': groups} for u in users],my_model)
# Total number of queries: 1 + number of users in result
- 发出恒定数量的SQL查询:这可以通过发出一个整体式SQL查询来执行,该查询执行所有联接,并在“用户”列中添加潜在的大量冗余数据,或者通常更优选一些单独的SQL查询。例如,查询用户列表,然后查询在GroupUsers上联接的Group表,然后在服务器代码中手动对组进行分组。
示例代码:
from collections import defaultdict
users = db.session.query(User).filter( ... )
uids = [u.id for u in users]
groups = db.session.query(User.user_id,Group).join(UserGroup,UserGroup.group_id == Group.id) \
.filter(UserGroup.user_id._in(uids))
aggregate = defaultdict(list)
for g in groups:
aggregate[g.user_id].append(g[1].__dict__)
retobj = api.marshal([{**u.__dict__,'groups': aggregate[u.id]} for u in users],my_model)
# Total number of queries: 2
- 第三种方法的用途有限,是使用string_agg或类似的方法来强制sql将分组连接到一个字符串列中,然后将字符串解压缩到列表服务器端,例如,如果我想要的只是组号我可以使用string_agg和group_by在一次对User表的查询中返回“ 1,2”。但这仅在不需要复杂对象的情况下才有用。
我之所以被第二种方法吸引是因为我感觉它更高效,更可扩展,因为SQL查询的数量(由于没有特别好的原因,我认为这是主要瓶颈)是不变的,但是这需要更多的工作在服务器端将所有组过滤到每个用户中。但是我认为使用sql的部分目的是利用其有效的排序/过滤功能,因此您不必自己做。
所以我的问题是,我是否认为将SQL查询的数量保持不变是一个好主意,而这会花费更多的服务器端处理和开发时间?尝试减少不必要的SQL查询的数量是否浪费时间?如果不进行API大规模测试,是否会后悔?有没有更好的方法可以解决我不知道的问题?
解决方法
使用joinedload
选项,您只需一个查询即可加载所有数据:
SELECT "user".id,"user".name,group_1.id,group_1.name
FROM "user"
LEFT OUTER JOIN (user_group AS user_group_1
JOIN "group" AS group_1 ON group_1.id = user_group_1.group_id)
ON "user".id = user_group_1.user_id
当您运行上面的查询时,所有组将已经使用类似于下面的查询从数据库中加载:
import plotly.graph_objects as go
fig = go.Figure()
fig.add_trace(go.Scatter(
x=[0,1,2],y=[1,1],mode="lines+markers+text",name="Lines,Markers and Text",text=["Text A","Text B","Text C"],textposition="top center"
))
fig.add_trace(go.Scatter(
x=[0,y=[2,2,mode="markers+text",name="Markers and Text",text=["Text D","Text E","Text F"],textposition="bottom center"
))
fig.add_trace(go.Scatter(
x=[0,y=[3,3,3],mode="lines+text",name="Lines and Text",text=["<i class="fa fa-desktop' aria-hidden='true'></i>","Text H","Text I"],textposition="bottom center"
))
fig.show()
现在您只需要使用正确的模式序列化结果即可。