如何在kedro管道中为kedro.extras.datasets.pandas.SQLTableDataSet使用Chunk Size?

问题描述

我正在使用 kedro.extras.datasets.pandas.sqlTableDataSet 并且想使用 pandas 中的 chunk_size 参数。但是,在运行管道时,该表被视为生成器而不是 pd.dataframe()。

您将如何在管道中使用 chunk_size?

我的目录:

table_name:
  type: pandas.sqlTableDataSet
  credentials: redshift
  table_name : rs_table_name
  layer: output
  save_args:
    if_exists: append
    schema: schema.name
    chunk_size: 1000

解决方法

查看最新的 pandas 文档,实际使用的 kwargchunksize,而不是 chunk_size。请参阅https://pandas.pydata.org/docs/reference/api/pandas.DataFrame.to_sql.html。 由于 kedro 只包装您的 save_args 并将它们传递给 pd.DataFrame.to_sql 这些需要匹配:

def _save(self,data: pd.DataFrame) -> None:
    try:
        data.to_sql(**self._save_args)
    except ImportError as import_error:
        raise _get_missing_module_error(import_error) from import_error
    except NoSuchModuleError as exc:
        raise _get_sql_alchemy_missing_error() from exc

编辑:一旦你在你的管道中工作,文档显示 pandas.DataFrame.read_sqlchunksize 集将返回类型 Iterator[DataFrame]。这意味着在您的节点函数中,您应该迭代输入(并在适当的情况下进行相应的注释),例如:

def my_node_func(input_dfs: Iterator[pd.DataFrame],*args):
  for df in input_dfs:
    ...

这适用于最新版本的 pandas。但是,我注意到 pandas 正在对齐 API,以便 read_csvchunksize 集从 ContextManager 返回 pandas>=1.2,因此我希望此更改能够也出现在 read_sql 中。