为什么我的 forkJoin 订阅在订阅独立工作时没有达到?

问题描述

我有两个可观察对象,我想将它们合并到 forkJoinrxjs 方法中。独立执行 observable 可以工作,但使用 forkJoin 不会到达 pipe finalize/subscribe 方法

我的.component.ts

....
const req1 = this.userService.getUser(this.loggedInUser.userId);
const req2 = this.boardGames$;
this.subscriptions$ = forkJoin([req1,req2])
  .pipe(
    finalize(() => {
      console.log('pipe'); // Is not reached
    })
  )
  .subscribe(([obj1,obj2]) => {
    console.log('subscribe'); // Is not reached
  },err => console.log(err),()=>console.log('compl'));
req1.subscribe((aa) => console.log(aa)); // This is logged
req2.subscribe((bb) => console.log(bb)); // This is logged
....

我使用 Angularfire2 进行请求。我不确定这是否会成为问题,因为订阅是独立工作的。 import { AngularFirestore } from 'angularfire2/firestore';

在这里遗漏了什么?

解决方法

forkJoin 仅在所有 observable 完成时发出。我看不到你的其余代码(例如什么是 boardGames$ observable)。您很可能正在使用在首次发射后无法完成的 observable,这是 AngularFirestore 的预期行为,因为最常见的是您订阅了数据库 (Firebase) 中的更改。

如果您需要在某些 observable 发出时获取最新值,请使用 combineLatest。请记住,只有当每个源 observables 发出时,它才会开始发出。

combineLatest([
    this.userService.getUser(this.loggedInUser.userId),this.boardGames$
]).subscribe(([user,boardGames]) => {
     // Don't forget to unsubscribe
});

使用 merge 表示您想将 observable 合并为一个 observable。它适用于您当前的情况。像这样:

merge(
  this.userService.getUser(this.loggedInUser.userId).pipe(map(entity => ({entity,type: 'user'}))),this.boardGames$.pipe(map(entity => ({entity,type: 'boardGames'})))
).subscribe(({entity,type}) => {
    // Don't forget to unsubscribe
})

使用 forkJoin,您可以像这样实现:

const req1 = this.userService.getUser(this.loggedInUser.userId).pipe(take(1));
const req2 = this.boardGames$.pipe(take(1));
this.subscriptions$ = forkJoin([req1,req2]).subscribe(() => {
    // I will complete after both observables emits.
});

请注意,即使使用 take(1),您仍然需要处理订阅,因为如果某些 observable 永远不会发出并且组件被销毁,您就会发生内存泄漏。有 awesome library 用于处理没有样板的订阅。

,

forkjoin() 需要您的两个订阅都完成才能真正加入。因此,如果您的任一订阅未完成,则永远不会到达 forkjoin()。如果您使用的是 firebase,则您的 observable 不会完成。

如果您的订阅未完成并且您需要来自两个 observable 的流,那么您应该尝试使用 combineLatest()。这需要两个活动订阅,一旦每个订阅发出一个值,就会将这些值加入一个订阅中,并继续发出值直到完成。

Here is a link for combineLatest

如果您只需要在调用 firebase 之前检查用户是否有效,请尝试 switchMap()。这会将您的用户 observable 切换为您的棋盘游戏 observable,而您将只处理棋盘游戏 observable。