使用python psycopg2对postgres执行数据更新时,哪个是首选选项?

问题描述

我正在尝试在python中使用psycopg2更新并在包含约3000万条记录的大型postgres表中插入行,我正在批量处理10万条记录(〜花费6分钟的时间),因为我没有这样做。想要打开事务的时间太长,以免创建行锁定,因为在我编写它们时,表行也被其他事务使用。我每次都在循环中打开和关闭连接和游标。

要通过postgres中的游标更新/插入,以下哪一项(或没有一项)最好避免锁定,并获得更好的性能

1>打开连接以及关闭每批连接。

  • 打开该批次的连接。

  • 打开该批次的光标。

  • 提交该批次的交易。

  • 关闭该批次的光标。

  • 关闭该批次的连接。

2>一次打开​​和关闭连接,但是每次每批打开一次光标。

  • 打开一次连接。

  • 打开该批次的光标。

  • 提交该批次的交易。

  • 关闭该批次的光标。

  • 在最后一批之后关闭连接。

请告知是否还有更好的选择。目前,我正在使用cursor.execute执行插入/更新查询,但是由于性能不是那么快,我不得不选择批处理。由于我没有足够的权限在插入时删除索引,因此我使用了批处理路径。

使用的查询:-

更新:-

UPDATE target_tbl tgt
        set descr = stage.descr,prod_name = stage.prod_name,item_name = stage.item_name,url       = stage.url,col1_name = stage.col1_name,col2_name = stage.col2_name,col3_name = stage.col3_name,col4_name = stage.col4_name,col5_name = stage.col5_name,col6_name = stage.col6_name,col7_name = stage.col7_name,col8_name = stage.col8_name,flag      = stage.flag
    from tbl1 stage
    where 
    tgt.col1 = stage.col1
    and tgt.col2 = stage.col2
    and coalesce(tgt.col3,'col3'::text) = coalesce(stage.col3,'col3'::text)
    and coalesce(tgt.col4,'col4'::text) = coalesce(stage.col4,'col4'::text);
        

插入:-

 Insert into tgt
    select 
    stage.col1,stage.col2,stage.col3,stage.col4
    stage.prod_name,stage.item_name,stage.url,stage.col1_name,stage.col2_name,stage.col3_name,stage.col4_name,stage.col5_name,stage.col6_name,stage.col7_name,stage.col8_name,stage.flag
    from tbl1 stage
    where NOT EXISTS (
    select from tgt where
    tgt.col1 = stage.col1
    and tgt.col2 = stage.col2
    and coalesce(tgt.col3,'col4'::text)
    ) ;

解决方法

无需在 Python 端进行 SQL 编码的 Upsert 函数,只需使用 SQLAlchemy 的强大功能即可。

您也可以决定批量大小,我使用的批量大小为 1000,但您可以尝试更大的。

如果您有一个包含相同列名和类型的字典列表和 SQL 表,请考虑使用此函数。如果您使用的是 DataFrame,您只需执行 df.to_dict('records') 即可获得准备输入的字典列表。

确保您的 Dict 键和 Table 列匹配

from sqlalchemy import Table
from sqlalchemy.engine.base import Engine as sql_engine
from sqlalchemy.dialects.postgresql import insert
from sqlalchemy.ext.automap import automap_base
import pandas as pd
from sqlalchemy import create_engine
from typing import List,Dict

engine = create_engine(...)


def upsert_database(list_input: List[Dict],engine: sql_engine,table: str,schema: str) -> None:
    if len(list_input) == 0:
        return None
    with engine.connect() as conn:
        base = automap_base()
        base.prepare(engine,reflect=True,schema=schema)
        target_table = Table(table,base.metadata,autoload=True,autoload_with=engine,schema=schema)
        chunks = [list_input[i:i + 1000] for i in range(0,len(list_input),1000)]
        for chunk in chunks:
            stmt = insert(target_table).values(chunk)
            update_dict = {c.name: c for c in stmt.excluded if not c.primary_key}
            conn.execute(stmt.on_conflict_do_update(
                constraint=f'{table}_pkey',set_=update_dict)
            )