如何创建一个RxJS缓冲区来对NodeJS中的元素进行分组,但不依赖于永远的运行间隔?

问题描述

建议的实现,使用delay运算符:

function emits(who){
  return function (x) { console.log([who, "emits"].join(" ") + " " + x + " click(s)");};
}

var source = Rx.Observable.fromEvent(document.body, 'click');
console.log("running");

var delayedSource$ = source.delay(1200);

var buffered$ = source
     .buffer(function () { return  delayedSource$;}).map(function(clickBuffer){return clickBuffer.length;})

buffered$.subscribe(emits("buffer"));

jsbin在这里http://jsbin.com/wilurivehu/edit?html,js,console,output

解决方法

我正在使用Rx.Observable.fromEventNodeJS中的应用程序捕获事件。这些使用请求(https://www.npmjs.com/package/request)发送到另一台服务器。为了避免高网络负载,我需要在发送的请求之间的给定超时时间内缓冲那些事件。

问题

使用bufferWithTime(200)将使节点进程保持运行,并且我不知道应用程序何时完成关闭流。

有什么方法可以使用Rx缓冲区说:

  1. 按下元素1时,设置一个计时器
  2. 当元素2和3在计时器到期之前到达时,将它们推入数组[1,2,3](缓冲区)
  3. 当计时器到期时,将[1、2、3]数组发送到管道下方。
  4. 如果元素4在计时器到期之后出现,则设置一个新计时器,然后重新开始。

如果没有推送任何元素,则不会启动任何计时器,这会使进程退出。

我最初的方法是:

Rx.Observable
     .fromEvent(eventEmitter,'log')
     .bufferWithTime(200) // this is the issue
     .map(addEventsToRequestOption)
     .map(request)
     .flatMap(Promise.resolve)
     .subscribe(log('Response received'))