问题描述
我正在使用 Kedro 管理数据管道,在最后一步,我在 S3 存储桶中存储了一个巨大的 csv 文件,我需要将其加载回 sql Server。
我通常会使用 bulk insert 来解决这个问题,但不太确定如何将其放入 kedro 模板中。这是 catalog.yml
flp_test:
type: pandas.sqlTableDataSet
credentials: dw_dev_credentials
table_name: flp_tst
load_args:
schema: 'dwschema'
save_args:
schema: 'dwschema'
if_exists: 'replace'
bulk_insert_input:
type: pandas.CSVDataSet
filepath: s3://your_bucket/data/02_intermediate/company/motorbikes.csv
credentials: dev_s3
def insert_data(self,conn,csv_file_nm,db_table_nm):
qry = "BULK INSERT " + db_table_nm + " FROM '" + csv_file_nm + "' WITH (FORMAT = 'CSV')"
# Execute the query
cursor = conn.cursor()
success = cursor.execute(qry)
conn.commit()
cursor.close
- 如何将
csv_file_nm
指向我的bulk_insert_input
S3 目录? - 是否有正确的方法间接访问
dw_dev_credentials
来执行插入操作?
解决方法
Kedro 的 pandas.SQLTableDataSet.html 照原样使用 pandas.to_sql 方法。要按原样使用它,您需要一个 pandas.CSVDataSet
到 node
然后写入目标 pandas.SQLDataTable
数据集以便将其写入 SQL。如果你有 Spark 可用,这将比 Pandas 更快。
为了使用内置的 BULK INSERT
查询,我认为您需要定义一个 custom dataset。