Observable:每隔一段时间获取最新值,直到源完成

问题描述

我正在寻找具有类似于此签名的可观察选择器:

static IObservable<T> TakeLatest(this IObservable<T> input,TimeSpan interval)

哪个应该:

  1. 一旦输入发出第一项就发出第一项
  2. 从那时起,在之后的固定时间间隔内,发出输入产生的最新项目
  3. 每当输入完成(或失败)时完成(或失败)

就弹珠而言,类似于以下内容 - 假设间隔 = 2 个时间单位:

时间 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15
输入 A B C D E F(完整)
输出 A B D D E E 完成(F 不再发出

是否有任何开箱即用的方法,或者一个相当简单的选择器来产生这些结果?

解决方法

这应该完全符合您的要求。不过我还没有测试过。

/// <summary>Samples the source observable sequence at each interval,/// allowing repeated emissions of the same element.</summary>
public static IObservable<T> SampleWithDuplicates<T>(this IObservable<T> source,TimeSpan interval,IScheduler scheduler = null)
{
    scheduler ??= DefaultScheduler.Instance;
    return source.Publish(published => Observable
        .Interval(interval,scheduler)
        .WithLatestFrom(published,(_,x) => x)
        .Merge(published.FirstAsync())
        .TakeUntil(published.LastOrDefaultAsync()));
}
,

我现在已经完成了以下操作 - 我认为它有效,但我会打开它,以防有人能想到更优雅的方法(或者能想到我当前的问题)实施)

static IObservable<T> TakeLatest(this IObservable<T> input,IScheduler scheduler) => input
  .FirstAsync()
  .Select(_ => Observable.Interval(interval,scheduler).StartWith(0))
  .Switch()
  .CombineLatest(input,(a,b) => (a,b))
  .DistinctUntilChanged(x => x.a)
  .Select(x => x.b)
  .TakeUntil(input.LastAsync());