rxjs:错误后保持无限流活跃

问题描述

我想知道我是否可以在无限主流中合并映射的每个外部流的管道中使用catchError,而在主流的末尾使用一个catchError。此catchError返回对主流本身的引用以进行救援。可能会导致内存泄漏或任何其他问题?

这是示例代码

import { Observable,fromEvent } from "rxjs";
import { catchError,mergeMap } from "rxjs/operators";

function foreignStream() {
  return new Observable(observer => {
    observer.next(0);
    observer.next(1);
    observer.next(2);
    observer.error("error");
  });
}

const stream$ = fromEvent(document,"click").pipe(
  mergeMap(foreignStream),catchError(x => {
    console.log(x);
    return stream$;
  })
);
stream$.subscribe(
  console.log,x => console.log("err" + x),() => console.log("complete")
);

解决方法

虽然该解决方案看起来确实很有趣,但实际上会导致在旧的订阅保持打开状态时始终针对每个错误创建一个新的订阅。相反,您可以使用retryretryWhen重新启动可观察到的源,以防出现错误。

尝试以下

RxJS retry

const stream$ = fromEvent(document,"click").pipe(
  mergeMap(foreignStream),mergeMap(foreignStream2),mergeMap(foreignStream3),retry() // <-- retry immediately infinite times
);
const stream$ = fromEvent(document,retry(5) // <-- retry immediately 5 times on errors and complete
);

RxJS retryWhen

const stream$ = fromEvent(document,retryWhen(error => error.pipe(delay(5000))) // <-- retry after 5 seconds infinite times
);