问题描述
这是一些代码
const fs = require('fs');
const stream = require('stream');
const db = require('some-db-engine');
const readStream = fs.createReadStream('file.dat');
const transformStream = new stream.Transform({
objectMode: true,highWaterMark: 1000,transform(chunk,encoding,callback) {
const obj = { /* somehow transform input chunk to object */ };
this.push(obj);
callback();
}
});
const writableStream = new stream.Writable({
objectMode: true,writev(chunks,callback) {
/* chunks should be a buffered array of objects no more than 1000 items */
db.query('dump all chunks () in one move',callback);
}
});
stream.pipeline(
readStream,transformStream,writableStream,(error) => {
if (error) {
console.error(error);
} else {
console.log('Success!');
}
}
);
我想要的是writableStream
临时将传入的对象按1000个项目分批缓冲,因此数据一次被大部分写入db。相反,writev
方法始终接收仅一个元素的数组。
当然,我可以实现自己的内部缓冲区,只是想知道我是否丢失了某些东西,以及是否可以使用本机api:highWaterMark
或writable.cork()
或其他API来实现?
更新
到目前为止,我一直在等待解决方案:
const createChunkedStream = (size) => {
const queue = [];
const process = (stream,end,chunk,callback) => {
end || queue.push(chunk);
if (queue.length >= (end ? 1 : size)) {
stream.push(queue.splice(0));
}
callback();
};
return stream.Transform({
transform(chunk,callback) {
process(this,false,callback);
},flush(callback) {
process(this,true,null,callback);
}
});
};
可以用作
stream.pipeline(
readStream,createChunkedStream(1000),(error) => { /* ... */ }
);
解决方法
暂无找到可以解决该程序问题的有效方法,小编努力寻找整理中!
如果你已经找到好的解决方法,欢迎将解决方案带上本链接一起发送给小编。
小编邮箱:dio#foxmail.com (将#修改为@)