从不可预测的源 Observable 构建“心跳”Observable 对于比您的时间间隔快的源对于以任何速率发射的源

问题描述

我有一个 Observable source,它可能会在不可预测的时间发出项目。我正在尝试使用它来构建另一个 Observable,该 Observable 每 500 毫秒可靠地发出一次其值。

假设 source 在这些时间发出值:

  • 100 毫秒 - 第一项
  • 980 毫秒 - 第二项
  • 1020 毫秒 - 第三项
  • 1300 毫秒 - 第四项,以此类推

我想“平滑”这个流,以便得到如下输出:

  • 500 毫秒 - 第一项
  • 1000 毫秒 - 第二项
  • 1500 毫秒 - 第三项
  • 2000 毫秒 - 第四项

一种简单的方法可能是在源项目的排放之间添加延迟。但是,这不会像我想要的那样创建均匀间隔的间隔。

我尝试了 .timer().interval().flatMap() 的各种组合,但都没有任何希望。

解决方法

我想你可以试试这个:

const src$ = merge(
  timer(100).pipe(mapTo(1)),timer(980).pipe(mapTo(2)),timer(1020).pipe(mapTo(3)),timer(1300).pipe(mapTo(4))
);

src$
  .pipe(
    bufferTime(500),mergeAll()
  )
  .subscribe(console.log);

bufferTime 用于创建一个以恒定间隔发射的计时器,而不管发射的值如何。然后mergeAll用于分解bufferTime产生的数组。

StackBlitz demo

,

您可以使用 combineLatestintervalthrottle 的组合 - 您添加第二个可观察对象 interval 以及您想要的调用之间的时间(例如 500 毫秒),所以每 500 毫秒您的 observable 将发出一次(当与 combineLatest 一起使用时),现在它将每 500 毫秒 每次原始 source 发出时发出一次值,因此您可以添加 { {1}} 在管道中,这将导致间隔节流:

throttle

(这里不需要combineLatest([source,timer(5000)]) .pipe( throttle(() => interval(5000)),tap(([value]) => { console.log("emitted",value,new Date().getSeconds()); }) ) .subscribe(); ,添加只是为了演示)

,

对于比您的时间间隔快的源

zip 您的来源,具有 interval 所需的时间跨度。

zip(source,interval(500)).pipe(
  map(([value,_]) => value)  // only emit the source value
)

enter image description here

zip 发出来自 source 的第一个项目和来自 interval 的第一个项目,然后是来自 source 的第二个项目和来自 interval 的第二个项目和很快。如果输出 observable 应该只在 interval 发出时发出,那么 source 的第 N 个值必须 interval 的第 N 个值之前到达。

潜在问题: 如果您的 source 在某个时刻发出的信号比 interval 慢(即 source 的第 N 个值在 interval 的第 N 个值之后到达),则 { {1}} 将直接发出,无需等待下一次 zip 发出。

interval

对于以任何速率发射的源

// the 5th/6th value from source arrive after the 5th/6th value from interval
                                              v    v
source:       -1--------2-3---4---------------5----6-----
interval:     -----1-----2-----3-----4-----5-----6-----7-
zip output:   -----1-----2-----3-----4--------5----6-----
                   ✓     ✓     ✓     ✓        ⚠️    ⚠️
// emits 5 and 6 don't happen when interval emits
function emitOnInterval<T>(period: number): MonoTypeOperatorFunction<T> {
  return (source: Observable<T>) =>
    defer(() => {
      let sourceCompleted = false;
      const queue = source.pipe(
        tap({ complete: () => (sourceCompleted = true) }),scan((acc,curr) => (acc.push(curr),acc),[]) // collect all values in a buffer
      );
      return interval(period).pipe(
        withLatestFrom(queue),// combine with the latest buffer
        takeWhile(([_,buffer]) => !sourceCompleted || buffer.length > 0),// complete when the source completed and the buffer is empty
        filter(([_,buffer]) => buffer.length > 0),// only emit if there is at least on value in the buffer
        map(([_,buffer]) => buffer.shift()) // take the first value from the buffer
      );
    });
}

source.pipe(
  emitOnInterval(500)
)

https://stackblitz.com/edit/rxjs-qdlktm?file=index.ts

相关问答

错误1:Request method ‘DELETE‘ not supported 错误还原:...
错误1:启动docker镜像时报错:Error response from daemon:...
错误1:private field ‘xxx‘ is never assigned 按Alt...
报错如下,通过源不能下载,最后警告pip需升级版本 Requirem...