问题描述
建议的实现,使用delay
运算符:
function emits(who){
return function (x) { console.log([who, "emits"].join(" ") + " " + x + " click(s)");};
}
var source = Rx.Observable.fromEvent(document.body, 'click');
console.log("running");
var delayedSource$ = source.delay(1200);
var buffered$ = source
.buffer(function () { return delayedSource$;}).map(function(clickBuffer){return clickBuffer.length;})
buffered$.subscribe(emits("buffer"));
jsbin在这里:http://jsbin.com/wilurivehu/edit?html,js,console,output
解决方法
我正在使用Rx.Observable.fromEvent
NodeJS中的应用程序捕获事件。这些使用请求(https://www.npmjs.com/package/request)发送到另一台服务器。为了避免高网络负载,我需要在发送的请求之间的给定超时时间内缓冲那些事件。
问题
使用bufferWithTime(200)
将使节点进程保持运行,并且我不知道应用程序何时完成关闭流。
有什么方法可以使用Rx缓冲区说:
- 按下元素1时,设置一个计时器
- 当元素2和3在计时器到期之前到达时,将它们推入数组[1,2,3](缓冲区)
- 当计时器到期时,将[1、2、3]数组发送到管道下方。
- 如果元素4在计时器到期之后出现,则设置一个新计时器,然后重新开始。
如果没有推送任何元素,则不会启动任何计时器,这会使进程退出。
我最初的方法是:
Rx.Observable
.fromEvent(eventEmitter,'log')
.bufferWithTime(200) // this is the issue
.map(addEventsToRequestOption)
.map(request)
.flatMap(Promise.resolve)
.subscribe(log('Response received'))