问题描述
我有两个可观察的管道。我需要一个接一个地运行,并比较两个相等或不相等的值。我尝试了下面的代码。这应该可以工作,当发出第一个可观察值时,它应该去并获取第二个obserbla值,并应该比较它的第一个返回值。我需要一些专家的帮助,以便更好地引用此代码。
this.selectedUnitDetailModel$.pipe(shareReplayUntil(this.destroySub)).subscribe(
(res: UnitDetail) =>{
if(res.unitTwo){
this.appStore.select(selectUnit).
pipe(shareReplayUntil(this.destroySub)).subscribe(
(unitId: string) => {
if(unitId === res.unitTwo){
this.sameUnit = true;
}else{
this.sameUnit = false;
}
});
}
}
);
解决方法
this.selectedUnitDetailModel$.pipe(shareReplayUntil(this.destroySub),mergeMap(
(res: UnitDetail) =>{
if(res.unitTwo){
return this.appStore.select(selectUnit).
pipe(shareReplayUntil(this.destroySub),map(
(unitId: string) => unitId === res.unitTwo);
}
}
).subscribe({
next: (sameUnit: boolean) => {
//do something
}
});
,
您不需要高阶运算符,因为可观察值this.selectedUnitDetailModel$
和this.appStore.select(selectUnit)
是彼此独立的。相反,您可以使用forkJoin
,combineLatest
或zip
之类的函数来并行地从它们那里获取通知。
在这些功能here的作用下,您会发现差异。
尝试以下
forkJoin(
this.selectedUnitDetailModel$.pipe(take(1)),// <-- complete on first emission
this.appStore.select(selectUnit).pipe(take(1)) // <-- complete on first emission
).subscribe(
([res,unitId]) => this.sameUnit = res.unitTwo === unitId,(error) => console.log(error) // <-- handle error
);
forkJoin
仅在源可观测值完成时发出,因此我将take(1)
用管道传输到每个可观测值。 forkJoin
现在将在每个可观察和完整的首次发射时发射。因此,您的shareReplayUntil(this.destroySub)
的需求得以减轻。
但是,如果需要使可观察对象的发射流保持打开状态,则可以使用combineLatest
或zip
代替。在这种情况下,您可以将take(1)
替换为“ shareReplayUntil(this.destroySub)”。
更新:可观察到 this.selectedUnitDetailModel$
的连续流
就像我之前说过的那样,您可以使用combineLatest
代替forkJoin
来启用连续的数据流。
尝试以下
import { Subject,combineLatest } from 'rxjs';
import { takeUntil } from 'rxjs/operators';
combineLatest(
this.selectedUnitDetailModel$,this.appStore.select(selectUnit)
).pipe(
takeUntil(this.destroySub) // <-- replaced with `takeUntil` operator
).subscribe(
([res,(error) => console.log(error) // <-- handle error
);