问题描述
我创建了一个 Writable
流,它连接到我们系统中的一个大型管道,在收到 BUFFER_SIZE
块(对象)后写入数据库。
getStream() {
const buffer = [];
const stream = new Writable({
objectMode: true,async write(chunk,enc,next) {
buffer.push(chunk);
if( buffer.length > BUFFER_SIZE ) {
await insertToDB(buffer);
}
next();
}
});
stream.on('finish',async () => {
// insert last batch?
if( buffer.length ) {
await insertToDB(buffer);
}
});
return stream;
}
async consumer() {
await pipeline(...largePipeline,getStream());
closeAll();
}
这可以正常工作,但我遇到的问题是在 on('finish',...)
函数中调用 closeAll()
之后调用 consumer()
事件处理程序太晚了。
有没有办法让 write()
方法知道它刚刚收到最后一个 chunk
?这样我就可以在调用最后一个 next()
之前刷新缓冲区,一切都会同步。
请注意,在此代码库中,管道、使用者和编写器之间有非常严格的分离,我不能也不会在这些组件之间交换承诺、标志或状态检查。 Writable 流是一个独立的单元!我正在寻找 Node Streams 通过缓冲写入解决这个问题的方法,必须有一种方法来检查 Writable
流是否被最后调用并等待它真正完成,但我无法理解
解决方法
暂无找到可以解决该程序问题的有效方法,小编努力寻找整理中!
如果你已经找到好的解决方法,欢迎将解决方案带上本链接一起发送给小编。
小编邮箱:dio#foxmail.com (将#修改为@)