如何在 Kedro 节点中使用 SQL Server 批量插入?

问题描述

我正在使用 Kedro 管理数据管道,在最后一步,我在 S3 存储桶中存储了一个巨大的 csv 文件,我需要将其加载回 sql Server。

我通常会使用 bulk insert解决这个问题,但不太确定如何将其放入 kedro 模板中。这是 catalog.yml

中配置的目标表和 S3 存储桶
flp_test:
  type: pandas.sqlTableDataSet
  credentials: dw_dev_credentials
  table_name: flp_tst
  load_args:
    schema: 'dwschema'
  save_args:
    schema: 'dwschema'
    if_exists: 'replace'

bulk_insert_input:
   type: pandas.CSVDataSet
   filepath: s3://your_bucket/data/02_intermediate/company/motorbikes.csv
   credentials: dev_s3


def insert_data(self,conn,csv_file_nm,db_table_nm):
    qry = "BULK INSERT " + db_table_nm + " FROM '" + csv_file_nm + "' WITH (FORMAT = 'CSV')"
    # Execute the query
    cursor = conn.cursor()
    success = cursor.execute(qry)
    conn.commit()
    cursor.close
  • 如何将 csv_file_nm 指向我的 bulk_insert_input S3 目录?
  • 是否有正确的方法间接访问 dw_dev_credentials 来执行插入操作?

解决方法

Kedro 的 pandas.SQLTableDataSet.html 照原样使用 pandas.to_sql 方法。要按原样使用它,您需要一个 pandas.CSVDataSetnode 然后写入目标 pandas.SQLDataTable 数据集以便将其写入 SQL。如果你有 Spark 可用,这将比 Pandas 更快。

为了使用内置的 BULK INSERT 查询,我认为您需要定义一个 custom dataset