在node.js中实现“分批”可写流

问题描述

这是一些代码

const fs = require('fs');
const stream = require('stream');
const db = require('some-db-engine');

const readStream = fs.createReadStream('file.dat');
const transformStream = new stream.Transform({
    objectMode: true,highWaterMark: 1000,transform(chunk,encoding,callback) {
        const obj = { /* somehow transform input chunk to object */ };
        this.push(obj);
        callback();
    }
});
const writableStream = new stream.Writable({
    objectMode: true,writev(chunks,callback) {
        /* chunks should be a buffered array of objects no more than 1000 items */
        db.query('dump all chunks () in one move',callback);
    }
});

stream.pipeline(
    readStream,transformStream,writableStream,(error) => {
        if (error) {
            console.error(error);
        } else {
            console.log('Success!');
        }
    }
);

我想要的是writableStream临时将传入的对象按1000个项目分批缓冲,因此数据一次被大部分写入db。相反,writev方法始终接收仅一个元素的数组。
当然,我可以实现自己的内部缓冲区,只是想知道我是否丢失了某些东西,以及是否可以使用本机api:highWaterMarkwritable.cork()或其他API来实现?

更新

到目前为止,我一直在等待解决方案:

const createChunkedStream = (size) => {

    const queue = [];
    
    const process = (stream,end,chunk,callback) => {
        end || queue.push(chunk);
        if (queue.length >= (end ? 1 : size)) {
            stream.push(queue.splice(0));
        }
        callback();
    };

    return stream.Transform({
        transform(chunk,callback) {
            process(this,false,callback);
        },flush(callback) {
            process(this,true,null,callback);
        }
    });

};

可以用作

stream.pipeline(
    readStream,createChunkedStream(1000),(error) => { /* ... */ }
);

解决方法

暂无找到可以解决该程序问题的有效方法,小编努力寻找整理中!

如果你已经找到好的解决方法,欢迎将解决方案带上本链接一起发送给小编。

小编邮箱:dio#foxmail.com (将#修改为@)

相关问答

依赖报错 idea导入项目后依赖报错,解决方案:https://blog....
错误1:代码生成器依赖和mybatis依赖冲突 启动项目时报错如下...
错误1:gradle项目控制台输出为乱码 # 解决方案:https://bl...
错误还原:在查询的过程中,传入的workType为0时,该条件不起...
报错如下,gcc版本太低 ^ server.c:5346:31: 错误:‘struct...