如何在发出订阅之前使流中的Observable收集值x秒

问题描述

背景:我需要在路由器NavigationStart事件上运行一些代码,但前提是该事件后没有NavigationCancel

我正在尝试以下操作:

const sourceTimer = timer(1000)
this.routerListeneRSSubscriber = this.router.events
  .pipe(
    map(e => e),takeuntil(sourceTimer)
  )
  .subscribe(event => {...})

想要: 显然,这一切都是在监听路由器事件50毫秒。我希望它在订阅获得它们之前从this.router.events收集所有值50ms,以便我检查是否发生NavigationCancel事件。

编辑:我尝试了很多事情,包括buffer运算符的变体,但是这样做的时候,我遇到的问题是流只会“运行”一次然后在第一次的“计时器”用完后不会再用于后续事件。

解决方法

这是您问题的答案:

this.router.events
.pipe(
  takeUntil(timer(50)),toArray()
)
.subscribe(events => {
  if(events.any(event => event instanceof NavigationCancel) return 
  else // do your code
})

但是我认为以下是您真正要寻找的选项:


选项1。
只需使用NavigationCanceltakeWhile上完成,即可阻止next中的代码运行。

this.router.events
.pipe(
  takeWhile(x => !(x instanceof NavigationCancel)),takeUntil(timer(50)),last()
)
.subscribe(x => {
 // run some code ...
})


选项2。

this.router.events
  .pipe(
    map(x => x instanceof NavigationCancel ? throwError("NavigationCanceled") : x),)
  .subscribe({
    complete: () => {
      // run your code here
    },error: console.error
  })


选项3。

this.router.events
.pipe(
  every(x => !(x instanceof NavigationCancel)),takeUntil(timer(50))
)
  .subscribe(x => {
    if (x == true)
    {
      // run your code
    }
  });
,

您可以使用reduce收集发出的每个项目,然后以takeUntil结束订阅。确保takeUntil在reduce之前。

const { timer } = rxjs;
const { reduce,takeUntil } = rxjs.operators;

timer(500,500).pipe(
  takeUntil(timer(3000)),reduce((results,item) => [...results,item],[])
).subscribe(val => { console.log(val); });
<script src="https://cdnjs.cloudflare.com/ajax/libs/rxjs/6.6.2/rxjs.umd.min.js" integrity="sha512-kN8bAZFoIra6Z7dDPNMD3efoGLn7QpOZgBcNpLwKIoBM5rXLVxt9nPHNo+4WrIsT0RBc/h2sXtN08n1ALxn4yw==" crossorigin="anonymous"></script>