问题描述
Rx.Net中的Scan
运算符具有签名:
public static IObservable<TAccumulate> Scan<TSource,TAccumulate>(this IObservable<TSource> source,TAccumulate seed,Func<TAccumulate,TSource,TAccumulate> accumulator);
累加器是
Func<TAccumulate,TAccumulate> accumulator
在尝试通过异步状态转换实现状态机模型时,我发现具有以下签名的ScanAsync
运算符会有所帮助。
public static IObservable<TAccumulate> Scan<TSource,Task<TAccumulate>> accumulator);
累加器具有签名
Func<TAccumulate,Task<TAccumulate>> accumulator
理想的应用程序代码应该是这样的(类似于普通的Scan
运算符,区别在于使用异步累加器)。
IObservable<TEvent> events;
IObservable<State> states = events.ScanAsync(
initialState,async (prevIoUsstate,evt) => {
var newState = await transitionAsync(prevIoUsstate,evt);
return newState;
});
似乎MS正在开发AsyncRx.NET,但是它是not released yet(no schedule)。
相关内容:
如果通过BehavIoUrSubject
为状态建模异步状态机,并观察到可订阅的事件,如下面的代码
IObservable<TEvent> events;
BehavIoUrSubject<State> states = new BehavIoUrSubject<State>(initialState);
events.Subscribe(async e => {
var newState = await transition(states.Value,e);
states.OnNext(newState);
})
我想在某些情况下可能会有比赛条件。
我尝试用
实现它IObservable<TS> ScanAsync<TS,TE>(
IObservable<TE> source,Func<TS,TE,Task<TS>> reducer,TS initialState)
{
var states = from m in source.Take(1)
from nextState in reducer(initialState,m).ToObservable()
from s in ScanAsync(source.Skip(1),reducer,nextState)
select s;
return Observable.Return(initialState).Concat(states);
}
但是有时它会起作用,有时它只是被阻塞了,我不知道是什么原因造成的。
解决方法
您可以使用Scan
运算符来创建中间IObservable<Task<TAccumulate>>
,然后可以使用Concat
运算符将其展平:
public static IObservable<TAccumulate> Scan<TSource,TAccumulate>(
this IObservable<TSource> source,TAccumulate seed,Func<TAccumulate,TSource,Task<TAccumulate>> accumulator)
{
return source.Scan(Task.FromResult(seed),async (previousTask,item) =>
{
return await accumulator(await previousTask,item);
}).Concat();
}
上面的实现使用Concat
重载来接受任务的可观察对象,而不是嵌套的可观察对象:
// Concatenates all task results,as long as the previous task terminated successfully.
public static IObservable<TSource> Concat<TSource>(
this IObservable<Task<TSource>> sources);