问题描述
我想在 observable 完成时执行代码。在我的代码中,我执行这个:
compact(): Observable<FileManifest> {
return this.loadindex().pipe(
mergeMap((index) => index.walk()),map((entry) => entry.manifest),notUndefined(),writeallMessages(this.newPath,ProtoFileManifest),finalize(async () => {
await Promise.all([
promises.rm(this.journalPath,{ force: true }),promises.rm(this.manifestPath,]);
await promises.rename(this.newPath,this.manifestPath);
}),);
}
问题是finalize方法是为同步代码做的。当我像上面那样执行异步代码时,代码将独立于订阅执行。
我希望在处理可观察对象的资源时执行此操作,但我希望在订阅时始终收到该事件。
谢谢 乌尔里希
解决方法
一种方法是创建三个 observables 而不是试图全部完成 在一。每个将在您想要的顺序异步链中组成一个链接 制作。
为了使基于 Promise 的 observable 中的副作用变得懒惰,我们使用 defer
。
请注意,延迟回调的返回值可以是可观察的,也可以是
“ObservableInput”,这就是 RxJS 调用它知道如何转换的值
变成可观察的。这个值可以是(除其他外)一个承诺。
({
compact(): Observable<FileManifest> {
const writeToTempManifest$ = this.loadIndex().pipe(
mergeMap((index) => index.walk()),map((entry) => entry.manifest),notUndefined(),writeAllMessages(this.newPath,ProtoFileManifest)
);
const removeOldManifest$ = defer(() =>
Promise.all([
promises.rm(this.journalPath,{ force: true }),promises.rm(this.manifestPath,])
);
const renameNewManifest$ = defer(() =>
promises.rename(this.newPath,this.manifestPath)
);
return from([
writeToTempManifest$,removeOldManifest$,renameNewManifest$,]).pipe(concatAll());
},});
请注意,这些 observable 中的每一个都可能会发出一些信息(尽管我不熟悉 API)。第一个发出 writeAllMessages
运算符所做的任何事情,而第二个和第三个发出各自承诺的解析值。在第二个的情况下,这是一个来自 Promise.all
的双元素数组。
如果你想抑制一个 observable 发出的值,同时在它完成之前仍然保持打开状态,你可以创建一个操作符来做到这一点:
const silence = pipe(concatMapTo(EMPTY));