问题描述
我想使用以下策略防止并发调用 LoadAsync:
我几乎可以肯定这个策略有一个名字,但我想不起来了。
我目前使用此 switchmap 等效项,但它取消了 LoadAsync 方法以支持新调用。
Observable<int> sourceObservable..
sourceObservable
.Select(value => Observable.FromAsync(() => LoadAsync(value)))
.Switch()
async Task<int> LoadAsync(int value)
{
await Task.Delay(1000);
return value;
}
在javascript中它会是:
source$.pipe(switchMap(Load))
function load(id): Observable<int> {
return delay(1000).pipe(map(() => id));
}
编辑:我想知道为什么没有人要求这种扁平化策略。我认为这是很常见的要求。
例如,这就是 CI/CD 管道通常的工作方式。如果有太多推送到某个分支而您没有能力构建所有这些,但您想确保构建最新的提交
解决方法
换句话说,你想从 2nd-level observable 中取出一个 child observable,获取所有结果并等待它完成,然后重复。
这将在 C# 中工作:
public static IObservable<T> SingleSwitch<T,U>(this IObservable<U> source,Func<U,IObservable<T>> selector)
{
return source.Publish(_source => _source
.Take(1)
.SelectMany(e => selector(e))
.Repeat()
);
}
请注意,您的 Load
中存在错误。您需要将 await
添加到延迟正常工作中。
示例代码:
Observable.Interval(TimeSpan.FromSeconds(.75))
.SingleSwitch(e => Observable.FromAsync (async () => {
await Task.Delay(1000);
return e;
}))
.Dump();
输出是偶数,奇数被阻塞,因为偶数正在运行。
, @TheodorZoulias 提到的MergeBounded(1,1) 有效,但我似乎有点太复杂了。
我写了一个更简单的替代方案,但我正在寻找最合适的名称
/// <summary>
/// Concatenates most recent inner observable sequence when previous completes.
/// Similar to Concat,but it ignores out of date inner observable sequences.
/// Similar to Exhaust,but it preserves latest inner observable.
/// </summary>
public static IObservable<T> ConcatExhaust<T>(this IObservable<IObservable<T>> source)
{
return Observable.Defer(() =>
{
IObservable<T> latest = default;
return source
.Select(inner =>
{
latest = inner;
return Observable.Defer(() => latest == inner ? inner : Observable.Empty<T>());
})
.ConcatExhaust();
});
}
以下测试:
var source = Observable.Interval(TimeSpan.FromMilliseconds(300))
.Take(5)
.Do(val => Console.WriteLine($"Source: {val}"));
source.Select(val => Observable.FromAsync(() => LoadAsync(val)))
.ConcatExhaust()
.Wait();
返回:
Source: 0
Load Started: 0
Source: 1
Source: 2
Source: 3
Value finished: 0
Load Started: 3
Source: 4
Value finished: 3
Load Started: 4
Value finished: 4