问题描述
我编写了以下代码,期望最后一次订阅将打印['emitted value 1','emitted value 2'],但仅打印['emitted value 2']。这是有道理的,因为BehaviorSubject会发出最后一个值,但是为什么扫描累加器会重置为种子值?
我发现可以通过使用shareReplay(1)轻松解决此问题,但是我真的不明白为什么。在这两种情况下,使用shareReplay(1)与不使用扫描累加器时,发生了什么?
const subject = new BehaviorSubject(null);
const action$ = subject.asObservable();
const obs$ = action$.pipe(scan((acc,curr) => [...acc,curr],[]));
// const obs$ = action$.pipe(scan((acc,[]),shareReplay(1));
obs$.subscribe(res => console.log('first subs',res));
subject.next('emitted value 1');
subject.next('emitter value 2');
obs$.subscribe(res => console.log('second subs',res));
This is the console output without shareReplay(1)
解决方法
订阅BehaviorSubject
时,您将收到最后发出的值。这就是为什么您在控制台中仅收到second subs ["emitter value 2"]
的原因。
shareReplay
来自doc:
为什么要使用shareReplay?当您不希望产生副作用或增加计算负担时,通常希望使用shareReplay。 在多个订户之间执行。它也可能在 您知道您将有一个流的晚订户的情况 需要访问以前发出的值。重播此功能 订阅的价值是share和shareReplay的区别所在。
这意味着您将无需计算就可以接收所有发出的值,这是一种缓存。这在控制台中也是可见的,您只有一个打印包含第二个预订的所有值:
second subs [null,"emitted value 1","emitter value 2"]
当您订阅一个可观察的对象时,它会因为缺少更好的单词而“运行”。当值随时间散布时,这可能会造成混淆,因此我们将继续进行同步调用。认为以下是可观察的。
const getRandom = () => Math.floor(Math.random() * 10);
const stream$ = defer(() => from([getRandom(),getRandom(),getRandom()]));
stream$.subscribe(console.log); // 4 6 2
stream$.subscribe(console.log); // 4 4 2
stream$.subscribe(console.log); // 1 8 6
stream$.subscribe(console.log); // 8 6 7
注意每个订阅如何获得不同的号码?它们都在重新运行getRandom()
,并且其订阅获得不同的值。
如果我们在stream$
中放置一个累加器,则每个订阅将以一个新鲜的种子开始,并根据它们的流发出的结果来获得结果。
如此:
const getRandom = () => Math.floor(Math.random() * 10);
const stream$ = defer(
() => from([getRandom(),getRandom()])
).pipe(
reduce((acc,val) => acc + val,0)
);
stream$.subscribe(console.log); // 12 (4+6+2)
stream$.subscribe(console.log); // 10 (4+4+2)
stream$.subscribe(console.log); // 15 (1+8+6)
stream$.subscribe(console.log); // 21 (8+6+7)
查看每个订阅如何不添加到上一个订阅中?它只是在增加自己的数字。
他们没有在不同的订阅中共享累加器或值或任何东西。
那么当您使用shareReplay()
之类的东西时会发生什么?在这种情况下,与其订阅stream $并在4个不同的时间“运行”,它只订阅了一次,无论以后有多少人订阅,该共享订阅的结果都是共享的。
const getRandom = () => Math.floor(Math.random() * 10);
const stream$ = defer(
() => from([getRandom(),0),shareReplay(1)
);
stream$.subscribe(console.log); // 12 (4+6+2)
stream$.subscribe(console.log); // 12
stream$.subscribe(console.log); // 12
stream$.subscribe(console.log); // 12
在这里,我们的累加器只能运行一次。
如果您这样更改它:
const getRandom = () => Math.floor(Math.random() * 10);
const stream$ = defer(
() => from([getRandom(),getRandom()])
).pipe(
shareReplay(3),reduce((acc,0)
);
stream$.subscribe(console.log); // 12 (4+6+2)
stream$.subscribe(console.log); // 12 (4+6+2)
stream$.subscribe(console.log); // 12 (4+6+2)
stream$.subscribe(console.log); // 12 (4+6+2)
然后,累加器运行4次,但是getRandom()
仅被调用来创建前3个值,之后每个订阅都获得相同的3个数字。
因此,回答您的问题:
当您没有shareReplay时,每个订阅者将从订阅时开始运行自己的scan
流。 与共享回复,scan
仅订阅一次。您将这些结果多播到所有将来的订户。就是这样。