使用 Node.js 将大型 CSV 文件插入 Postgres

问题描述

我正在尝试读取一个大型 CSV 文件并使用 Node.js 在 Postgres 中插入该文件。批量读取和插入记录的最佳方式应该是什么。下面是我的代码。我正在尝试将记录分成 1000 个批次。我应该在哪一点初始化 Postgres 数据库并插入。

如果我运行此代码,我将阅读下面的 csv。我被困在如何批量插入值。

Id,City__c
12345678,"Test 1223"
87654321,"Test 8888"
Read entirefile.

js 文件

const fs = require( 'fs' ),es = require( 'event-stream' ),parse = require( "csv-parse" ),iconv = require( 'iconv-lite' );
const pgp = require( 'pg-promise' )( {
    capsql: true
} );

const client = {
    connectionString: 'postgres://ubbmdkfcbfgkfn:89fbe24d9cf50335a0667757d318797cebda6f1d6c2904560a8431aeb9f0040d@ec2-54-75-248-49.eu-west-1.compute.amazonaws.com:5432/ddbimqs3ehvtug',ssl: {
        rejectUnauthorized: false
    }
}

class CSVReader {
    constructor ( filename,batchSize,columns ) {
        this.reader = fs.createReadStream( filename ).pipe( iconv.decodeStream( 'utf8' ) )
        this.batchSize = batchSize || 1000
        this.lineNumber = 0
        this.data = []
        this.parSEOptions = { delimiter: ',',columns: true,escape: '/',relax: true }
    }

    read( callback ) {
        this.reader
            .pipe( es.split() )
            .pipe( es.mapSync( line => {
                ++this.lineNumber

                parse( line,this.parSEOptions,( err,d ) => {
                    console.log( line )
                } )

                if ( this.lineNumber % this.batchSize === 0 )
                {
                    callback( this.data )
                }
            } )
                .on( 'error',function () {
                    console.log( 'Error while reading file.' )
                } )
                .on( 'end',function () {
                    console.log( 'Read entirefile.' )
                } ) )
    }

    continue() {
        this.data = []
        this.reader.resume()
    }
}


let intializeDBAndInsert = () => {
    const db = pgp( client );
    let sco;

    db.connect()
        .then( obj => {
            sco = obj;
            let reader = new CSVReader( 'F:/data.csv' )
            reader.read( () => reader.continue() )
        } )
        .then( data => {
            // success
            console.log( data )
        } )
        .catch( error => {
            // error
            console.log( error )
        } )
        .finally( () => {
            // release the connection,if it was successful:
            if ( sco )
            {
                sco.done();
            }
        } );

}

intializeDBAndInsert();

解决方法

暂无找到可以解决该程序问题的有效方法,小编努力寻找整理中!

如果你已经找到好的解决方法,欢迎将解决方案带上本链接一起发送给小编。

小编邮箱:dio#foxmail.com (将#修改为@)