问题描述
因此,我有一个可观察的管道,在订阅开始时需要在其中进行一次操作,就像您可以在订阅结束时使用finalize()
进行一次操作一样
这就是我的开始,很遗憾,每次对主题进行next()
调用时,它将启动一次。
const notificationSubject = new BehaviorSubject<Notification | undefined>(undefined);
const notifications$ = this.notificationSubject.pipe(
tap(() => startup()),filter(isValueDefined),finalize(() => shutdown())
);
notifications$.subscribe(noti => foo(noti));
notifications$.subscribe(noti => bar(noti));
然后我们得到了这个变体:
let isstartedUp = false;
const internalStartup = () => {
if(!isstartedUp){
isstartedUp = true;
startup();
}
}
const notifications$ = notificationSubject.pipe(
tap(() => internalStartup()),finalize(() => shutdown())
);
notifications$.subscribe(noti => foo(noti));
notifications$.subscribe(noti => bar(noti));
...确实起作用,但是它做得有点好,因为现在启动仅进行一次(并且仅在第一个订阅上),而不是每次创建的订阅都进行一次。
我想象有类似的东西,但我还没有发现。
const notifications$ = notificationSubject.pipe(
initialize(() => startup()),finalize(() => shutdown())
);
解决方法
您可以使用defer
对每个订阅执行一些代码。
export function initialize<T>(initializer: () => void): MonoTypeOperatorFunction<T> {
return (source: Observable<T>) => defer(() => {
initializer();
return source;
});
}
const notifications$ = notificationSubject.pipe(
initialize(() => startup()),finalize(() => shutdown())
);