javascript – 根据rxjs中的计时处理事件流

我有一个进程,每隔一段时间发送一次数据包,我需要根据数据包到达的时间等来管理该数据流.在某些时候,我也关闭流和过程.

现在,我正在使用一组计时器来做这件事,但我希望我可以用rxjs来做,因为它似乎非常适合这种事情.到目前为止,我没有取得多大成功.

问题

该流应该定期向我发送数据包,但它通常偏离很多,有时会卡住.

我希望在以下条件下关闭流:

>如果需要超过startDelay发送第一个数据包.
>发送第一个数据包后,如果两个数据包之间存在多于middleDelay的暂停.
>经过一段时间的maxChannelTime.

由于上述任何原因我即将关闭流时,我首先要求它礼貌地关闭以便它可以进行一些清理.有时它还会在清理过程中向我发送最终数据包.但是我想等待清理时间不要超过cleanupTime,并且在关闭流之前到达最后一个数据并忽略更多消息.

我将通过使用Observable包装事件来创建“流”.我这样做没有问题.

通过“关闭”流,我的意思是告诉进程停止发送数据,并可能关闭(即死亡).

解决方法

棘手的问题.

我把它分解为两个阶段 – “受监管”(因为我们要定期检查)和“清理”.

向后工作,输出

const regulated = source.takeuntil(close)
const cleanup = source.skipUntil(close).takeuntil(cleanupCloser)
const output = regulated.merge(cleanup)

‘闭门器’是在关闭时发出的可观察量(每个超时值更近一个).

const startTimeout = 600
const intervalTimeout = 200
const maxtimeTimeout = 3000
const cleanupTimeout = 300

const startCloser = Observable.timer(startTimeout)  // emit once after initial delay
  .takeuntil(source)                                // cancel after source emits
  .mapTo('startTimeoutMarker')

const intervalCloser = source.switchMap(x =>    // reset interval after each source emit
    Observable.timer(intervalTimeout)           // emit once after intervalTimeout
      .mapTo('intervalTimeoutMarker')
  )

const maxtimeCloser = Observable.timer(maxtimeTimeout)  // emit once after maxtime
  .takeuntil(startCloser)                               // cancel if startTimeout
  .takeuntil(intervalCloser)                            // cancel if intervalTimeout
  .mapTo('maxtimeTimeoutMarker')

const close = Observable.merge(startCloser,intervalCloser,maxtimeCloser).take(1)

const cleanupCloser = close.switchMap(x =>      // start when close emits
     Observable.timer(cleanupTimeout)           // emit once after cleanup time
  ) 
  .mapTo('cleanupTimeoutMarker')

这是一个工作样本CodePen(请一次运行一个测试)

相关文章

前言 做过web项目开发的人对layer弹层组件肯定不陌生,作为l...
前言 前端表单校验是过滤无效数据、假数据、有毒数据的第一步...
前言 图片上传是web项目常见的需求,我基于之前的博客的代码...
前言 导出Excel文件这个功能,通常都是在后端实现返回前端一...
前言 众所周知,js是单线程的,从上往下,从左往右依次执行,...
前言 项目开发中,我们可能会碰到这样的需求:select标签,禁...