为什么在创建新订阅时会重置扫描累加器值,而在使用shareReplay1时却不重置?

问题描述

我编写了以下代码,期望最后一次订阅将打印['emitted value 1','emitted value 2'],但仅打印['emitted value 2']。这是有道理的,因为BehaviorSubject会发出最后一个值,但是为什么扫描累加器会重置为种子值?

我发现可以通过使用shareReplay(1)轻松解决此问题,但是我真的不明白为什么。在这两种情况下,使用shareReplay(1)与不使用扫描累加器时,发生了什么?

const subject = new BehaviorSubject(null);
const action$ = subject.asObservable();

const obs$ = action$.pipe(scan((acc,curr) => [...acc,curr],[]));
// const obs$ = action$.pipe(scan((acc,[]),shareReplay(1));

obs$.subscribe(res => console.log('first subs',res));

subject.next('emitted value 1');

subject.next('emitter value 2');

obs$.subscribe(res => console.log('second subs',res));

This is the console output without shareReplay(1)

And with shareReplay(1)

解决方法

订阅BehaviorSubject时,您将收到最后发出的值。这就是为什么您在控制台中仅收到second subs ["emitter value 2"]的原因。

shareReplay

来自doc

为什么要使用shareReplay?当您不希望产生副作用或增加计算负担时,通常希望使用shareReplay。 在多个订户之间执行。它也可能在 您知道您将有一个流的晚订户的情况 需要访问以前发出的值。重播此功能 订阅的价值是share和shareReplay的区别所在。

这意味着您将无需计算就可以接收所有发出的值,这是一种缓存。这在控制台中也是可见的,您只有一个打印包含第二个预订的所有值: second subs [null,"emitted value 1","emitter value 2"]

,

当您订阅一个可观察的对象时,它会因为缺少更好的单词而“运行”。当值随时间散布时,这可能会造成混淆,因此我们将继续进行同步调用。认为以下是可观察的。

const getRandom = () => Math.floor(Math.random() * 10);

const stream$ = defer(() => from([getRandom(),getRandom(),getRandom()]));

stream$.subscribe(console.log); // 4 6 2
stream$.subscribe(console.log); // 4 4 2
stream$.subscribe(console.log); // 1 8 6
stream$.subscribe(console.log); // 8 6 7

注意每个订阅如何获得不同的号码?它们都在重新运行getRandom(),并且其订阅获得不同的值。

如果我们在stream$中放置一个累加器,则每个订阅将以一个新鲜的种子开始,并根据它们的流发出的结果来获得结果。

如此:

const getRandom = () => Math.floor(Math.random() * 10);

const stream$ = defer(
  () => from([getRandom(),getRandom()])
).pipe(
  reduce((acc,val) => acc + val,0)
);

stream$.subscribe(console.log); // 12 (4+6+2) 
stream$.subscribe(console.log); // 10 (4+4+2)
stream$.subscribe(console.log); // 15 (1+8+6)
stream$.subscribe(console.log); // 21 (8+6+7)

查看每个订阅如何不添加到上一个订阅中?它只是在增加自己的数字。

他们没有在不同的订阅中共享累加器或值或任何东西。

那么当您使用shareReplay()之类的东西时会发生什么?在这种情况下,与其订阅stream $并在4个不同的时间“运行”,它只订阅了一次,无论以后有多少人订阅,该共享订阅的结果都是共享的。

const getRandom = () => Math.floor(Math.random() * 10);

const stream$ = defer(
  () => from([getRandom(),0),shareReplay(1)
);

stream$.subscribe(console.log); // 12 (4+6+2) 
stream$.subscribe(console.log); // 12
stream$.subscribe(console.log); // 12
stream$.subscribe(console.log); // 12

在这里,我们的累加器只能运行一次。

如果您这样更改它:

const getRandom = () => Math.floor(Math.random() * 10);

const stream$ = defer(
  () => from([getRandom(),getRandom()])
).pipe(
  shareReplay(3),reduce((acc,0)
);

stream$.subscribe(console.log); // 12 (4+6+2) 
stream$.subscribe(console.log); // 12 (4+6+2) 
stream$.subscribe(console.log); // 12 (4+6+2) 
stream$.subscribe(console.log); // 12 (4+6+2) 

然后,累加器运行4次,但是getRandom()仅被调用来创建前3个值,之后每个订阅都获得相同的3个数字。


因此,回答您的问题:

当您没有shareReplay时,每个订阅者将从订阅时开始运行自己的scan流。 共享回复,scan仅订阅一次。您将这些结果多播到所有将来的订户。就是这样。