问题描述
如何组合distinct、switchMap和mergeMap操作符,这样当源发出重复值时(由distinct.keySelector检测),取消之前的订阅(如switchMap中),但如果值不重复,则跟随mergeMap 的行为?
示例:
source = from(1,2,1,3) // 'abcde'
result = source.pipe(delay(),combination() // '--cde'
我目前正在做类似的事情:
const activeSubscriptions = new Map();
source$.pipe(
mergeMap((value) => {
const pendingSubscription = activeSubscriptions.get(value);
if (pendingSubscription) {
pendingSubscription.unsubscribe();
activeSubscriptions.delete(value);
}
const request$ = new Subject();
const subscription = this.service.get(value).subscribe({
complete: () => request$.complete(),error: (err) => request$.error(err),next: (value) => request$.next(value),});
activeSubscriptions.set(value,subscription);
return request$;
})
);
但正在寻找更好的方法来做到这一点。
提前致谢
解决方法
我认为您可以为此使用 windowToggle
运算符:
src$ = src$.pipe(shareReplay(1));
src$.pipe(
ignoreElements(),windowToggle(src$.pipe(observeOn(asyncScheduler)),openValue => src$.pipe(skip(1),filter(v => v === openValue))),mergeMap(
window => window.pipe(
startWith(null),withLatestFrom(src$.pipe(take(1))),map(([,windowVal]) => windowVal),)
),)
observeOn(asyncScheduler)
的替代品也可以是 delay(0)
,重要的是要确保 src$
的订阅者接收值的顺序是正确的。在这种情况下,我们希望确保当 src$
发出时,首先进行清理,这就是我们使用 src$.pipe(observeOn(asyncScheduler))
的原因。
ignoreElements()
是因为每个窗口仅与一个值配对,即创建该窗口的值。传递给 windowToggle
的第一个参数将描述可以创建 windows
的 observable。所以,我们只需要那些,因为我们能够在
window => window.pipe(
startWith(null),)
顺便说一下,window is nothing but a Subject
。
最后,如果您想在 window
的管道内执行异步操作,您必须确保在窗口完成(关闭)时取消订阅所有内容。要做到这一点,你可以试试这个:
window => window.pipe(
startWith(null),switchMap(val => /* some async action which uses `val` */),takeUntil(window.pipe(isEmpty()))
)
当源(在本例中为 isEmpty
)完成时,true
将发出 false
或 window
。 false
表示源在发出 complete
通知之前已发出至少一个值,否则为 true
。在这种情况下,我会说它是 true
还是 false
无关紧要,因为 window
本身不会发出任何值(因为我们使用了 ignoreElements
,忽略除 error
和 complete
通知之外的所有内容。