检测流的 Writable 最后一个块

问题描述

我创建了一个 Writable 流,它连接到我们系统中的一个大型管道,在收到 BUFFER_SIZE 块(对象)后写入数据库。

getStream() {
    const buffer = [];

    const stream = new Writable({
        objectMode: true,async write(chunk,enc,next) {
            buffer.push(chunk);
            if( buffer.length > BUFFER_SIZE ) {
                await insertToDB(buffer);
            }
            next();
        }
    });

    stream.on('finish',async () => {
        // insert last batch?
        if( buffer.length ) {
            await insertToDB(buffer);
        }
    });

    return stream;
}

async consumer() {
    await pipeline(...largePipeline,getStream());
    closeAll();
}

这可以正常工作,但我遇到的问题是在 on('finish',...) 函数中调用 closeAll() 之后调用 consumer() 事件处理程序太晚了。

有没有办法让 write() 方法知道它刚刚收到最后一个 chunk?这样我就可以在调用最后一个 next() 之前刷新缓冲区,一切都会同步。

请注意,在此代码库中,管道、使用者和编写器之间有非常严格的分离,我不能也不会在这些组件之间交换承诺、标志或状态检查。 Writable 流是一个独立的单元!我正在寻找 Node Streams 通过缓冲写入解决这个问题的方法,必须有一种方法来检查 Writable 流是否被最后调用并等待它真正完成,但我无法理解

解决方法

暂无找到可以解决该程序问题的有效方法,小编努力寻找整理中!

如果你已经找到好的解决方法,欢迎将解决方案带上本链接一起发送给小编。

小编邮箱:dio#foxmail.com (将#修改为@)