问题描述
有时业务逻辑似乎能够通过一些递归定义的可观察对象自然建模。下面是一个例子:
interface Demo {
IObservable<CommandId> userCommands;
IObservable<IObservable<IProcessingState>> processes;
IObservable<CommandId> skippedCommands;
IObservable<(CommandId,CommandResult)> RunCommand(CommandId id);
}
interface IProcessingState {
bool IsProcessing {get;}
CommandId? ProcessingId {get;}
}
对于用户输入的每个命令,它应该在 prcocess 中触发一个正在运行的进程,或者在 skippedCommands 中发出一个值。这个逻辑的一些直接翻译也许
var validCommands = userCommands.WithLatestFrom(processes).Where(x => !x.Item2.IsProcessing)
var skippedCommands = userCommands.WithLatestFrom(processes).Where(x => x.Item2.IsProcessing)
var processes = validCommands.Select(c => RunCommand(c))
如上代码所示,validCommands
和 processes
的赋值是相互递归的,我们可以等价地定义 processes
直接递归使用自身
var processes = userCommands.WithLatestFrom(processes)
.Where(x => !x.Item2.IsProcessing)
.Select(c => RunCommand(c))
但是我们不能像这样在 C# 中定义 prcesses
Observable。
我发现了几个可能的相关事物:
-
Observable.Generate
构造函数。然而,它似乎以同步方式折叠自己的状态,我不知道如何在userCommands
中使用RunCommand
observable 和Observable.Generate
; -
RxJS 中的一些运算符如
exhaust
或exhaustMap
,而 Rx.Net 没有提供这个运算符,有一些 3rd-party 库提供了这些运算符,例如 {{3} }. 实现就像
let exhaustMap f source =
Observable.Create (fun (o : IObserver<_>) ->
let mutable hasSubscription = false
let mutable innerSub = None
let onInnerCompleted () =
hasSubscription <- false
innerSub |> Option.iter Disposable.dispose
let onOuterNext x =
if not hasSubscription then
hasSubscription <- true
f x |> subscribeSafeWithCallbacks
o.OnNext o.OnError onInnerCompleted
|> fun y -> innerSub <- Some y
source
|> subscribeSafeWithCallbacks
onOuterNext o.OnError o.OnCompleted)
但是,有两个问题。 一种。直接使用此运算符不符合上述要求,跳过的命令将被静默忽略。我们可以稍微修改一下源代码以满足要求,但还有一个问题 湾该实现引入了两个本地可变变量和两个嵌套订阅。我不知道这是否在所有情况下都可以(会有数据竞争的风险吗?),并且更喜欢基于操作符组合而不是可变引用的解决方案
-
FSharp.Control.Reactive 提供了前向引用类型
的解决方案StreamLoop
和CellLoop
。根据 Functional Reactive Programming 一书,这些前向引用类型的 Rx 替代方案是Subject
,通过使用Subject
,上面的递归构造分为两个阶段。问题是 Intro to Rx 指出,使用Subject
需要手动管理更多状态,至少需要处理主题,并且可能被迫热观察。我想知道是否存在不使用Subject
-
在
window
结果上使用RunCommand
运算符与最后一个值的边界(就在完成之前),上面的processes
可以有一些构造,但是这个解决方案需要使用结束信号两次,这需要仔细处理(尝试和调整Take(1)
、zip
、withLatestFrom
、combineLatest
、Window
运算符的重载以获取期望的结果)同时发生的事件。
有没有更好的解决方案或对上述解决方案进行修改,尤其是仅使用运算符?
解决方法
暂无找到可以解决该程序问题的有效方法,小编努力寻找整理中!
如果你已经找到好的解决方法,欢迎将解决方案带上本链接一起发送给小编。
小编邮箱:dio#foxmail.com (将#修改为@)