如何在 Rx.Net 中使用 iteself 递归地执行打结/定义可观察对象?

问题描述

有时业务逻辑似乎能够通过一些递归定义的可观察对象自然建模。下面是一个例子:

interface Demo {
    IObservable<CommandId> userCommands;
    IObservable<IObservable<IProcessingState>> processes;
    IObservable<CommandId> skippedCommands;
    IObservable<(CommandId,CommandResult)> runcommand(CommandId id);
}

interface IProcessingState {
    bool IsProcessing {get;}
    CommandId? ProcessingId {get;}
}

对于用户输入的每个命令,它应该在 prcocess 中触发一个正在运行的进程,或者在 skippedCommands 中发出一个值。这个逻辑的一些直接翻译也许

var validCommands = userCommands.WithLatestFrom(processes).Where(x => !x.Item2.IsProcessing)
var skippedCommands = userCommands.WithLatestFrom(processes).Where(x => x.Item2.IsProcessing)
var processes = validCommands.Select(c => runcommand(c))

如上代码所示,validCommandsprocesses 的赋值是相互递归的,我们可以等价地定义 processes 直接递归使用自身

var processes = userCommands.WithLatestFrom(processes)
                            .Where(x => !x.Item2.IsProcessing)
                            .Select(c => runcommand(c))

但是我们不能像这样在 C# 中定义 prcesses Observable。

我发现了几个可能的相关事物:

  1. Observable.Generate 构造函数。然而,它似乎以同步方式折叠自己的状态,我不知道如何在 userCommands 中使用 runcommand observable 和 Observable.Generate;

  2. RxJS 中的一些运算符如 exhaustexhaustMap,而 Rx.Net 没有提供这个运算符,有一些 3rd-party 库提供了这些运算符,例如 {{3} }. 实现就像

let exhaustMap f source =
        Observable.Create (fun (o : IObserver<_>) -> 
            let mutable hasSubscription = false
            let mutable innerSub = None
            let onInnerCompleted () =
                hasSubscription <- false
                innerSub |> Option.iter disposable.dispose
            let onOuterNext x =
                if not hasSubscription then
                    hasSubscription <- true 
                    f x |> subscribeSafeWithCallbacks 
                            o.OnNext o.OnError onInnerCompleted
                        |> fun y -> innerSub <- Some y
            source
            |> subscribeSafeWithCallbacks
                onOuterNext o.OnError o.OnCompleted)

但是,有两个问题。 一种。直接使用此运算符不符合上述要求,跳过的命令将被静忽略。我们可以稍微修改一下源代码以满足要求,但还有一个问题 湾该实现引入了两个本地可变变量和两个嵌套订阅。我不知道这是否在所有情况下都可以(会有数据竞争的风险吗?),并且更喜欢基于操作符组合而不是可变引用的解决方

  1. FSharp.Control.Reactive 提供了前向引用类型 StreamLoopCellLoop。根据 Functional Reactive Programming 一书,这些前向引用类型的 Rx 替代方案是 Subject,通过使用 Subject,上面的递归构造分为两个阶段。问题是 Intro to Rx 指出,使用 Subject 需要手动管理更多状态,至少需要处理主题,并且可能被迫热观察。我想知道是否存在不使用 Subject

    解决方
  2. window 结果上使用 runcommand 运算符与最后一个值的边界(就在完成之前),上面的 processes 可以有一些构造,但是这个解决方案需要使用结束信号两次,这需要仔细处理(尝试和调整 Take(1)zipwithLatestFromcombineLatestWindow 运算符的重载以获取期望的结果)同时发生的事件。

有没有更好的解决方案或对上述解决方案进行修改,尤其是仅使用运算符?

解决方法

你的类型都很奇怪而且很难处理。您的问题基本上是一个带有两个触发器的简单状态机:1) 新命令到达,2) 上一个命令执行完毕。

这应该会让你开始:

void Main()
{
    IObservable<ICommand> source = new Subject<ICommand>();
    var executionTerminatedLoopback = new Subject<Unit>();
    
    var stateMachine = source
        .Select(c => (command: c,type: 1))
        .Merge(executionTerminatedLoopback.Select(_ => (command: (ICommand)null,type: 2)))
        .Scan((isExecuting: false,validCommand: (ICommand)null,failedCommand: (ICommand)null),(state,msg) => {
            if(msg.type == 2)
                return (false,null,null);
            if(state.isExecuting)
                return (true,msg.command);
            else
                return (true,msg.command,null);
        });
    
    var validCommands = stateMachine.Where(t => t.validCommand != null).Select(t => t.validCommand);
    var failedCommands = stateMachine.Where(t => t.failedCommand != null).Select(t => t.failedCommand);
    
    validCommands.SelectMany(c => c.Execute()).Subscribe(executionTerminatedLoopback);
}

public interface ICommand{ 
    IObservable<Unit> Execute();
}

source 此处不必是主题,也可能不应该是:它可以是任何 IObservable<ICommand>。在答案中模拟更容易。 exeuctionTerminatedLoopback 确实必须如此,以(如您所说)将递归分解为两部分。因为是主题,所以应该保密,不要泄露。

我认为不使用 Subject 在 C# 中没有任何答案。

相关问答

Selenium Web驱动程序和Java。元素在(x,y)点处不可单击。其...
Python-如何使用点“。” 访问字典成员?
Java 字符串是不可变的。到底是什么意思?
Java中的“ final”关键字如何工作?(我仍然可以修改对象。...
“loop:”在Java代码中。这是什么,为什么要编译?
java.lang.ClassNotFoundException:sun.jdbc.odbc.JdbcOdbc...