问题描述
我正在寻找具有类似于此签名的可观察选择器:
static IObservable<T> TakeLatest(this IObservable<T> input,TimeSpan interval)
哪个应该:
- 一旦输入发出第一项就发出第一项
- 从那时起,在之后的固定时间间隔内,发出输入产生的最新项目
- 每当输入完成(或失败)时完成(或失败)
就弹珠而言,类似于以下内容 - 假设间隔 = 2 个时间单位:
时间 | 1 | 2 | 3 | 4 | 5 | 6 | 7 | 8 | 9 | 10 | 11 | 12 | 13 | 14 | 15 |
---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
输入 | A | B | C | D | E | F(完整) | |||||||||
输出 | A | B | D | D | E | E | 完成(F 不再发出) |
是否有任何开箱即用的方法,或者一个相当简单的选择器来产生这些结果?
解决方法
这应该完全符合您的要求。不过我还没有测试过。
/// <summary>Samples the source observable sequence at each interval,/// allowing repeated emissions of the same element.</summary>
public static IObservable<T> SampleWithDuplicates<T>(this IObservable<T> source,TimeSpan interval,IScheduler scheduler = null)
{
scheduler ??= DefaultScheduler.Instance;
return source.Publish(published => Observable
.Interval(interval,scheduler)
.WithLatestFrom(published,(_,x) => x)
.Merge(published.FirstAsync())
.TakeUntil(published.LastOrDefaultAsync()));
}
,
我现在已经完成了以下操作 - 我认为它有效,但我会打开它,以防有人能想到更优雅的方法(或者能想到我当前的问题)实施)
static IObservable<T> TakeLatest(this IObservable<T> input,IScheduler scheduler) => input
.FirstAsync()
.Select(_ => Observable.Interval(interval,scheduler).StartWith(0))
.Switch()
.CombineLatest(input,(a,b) => (a,b))
.DistinctUntilChanged(x => x.a)
.Select(x => x.b)
.TakeUntil(input.LastAsync());