Node 中的简单缓冲双工流 - 如何?

问题描述

我正在尝试实现具有缓冲功能的双工流。

它应该积累数据块,直到有足够的数据,然后再进一步发送。

例如,在播放流式音频/视频数据时可以使用它:不会简单地及时获取帧,对吗?

下面是我创建这样一个缓冲双工流的愚蠢尝试。有一个源流,它将 x\n 个字符发送到缓冲流,而缓冲流又应该将数据进一步发送到 process.stdout

唉,它不起作用。具体来说,read() 函数似乎没有任何暂停停止方法,例如:

“嘿,我现在没有你的数据,稍后回来”

不,一旦我返回 undefinednull,故事就结束了,标准输出没有任何内容

var {Readable,Duplex} = require('stream');

// Source stream,seeds: x\n,x\n,...
let c = 10;
var rs = new Readable({
  read () {
    if (c > 0) {
      c--;
      console.log('rs reading:','x');
      this.push('x\n');
    } 
    else {
      this.push(null)
    }
  },});

// Buffering duplex stream
// I want it to cache 3 items and only then to proceed
const queue = [];
const limit = 3;
var ds = new Duplex({
  writableHighWaterMark: 0,write (chunk,encoding,callback) {
    console.log('ds writing:',chunk,'paused: ',ds.isPaused());
    queue.push(chunk);
    callback();
  },readableHighWaterMark: 0,read () {
    // We don't want to output anything
    // until there's enough elements in the `queue`.
    if (queue.length >= limit) {
      const chunk = queue.shift();
      console.log('ds reading:',chunk);
      this.push(chunk);
    }
    else {
      // So how to wait here?
      this.push(undefined)
    }
  },});

// PROBLEM: nothing is coming out of the "ds" and printed on the stdout
rs.pipe(ds).pipe(process.stdout);

这是我的回复https://repl.it/@OnkelTem/BufferingStream1-1#index.js

我检查了双工的状态,它甚至没有处于暂停状态。所以它没有暂停,它在流动,但 - 什么都不返回。

我还花了几个小时重新阅读有关 Node 流的文档,但实际上并不觉得它是为理解而创建的。

解决方法

缓冲流只是一种转换流。如果我理解您要正确执行的操作,那么实现应该不会比这更复杂:

const { Transform } = require('stream');

const DEFAULT_CAPACITY = 10;

class BufferingTransform extends Transform {
  constructor(options = {}) {
    super(options);

    this.capacity = options.capacity || DEFAULT_CAPACITY ;
    this.pending = [] ;

    return;
  }

  get atCapacity() {
    return this.pending.length >= this.capacity;
  }

  _transform(chunk,encoding,cb) {

    if ( this.atCapacity ) {
      this.push( ...this.pending.shift() );
    }

    this.pending.push( [chunk,encoding] );

    cb();
  }

  _flush(cb) {

    while (this.pending.length > 0) {
      this.push( ...this.pending.shift() );
    }

    cb();
  }

}

一旦你有了它,它应该只是通过 BufferingStream and reading from the BufferingStream` 管道你的源的问题:

async function readFromSource() {
  const source = openSourceForReading();
  const buffer = new BufferingStream();

  source.pipe(buffer);

  for await (const chunk of buffer) {
    console.log(chunk);
  }

}
,

这是一个使用异步迭代的实现:

function bufferStream(stream,bufferCount){
    stream = normalizeAsyncIterable(stream);
    const iterator = stream[Symbol.asyncIterator]();
    const queue = []

    while(queue.length < bufferCount)
        queue.push(iterator.next());

    return normalizeAsyncIterable({
        [Symbol.asyncIterator]: () => ({
            next: async () => {
                const promise = queue.shift() ?? iterator.next();
                while(queue.length < bufferCount)
                    queue.push(iterator.next());
                return promise;
            }
        })
    });
}

// Ensures that calls to .next() while the generator is paused are handled correctly
async function* normalizeAsyncIterable(iterable){
    for await(const value of iterable)
        yield value;
}

TS Playground Link