问题描述
我有一个发射数字的可观察流,我想要另一个发射所有 1 的流,这些 1 后面没有紧跟一个 2(比如在 200 毫秒内),例如来自这个源流:
(每个字符100ms)
1...12...112...11121...
结果应该是:
..1.......1.....11...1.
我将如何使用 rxjs@^6.6.7
做到这一点?
解决方法
基于@martin 的回答:
当发出 1 时,
- 我们创建了一个新的 observable,它等待下一个元素 200 毫秒
- 如果在 200 毫秒内没有元素进入,我们发出 1 并关闭订阅
- 如果一个元素出现并且它是 2,我们不发出任何东西并关闭订阅
- 如果一个元素出现并且它是 1,我们发出原始值并关闭订阅
https://stackblitz.com/edit/rxjs-ihyznt?devtoolsheight=66&file=index.ts
source
.pipe(
filter(value => value != 2),mergeMap(value =>
source.pipe(
first(),filter(v => v != 2),map(_ => value),timeout(200),catchError(_ => of(value))
)
)
)
.subscribe(value => console.log(value));
,
我会这样做。每个 1
都被一个内部 Observable 包裹并延迟了 200 毫秒,这可能会提前使用 takeUntil()
完成并因此被忽略:
source$
.pipe(
filter(value => value === 1),mergeMap(value => of(value).pipe(
delay(200),takeUntil(source$.pipe(
filter(value => value === 2),)),)