问题描述
我正在使用 kedro.extras.datasets.pandas.sqlTableDataSet 并且想使用 pandas 中的 chunk_size 参数。但是,在运行管道时,该表被视为生成器而不是 pd.dataframe()。
您将如何在管道中使用 chunk_size?
我的目录:
table_name:
type: pandas.sqlTableDataSet
credentials: redshift
table_name : rs_table_name
layer: output
save_args:
if_exists: append
schema: schema.name
chunk_size: 1000
解决方法
查看最新的 pandas
文档,实际使用的 kwarg
是 chunksize
,而不是 chunk_size
。请参阅https://pandas.pydata.org/docs/reference/api/pandas.DataFrame.to_sql.html。
由于 kedro
只包装您的 save_args
并将它们传递给 pd.DataFrame.to_sql
这些需要匹配:
def _save(self,data: pd.DataFrame) -> None:
try:
data.to_sql(**self._save_args)
except ImportError as import_error:
raise _get_missing_module_error(import_error) from import_error
except NoSuchModuleError as exc:
raise _get_sql_alchemy_missing_error() from exc
编辑:一旦你在你的管道中工作,文档显示 pandas.DataFrame.read_sql
与 chunksize
集将返回类型 Iterator[DataFrame]
。这意味着在您的节点函数中,您应该迭代输入(并在适当的情况下进行相应的注释),例如:
def my_node_func(input_dfs: Iterator[pd.DataFrame],*args):
for df in input_dfs:
...
这适用于最新版本的 pandas
。但是,我注意到 pandas
正在对齐 API,以便 read_csv
与 chunksize
集从 ContextManager
返回 pandas>=1.2
,因此我希望此更改能够也出现在 read_sql
中。