Reactive Extensions - 切换,但等到前一个 observable 完成而不是取消

问题描述

我有一个由可观察订阅触发的异步 Load 方法

我想使用以下策略防止并发调用 LoadAsync:

enter image description here

我几乎可以肯定这个策略有一个名字,但我想不起来了。

  1. 如果之前的调用尚未完成,则不会调用 LoadAsync。

    • LoadAsync(3) 仅在 LoadAsync(1) 完成后才开始
  2. LoadAsync 只为最近的值调用

    • LoadAsync(2) 从未被调用

我目前使用此 switchmap 等效项,但它取消了 LoadAsync 方法支持调用
Observable<int> sourceObservable..
sourceObservable
   .Select(value => Observable.FromAsync(() => LoadAsync(value)))
   .Switch() 

async Task<int> LoadAsync(int value)
{
    await Task.Delay(1000);
    return value;
}

在javascript中它会是:

source$.pipe(switchMap(Load))

function load(id): Observable<int> {
    return delay(1000).pipe(map(() => id));
}

编辑:我想知道为什么没有人要求这种扁平化策略。我认为这是很常见的要求。

例如,这就是 CI/CD 管道通常的工作方式。如果有太多推送到某个分支而您没有能力构建所有这些,但您想确保构建最新的提交

解决方法

换句话说,你想从 2nd-level observable 中取出一个 child observable,获取所有结果并等待它完成,然后重复。

这将在 C# 中工作:

public static IObservable<T> SingleSwitch<T,U>(this IObservable<U> source,Func<U,IObservable<T>> selector)
{
    return source.Publish(_source => _source
        .Take(1)
        .SelectMany(e => selector(e))
        .Repeat()
    );
}

请注意,您的 Load 中存在错误。您需要将 await 添加到延迟正常工作中。

示例代码:

Observable.Interval(TimeSpan.FromSeconds(.75))
    .SingleSwitch(e => Observable.FromAsync (async () => {
        await Task.Delay(1000);
        return e;
    }))
    .Dump();

输出是偶数,奇数被阻塞,因为偶数正在运行。

, @TheodorZoulias 提到的

MergeBounded(1,1) 有效,但我似乎有点太复杂了。

我写了一个更简单的替代方案,但我正在寻找最合适的名称

/// <summary>
/// Concatenates most recent inner observable sequence when previous completes.
/// Similar to Concat,but it ignores out of date inner observable sequences.
/// Similar to Exhaust,but it preserves latest inner observable.
/// </summary>
public static IObservable<T> ConcatExhaust<T>(this IObservable<IObservable<T>> source)
{
    return Observable.Defer(() =>
    {
        IObservable<T> latest = default;
        return source
            .Select(inner =>
            {
                latest = inner;
                return Observable.Defer(() => latest == inner ? inner : Observable.Empty<T>());
            })
            .ConcatExhaust();
    });
}

以下测试:

var source = Observable.Interval(TimeSpan.FromMilliseconds(300))
    .Take(5)
    .Do(val => Console.WriteLine($"Source: {val}"));


source.Select(val => Observable.FromAsync(() => LoadAsync(val)))
     .ConcatExhaust()
     .Wait();

返回:

Source: 0
Load Started: 0
Source: 1
Source: 2
Source: 3
Value finished: 0
Load Started: 3
Source: 4
Value finished: 3
Load Started: 4
Value finished: 4

相关问答

Selenium Web驱动程序和Java。元素在(x,y)点处不可单击。其...
Python-如何使用点“。” 访问字典成员?
Java 字符串是不可变的。到底是什么意思?
Java中的“ final”关键字如何工作?(我仍然可以修改对象。...
“loop:”在Java代码中。这是什么,为什么要编译?
java.lang.ClassNotFoundException:sun.jdbc.odbc.JdbcOdbc...