rxjs:从2个终结点加载很多很多并合并为单个Observable

问题描述

我有一个端点用于获取用户列表,而另一个端点用于获取每个用户的公寓列表:

getUsers() => Observable<User[]>;
getUserApartments(userId) => Observable<Apartment[]>`

如何将两者的数据合并为一个可观察的数据:

const usersWithApartments$: Observable<{ user: User,apartments: Apartment[] }[]> = getUsers().pipe(
    // users.map(user => { user,apartments: getUserApartments(user.id) }) <-- turn this pseudocode into Rxjs
);

解决方法

由于getUsers()的返回类型为Users[],所以我假设所需的类型是{ user: User,apartments: Apartment[]>的集合,而不是单个集合。

请考虑以下实施方式:

class UserApartments {
  user: User;
  apartments: Apartment[];
}
const usersWithApartments$: Observable<UserApartments[]> = getUsers()
  .pipe(
    switchMap(users => forkJoin(
      users.map(user => getUserApartments(user.id).pipe(
        map(apartments => ({ user,apartments }))
      ))
    ))
  );
,

就这样:

const toUserWithApartmentsStream = (user: User) => getUserApartments(user.id).pipe(
  map((apartments: Apartment[]) => ({ user,apartments }))
);

const usersWithApartments$: Observable<{ user: User,apartments: Apartment[] }[]> =
  getUsers().pipe(
    switchMap((users: User[]) => users),concatMap((user: User) => toUserWithApartmentsStream(user)),toArray()
  );

现在getUserApartments将一个接一个地执行,我可以写mergeMap而不是concatMap,而getUserApartments将同时执行。感谢Rafi Henig的评论

函数toUserWithApartmentsStream只是一个助手,因此代码看起来更简洁。我可以将其内嵌在concatMap内。