问题描述
我正在尝试在python中使用psycopg2更新并在包含约3000万条记录的大型postgres表中插入行,我正在批量处理10万条记录(〜花费6分钟的时间),因为我没有这样做。想要打开事务的时间太长,以免创建行锁定,因为在我编写它们时,表行也被其他事务使用。我每次都在循环中打开和关闭连接和游标。
要通过postgres中的游标更新/插入,以下哪一项(或没有一项)最好避免锁定,并获得更好的性能?
1>打开连接以及关闭每批连接。
2>一次打开和关闭连接,但是每次每批打开一次光标。
请告知是否还有更好的选择。目前,我正在使用cursor.execute执行插入/更新查询,但是由于性能不是那么快,我不得不选择批处理。由于我没有足够的权限在插入时删除索引,因此我使用了批处理路径。
使用的查询:-
更新:-
UPDATE target_tbl tgt
set descr = stage.descr,prod_name = stage.prod_name,item_name = stage.item_name,url = stage.url,col1_name = stage.col1_name,col2_name = stage.col2_name,col3_name = stage.col3_name,col4_name = stage.col4_name,col5_name = stage.col5_name,col6_name = stage.col6_name,col7_name = stage.col7_name,col8_name = stage.col8_name,flag = stage.flag
from tbl1 stage
where
tgt.col1 = stage.col1
and tgt.col2 = stage.col2
and coalesce(tgt.col3,'col3'::text) = coalesce(stage.col3,'col3'::text)
and coalesce(tgt.col4,'col4'::text) = coalesce(stage.col4,'col4'::text);
插入:-
Insert into tgt
select
stage.col1,stage.col2,stage.col3,stage.col4
stage.prod_name,stage.item_name,stage.url,stage.col1_name,stage.col2_name,stage.col3_name,stage.col4_name,stage.col5_name,stage.col6_name,stage.col7_name,stage.col8_name,stage.flag
from tbl1 stage
where NOT EXISTS (
select from tgt where
tgt.col1 = stage.col1
and tgt.col2 = stage.col2
and coalesce(tgt.col3,'col4'::text)
) ;
解决方法
无需在 Python 端进行 SQL 编码的 Upsert 函数,只需使用 SQLAlchemy 的强大功能即可。
您也可以决定批量大小,我使用的批量大小为 1000,但您可以尝试更大的。
如果您有一个包含相同列名和类型的字典列表和 SQL 表,请考虑使用此函数。如果您使用的是 DataFrame,您只需执行 df.to_dict('records') 即可获得准备输入的字典列表。
确保您的 Dict 键和 Table 列匹配
from sqlalchemy import Table
from sqlalchemy.engine.base import Engine as sql_engine
from sqlalchemy.dialects.postgresql import insert
from sqlalchemy.ext.automap import automap_base
import pandas as pd
from sqlalchemy import create_engine
from typing import List,Dict
engine = create_engine(...)
def upsert_database(list_input: List[Dict],engine: sql_engine,table: str,schema: str) -> None:
if len(list_input) == 0:
return None
with engine.connect() as conn:
base = automap_base()
base.prepare(engine,reflect=True,schema=schema)
target_table = Table(table,base.metadata,autoload=True,autoload_with=engine,schema=schema)
chunks = [list_input[i:i + 1000] for i in range(0,len(list_input),1000)]
for chunk in chunks:
stmt = insert(target_table).values(chunk)
update_dict = {c.name: c for c in stmt.excluded if not c.primary_key}
conn.execute(stmt.on_conflict_do_update(
constraint=f'{table}_pkey',set_=update_dict)
)