问题描述
我有以下 Rx 方法链,我希望清理/改进下面显示的 Select
方法,是否可以使用现有的 Rx 运算符完成此操作,尝试使用 Amb
但查询所有 ISIN 和返回产生的第一个价格。
我希望为 ISIN 生成第一个价格,而不需要查询任何其他 ISIN,换句话说,我想以串行方式调用 GetPrice - 尝试第一个 ISIN,尝试第二个 ISIN,尝试第三个 ISIN 等...
public class Class1
{
public Class1()
{
GetIsins()
.Select(isins =>
{
*decimal? price = null;
foreach (var isin in isins)
{
price = GetPrice(isin)
.Take(1)
.Wait();
if (price.HasValue)
break;
}
return price;*
})
.Subscribe(price =>
{
if (price.HasValue)
Debug.WriteLine("Got a price...");
});
}
public IObservable<string[]> GetIsins()
{
return Observable.Return(new []
{
"US9311421039","TR9311421033","UK3342130394"
});
}
public IObservable<decimal?> GetPrice(string isin)
{
return Observable.Return((decimal?)100m);
}
}
解决方法
假设我已经正确理解了这个问题,您可能需要使 GetPrice
操作成为订阅 observable 的副作用。
GetIsins()
.SelectMany(isins => isins)
.Select(isin => Observable.Defer(() =>
{
return GetPrice(isin);
}))
.Concat() // or .Merge(1)
.Subscribe(price =>
{
if (price.HasValue)
Debug.WriteLine("Got a price...");
});
Concat
运算符确保序列中的每个 IObservable<decimal?>
在订阅下一个之前都必须完成。Defer
运算符确保每个 IObservable<decimal?>
在订阅之前不会启动(确保它是 cold 可观察的)。