如何在rxjs中每个订阅一次在可观察管道上执行初始化逻辑

问题描述

因此,我有一个可观察的管道,在订阅开始时需要在其中进行一次操作,就像您可以在订阅结束时使用finalize()进行一次操作一样

这就是我的开始,很遗憾,每次对主题进行next()调用时,它将启动一次。

    const notificationSubject = new BehaviorSubject<Notification | undefined>(undefined);
    const notifications$ = this.notificationSubject.pipe(
      tap(() => startup()),filter(isValueDefined),finalize(() => shutdown())
    );

   notifications$.subscribe(noti => foo(noti));
   notifications$.subscribe(noti => bar(noti));

然后我们得到了这个变体:

    let isstartedUp = false;
    const internalStartup = () => {
      if(!isstartedUp){
        isstartedUp = true;
        startup();
      }
    }

    const notifications$ = notificationSubject.pipe(
      tap(() => internalStartup()),finalize(() => shutdown())
    );

   notifications$.subscribe(noti => foo(noti));
   notifications$.subscribe(noti => bar(noti));

...确实起作用,但是它做得有点好,因为现在启动仅进行一次(并且仅在第一个订阅上),而不是每次创建的订阅都进行一次。

我想象有类似的东西,但我还没有发现。

const notifications$ = notificationSubject.pipe(
      initialize(() => startup()),finalize(() => shutdown())
    );

解决方法

您可以使用defer对每个订阅执行一些代码。

export function initialize<T>(initializer: () => void): MonoTypeOperatorFunction<T> {
  return (source: Observable<T>) => defer(() => {
    initializer();
    return source;
  });
}
const notifications$ = notificationSubject.pipe(
  initialize(() => startup()),finalize(() => shutdown())
);