问题描述
我有一个 Observable source
,它可能会在不可预测的时间发出项目。我正在尝试使用它来构建另一个 Observable,该 Observable 每 500 毫秒可靠地发出一次其值。
假设 source
在这些时间发出值:
- 100 毫秒 - 第一项
- 980 毫秒 - 第二项
- 1020 毫秒 - 第三项
- 1300 毫秒 - 第四项,以此类推
我想“平滑”这个流,以便得到如下输出:
- 500 毫秒 - 第一项
- 1000 毫秒 - 第二项
- 1500 毫秒 - 第三项
- 2000 毫秒 - 第四项
一种简单的方法可能是在源项目的排放之间添加延迟。但是,这不会像我想要的那样创建均匀间隔的间隔。
我尝试了 .timer()
、.interval()
和 .flatMap()
的各种组合,但都没有任何希望。
解决方法
我想你可以试试这个:
const src$ = merge(
timer(100).pipe(mapTo(1)),timer(980).pipe(mapTo(2)),timer(1020).pipe(mapTo(3)),timer(1300).pipe(mapTo(4))
);
src$
.pipe(
bufferTime(500),mergeAll()
)
.subscribe(console.log);
bufferTime
用于创建一个以恒定间隔发射的计时器,而不管发射的值如何。然后mergeAll
用于分解由bufferTime
产生的数组。
您可以使用 combineLatest
、interval
和 throttle
的组合 - 您添加第二个可观察对象 interval
以及您想要的调用之间的时间(例如 500 毫秒),所以每 500 毫秒您的 observable 将发出一次(当与 combineLatest
一起使用时),现在它将每 500 毫秒 和 每次原始 source
发出时发出一次值,因此您可以添加 { {1}} 在管道中,这将导致间隔节流:
throttle
(这里不需要combineLatest([source,timer(5000)])
.pipe(
throttle(() => interval(5000)),tap(([value]) => {
console.log("emitted",value,new Date().getSeconds());
})
)
.subscribe();
,添加只是为了演示)
对于比您的时间间隔快的源
zip
您的来源,具有 interval
所需的时间跨度。
zip(source,interval(500)).pipe(
map(([value,_]) => value) // only emit the source value
)
zip
发出来自 source
的第一个项目和来自 interval
的第一个项目,然后是来自 source
的第二个项目和来自 interval
的第二个项目和很快。如果输出 observable 应该只在 interval
发出时发出,那么 source
的第 N 个值必须在 interval
的第 N 个值之前到达。
潜在问题:
如果您的 source
在某个时刻发出的信号比 interval
慢(即 source
的第 N 个值在 interval
的第 N 个值之后到达),则 { {1}} 将直接发出,无需等待下一次 zip
发出。
interval
对于以任何速率发射的源
// the 5th/6th value from source arrive after the 5th/6th value from interval
v v
source: -1--------2-3---4---------------5----6-----
interval: -----1-----2-----3-----4-----5-----6-----7-
zip output: -----1-----2-----3-----4--------5----6-----
✓ ✓ ✓ ✓ ⚠️ ⚠️
// emits 5 and 6 don't happen when interval emits
function emitOnInterval<T>(period: number): MonoTypeOperatorFunction<T> {
return (source: Observable<T>) =>
defer(() => {
let sourceCompleted = false;
const queue = source.pipe(
tap({ complete: () => (sourceCompleted = true) }),scan((acc,curr) => (acc.push(curr),acc),[]) // collect all values in a buffer
);
return interval(period).pipe(
withLatestFrom(queue),// combine with the latest buffer
takeWhile(([_,buffer]) => !sourceCompleted || buffer.length > 0),// complete when the source completed and the buffer is empty
filter(([_,buffer]) => buffer.length > 0),// only emit if there is at least on value in the buffer
map(([_,buffer]) => buffer.shift()) // take the first value from the buffer
);
});
}
source.pipe(
emitOnInterval(500)
)