RxJS - 一个 observable 'A' 不会停止从同一个 observable 'A' 传输的 takeUntil 发射

问题描述

我有一个 observable,它在单击开始按钮后每秒向控制台打印一个数字。 observable 应该在点击开始按钮 5 秒后停止打印。

开始条件 点击开始按钮后的每一秒

结束条件 点击开始按钮五秒后

不起作用的代码

const startClick$ = fromEvent(document.getElementById('start'),'click');
const stop$ = startClick$.pipe(switchMapTo(interval(5000)));
const printInterval$ = interval(1000).pipe(takeuntil(stop$));
const startPrint$ = startClick$.pipe(switchMapTo(printInterval$));

startPrint$.subscribe(
  value => {
    console.log('new value is:',value);
  },() => {
    console.log('there was some error!');
  },() => {
    console.log('i have completed');
  }
);

stop$.subscribe(() => {
  console.log('i should stop right Now!!');
});

点击开始按钮 5 秒后,startPrint$ Observable 不会停止发射。

有效的代码

const endClick$ = fromEvent(document.getElementById('end'),'click');
const stopWithEndClick$ = endClick$.pipe(switchMapTo(interval(5000)));
const printInterval1$ = interval(1000).pipe(takeuntil(stopWithEndClick$));
const startPrint1$ = startClick$.pipe(switchMapTo(printInterval1$));

startPrint1$.subscribe(
  value => {
    console.log('1: new value is:',() => {
    console.log('1: there was some error!');
  },() => {
    console.log('1: i have completed');
  }
);

stopWithEndClick$.subscribe(() => {
  console.log('1: i should stop right Now!!');
});

这里我有一个按钮 end,我在结束按钮被点击 5 秒后才使用。在这种情况下,observable 将停止按预期发射。

我怎样才能使第一个案例起作用,我在这里犯了什么错误吗?可以在 https://stackblitz.com/edit/take-until-issue?file=index.ts

找到完整的工作代码

解决方法

发生这种情况是因为 RxJS 操作符内部的订阅顺序。当您第一次点击“开始”按钮时,第一个收到通知的人是 stop$,但没有人在监听 stop$,因此通知不会触发任何事情。您看到 "'i should stop right now!!' 只是因为您最后订阅了自己。

因此,最简单的解决方案是将 stop$ 链创建为 Observable,只有在您订阅后才会开始发射,例如 timer(5000) 只发出一次然后完成。

const stop$ = timer(5000);

您更新的演示:https://stackblitz.com/edit/take-until-issue-or4okm?file=index.ts

请注意,您可能会得到不同数量的结果,因为 RxJS(一般为 JavaScript)无法保证 5000ms 将完全是 5000ms。这同样适用于 1000ms,因此有时可能会以不同的顺序触发事件。如果您只想要 5 个结果,最好使用例如 take(5)。此外,对于日志记录,最好使用例如 tap(v => console.log(v)) 这样您就不会再订阅。