如何使用MergeMap或FlatMap或rxJs-operators更好的方法编写以下代码? 更新:可观察到 this.selectedUnitDetailModel$的连续流

问题描述

我有两个可观察的管道。我需要一个一个地运行,并比较两个相等或不相等的值。我尝试了下面的代码。这应该可以工作,当发出第一个可观察值时,它应该去并获取第二个obserbla值,并应该比较它的第一个返回值。我需要一些专家的帮助,以便更好地引用此代码

   this.selectedUnitDetailModel$.pipe(shareReplayUntil(this.destroySub)).subscribe(
          (res: UnitDetail) =>{
              if(res.unitTwo){
                this.appStore.select(selectUnit).
                pipe(shareReplayUntil(this.destroySub)).subscribe(
                  (unitId: string) => {
                    if(unitId ===  res.unitTwo){
                      this.sameUnit = true;
                    }else{
                      this.sameUnit = false;
                    }
                  });
              }
          }
       );

解决方法

this.selectedUnitDetailModel$.pipe(shareReplayUntil(this.destroySub),mergeMap(
          (res: UnitDetail) =>{
              if(res.unitTwo){
               return this.appStore.select(selectUnit).
                pipe(shareReplayUntil(this.destroySub),map(
                  (unitId: string) =>  unitId ===  res.unitTwo);
              }
          }
       ).subscribe({
        next: (sameUnit: boolean) => {
           //do something 
        }
       });
,

您不需要高阶运算符,因为可观察值this.selectedUnitDetailModel$this.appStore.select(selectUnit)是彼此独立的。相反,您可以使用forkJoincombineLatestzip之类的函数来并行地从它们那里获取通知。

在这些功能here的作用下,您会发现差异。

尝试以下

forkJoin(
  this.selectedUnitDetailModel$.pipe(take(1)),// <-- complete on first emission
  this.appStore.select(selectUnit).pipe(take(1))    // <-- complete on first emission
).subscribe(
  ([res,unitId]) => this.sameUnit = res.unitTwo === unitId,(error) => console.log(error)                     // <-- handle error
);

forkJoin仅在源可观测值完成时发出,因此我将take(1)用管道传输到每个可观测值。 forkJoin现在将在每个可观察和完整的首次发射时发射。因此,您的shareReplayUntil(this.destroySub)的需求得以减轻。

但是,如果需要使可观察对象的发射流保持打开状态,则可以使用combineLatestzip代替。在这种情况下,您可以将take(1)替换为“ shareReplayUntil(this.destroySub)”。

更新:可观察到 this.selectedUnitDetailModel$的连续流

就像我之前说过的那样,您可以使用combineLatest代替forkJoin来启用连续的数据流。

尝试以下

import { Subject,combineLatest } from 'rxjs';
import { takeUntil } from 'rxjs/operators';

combineLatest(
  this.selectedUnitDetailModel$,this.appStore.select(selectUnit)
).pipe(
  takeUntil(this.destroySub)         // <-- replaced with `takeUntil` operator
).subscribe(
  ([res,(error) => console.log(error)                     // <-- handle error
);