如何在Rx.Net中使用异步累加器实现ScanAsync运算符?

问题描述

Rx.Net中的Scan运算符具有签名:

public static IObservable<TAccumulate> Scan<TSource,TAccumulate>(this IObservable<TSource> source,TAccumulate seed,Func<TAccumulate,TSource,TAccumulate> accumulator);

累加器是

Func<TAccumulate,TAccumulate> accumulator

在尝试通过异步状态转换实现状态机模型时,我发现具有以下签名的ScanAsync运算符会有所帮助。

public static IObservable<TAccumulate> Scan<TSource,Task<TAccumulate>> accumulator);

累加器具有签名

Func<TAccumulate,Task<TAccumulate>> accumulator

理想的应用程序代码应该是这样的(类似于普通的Scan运算符,区别在于使用异步累加器)。

IObservable<TEvent> events;
IObservable<State> states = events.ScanAsync(
    initialState,async (prevIoUsstate,evt) => {
        var newState = await transitionAsync(prevIoUsstate,evt);
        return newState;
    });

似乎MS正在开发AsyncRx.NET,但是它是not released yet(no schedule)


相关内容

如果通过BehavIoUrSubject为状态建模异步状态机,并观察到可订阅的事件,如下面的代码

IObservable<TEvent> events;
BehavIoUrSubject<State> states = new BehavIoUrSubject<State>(initialState);
events.Subscribe(async e => {
    var newState = await transition(states.Value,e);
    states.OnNext(newState);
})

我想在某些情况下可能会有比赛条件。

我尝试用

实现它
IObservable<TS> ScanAsync<TS,TE>(
IObservable<TE> source,Func<TS,TE,Task<TS>> reducer,TS initialState)
{
    var states = from m in source.Take(1)
                    from nextState in reducer(initialState,m).ToObservable()
                    from s in ScanAsync(source.Skip(1),reducer,nextState)
                    select s;
    return Observable.Return(initialState).Concat(states);
}

但是有时它会起作用,有时它只是被阻塞了,我不知道是什么原因造成的。

解决方法

您可以使用Scan运算符来创建中间IObservable<Task<TAccumulate>>,然后可以使用Concat运算符将其展平:

public static IObservable<TAccumulate> Scan<TSource,TAccumulate>(
    this IObservable<TSource> source,TAccumulate seed,Func<TAccumulate,TSource,Task<TAccumulate>> accumulator)
{
    return source.Scan(Task.FromResult(seed),async (previousTask,item) =>
    {
        return await accumulator(await previousTask,item);
    }).Concat();
}

上面的实现使用Concat重载来接受任务的可观察对象,而不是嵌套的可观察对象:

// Concatenates all task results,as long as the previous task terminated successfully.
public static IObservable<TSource> Concat<TSource>(
    this IObservable<Task<TSource>> sources);

相关问答

Selenium Web驱动程序和Java。元素在(x,y)点处不可单击。其...
Python-如何使用点“。” 访问字典成员?
Java 字符串是不可变的。到底是什么意思?
Java中的“ final”关键字如何工作?(我仍然可以修改对象。...
“loop:”在Java代码中。这是什么,为什么要编译?
java.lang.ClassNotFoundException:sun.jdbc.odbc.JdbcOdbc...