问题描述
有时业务逻辑似乎能够通过一些递归定义的可观察对象自然建模。下面是一个例子:
interface Demo {
IObservable<CommandId> userCommands;
IObservable<IObservable<IProcessingState>> processes;
IObservable<CommandId> skippedCommands;
IObservable<(CommandId,CommandResult)> runcommand(CommandId id);
}
interface IProcessingState {
bool IsProcessing {get;}
CommandId? ProcessingId {get;}
}
对于用户输入的每个命令,它应该在 prcocess 中触发一个正在运行的进程,或者在 skippedCommands 中发出一个值。这个逻辑的一些直接翻译也许
var validCommands = userCommands.WithLatestFrom(processes).Where(x => !x.Item2.IsProcessing)
var skippedCommands = userCommands.WithLatestFrom(processes).Where(x => x.Item2.IsProcessing)
var processes = validCommands.Select(c => runcommand(c))
如上代码所示,validCommands
和 processes
的赋值是相互递归的,我们可以等价地定义 processes
直接递归使用自身
var processes = userCommands.WithLatestFrom(processes)
.Where(x => !x.Item2.IsProcessing)
.Select(c => runcommand(c))
但是我们不能像这样在 C# 中定义 prcesses
Observable。
我发现了几个可能的相关事物:
-
Observable.Generate
构造函数。然而,它似乎以同步方式折叠自己的状态,我不知道如何在userCommands
中使用runcommand
observable 和Observable.Generate
; -
RxJS 中的一些运算符如
exhaust
或exhaustMap
,而 Rx.Net 没有提供这个运算符,有一些 3rd-party 库提供了这些运算符,例如 {{3} }. 实现就像
let exhaustMap f source =
Observable.Create (fun (o : IObserver<_>) ->
let mutable hasSubscription = false
let mutable innerSub = None
let onInnerCompleted () =
hasSubscription <- false
innerSub |> Option.iter disposable.dispose
let onOuterNext x =
if not hasSubscription then
hasSubscription <- true
f x |> subscribeSafeWithCallbacks
o.OnNext o.OnError onInnerCompleted
|> fun y -> innerSub <- Some y
source
|> subscribeSafeWithCallbacks
onOuterNext o.OnError o.OnCompleted)
但是,有两个问题。 一种。直接使用此运算符不符合上述要求,跳过的命令将被静默忽略。我们可以稍微修改一下源代码以满足要求,但还有一个问题 湾该实现引入了两个本地可变变量和两个嵌套订阅。我不知道这是否在所有情况下都可以(会有数据竞争的风险吗?),并且更喜欢基于操作符组合而不是可变引用的解决方案
-
FSharp.Control.Reactive 提供了前向引用类型
的解决方案StreamLoop
和CellLoop
。根据 Functional Reactive Programming 一书,这些前向引用类型的 Rx 替代方案是Subject
,通过使用Subject
,上面的递归构造分为两个阶段。问题是 Intro to Rx 指出,使用Subject
需要手动管理更多状态,至少需要处理主题,并且可能被迫热观察。我想知道是否存在不使用Subject
-
在
window
结果上使用runcommand
运算符与最后一个值的边界(就在完成之前),上面的processes
可以有一些构造,但是这个解决方案需要使用结束信号两次,这需要仔细处理(尝试和调整Take(1)
、zip
、withLatestFrom
、combineLatest
、Window
运算符的重载以获取期望的结果)同时发生的事件。
有没有更好的解决方案或对上述解决方案进行修改,尤其是仅使用运算符?
解决方法
你的类型都很奇怪而且很难处理。您的问题基本上是一个带有两个触发器的简单状态机:1) 新命令到达,2) 上一个命令执行完毕。
这应该会让你开始:
void Main()
{
IObservable<ICommand> source = new Subject<ICommand>();
var executionTerminatedLoopback = new Subject<Unit>();
var stateMachine = source
.Select(c => (command: c,type: 1))
.Merge(executionTerminatedLoopback.Select(_ => (command: (ICommand)null,type: 2)))
.Scan((isExecuting: false,validCommand: (ICommand)null,failedCommand: (ICommand)null),(state,msg) => {
if(msg.type == 2)
return (false,null,null);
if(state.isExecuting)
return (true,msg.command);
else
return (true,msg.command,null);
});
var validCommands = stateMachine.Where(t => t.validCommand != null).Select(t => t.validCommand);
var failedCommands = stateMachine.Where(t => t.failedCommand != null).Select(t => t.failedCommand);
validCommands.SelectMany(c => c.Execute()).Subscribe(executionTerminatedLoopback);
}
public interface ICommand{
IObservable<Unit> Execute();
}
source
此处不必是主题,也可能不应该是:它可以是任何 IObservable<ICommand>
。在答案中模拟更容易。 exeuctionTerminatedLoopback
确实必须如此,以(如您所说)将递归分解为两部分。因为是主题,所以应该保密,不要泄露。
我认为不使用 Subject 在 C# 中没有任何答案。