从PostgreSQL提取大数据块到Pandas DataFrames

问题描述

我正在使用psycopg2pandas从Postgres中提取数据。

pandas.read_sql_query在提供chunksize参数时支持Python“生成器”模式。在处理大型数据集时,它并不是很有用,因为最初将整个数据从数据库中检索到客户端内存中,然后根据chunksize分成单独的帧。使用这种方法,大型数据集将很容易遇到内存不足的问题。

Postgres / psycopg2正在使用server-side cursors解决此问题。但是熊猫似乎并没有支持它。

代替:

iter = sql.read_sql_query(sql,conn,index_col='col1',chunksize=chunksize)

我尝试过像这样重新实现它:

from pandas.io.sql import sqliteDatabase

curs = conn.cursor(name='cur_name') # server side cursor creation
curs.itersize = chunksize

pandas_sql = sqliteDatabase(curs,is_cursor=True)
iter = pandas_sql.read_query(
      sql,chunksize=chunksize)

但是它失败了,因为Pandas试图访问cursor.description,由于服务器端游标的某种原因,它为NULL(并知道为什么)。

最好的方法是什么? Tnx

P.S。

解决方法

您需要重写熊猫的read_query(),以免使用cursor.description。只需将列名称列表传递给read_query()即可使用它而不是cursor.description`:

import psycopg2
from pandas.io.sql import SQLiteDatabase,_convert_params

# modify read_query as you need and overwrite it
# added column names as argument
def read_query_modified(
    self,sql,columns,index_col=None,coerce_float=True,params=None,parse_dates=None,chunksize=None,):

    args = _convert_params(sql,params)
    cursor = self.execute(*args)
    # columns = [col_desc[0] for col_desc in cursor.description]

    if chunksize is not None:
        return self._query_iterator(
            cursor,chunksize,index_col=index_col,coerce_float=coerce_float,parse_dates=parse_dates,)
    else:
        data = self._fetchall_as_list(cursor)
        cursor.close()

        frame = _wrap_result(
            data,)
        return frame

# replace read_query with your version
SQLiteDatabase.read_query = read_query_modified

chunksize = 2
conn = psycopg2.connect("dbname=mf port=5959 host=localhost user=mf_usr")
curs = conn.cursor(name='cur_name')
curs.itersize = chunksize

sql = 'select * from users where id = 366196'
columns = ['id','firstname','lastname','birth','gender','nationality']

pandas_sql = SQLiteDatabase(curs,is_cursor=True)
iter = pandas_sql.read_query(
      sql,index_col='id',chunksize=chunksize)

for x in iter:
    print(x)

输出:

       firstname    lastname birth gender nationality
id
366196   Michael  Kronberger  None   None          at
,

我对@Maurice Meyer的回答做了一些改进,以防止需要将columns作为参数传递:

class CustomSQLiteDatabase(SQLiteDatabase):

    def read_query(
        self,):

        args = _convert_params(sql,params)
        self.con.execute(*args)

        if chunksize is not None:

            columns = None

            while True:
                data = self.con.fetchmany(chunksize)

                if not columns:
                    columns = [col_desc[0] for col_desc in self.con.description]

                if type(data) == tuple:
                    data = list(data)
                if not data:
                    self.con.close()
                    break
                else:
                    yield _wrap_result(
                        data,)

        else:
            data = self._fetchall_as_list(self.con)
            columns = [col_desc[0] for col_desc in self.con.description]
            self.con.close()

            frame = _wrap_result(
                data,)
            return frame