问题描述
目前,我有一个函数可以从外部接收原始数据,对其进行处理,然后将其发送给回调:
let Process callback rawData =
let data = rawData //transforming into actual data....
callback data
回调是一个函数'T -> unit
。就我而言,它是 MailBoxProcessor 的 Post
函数(被称为 Process mailBox.Post rawData
)
process 函数被多次调用,每次我把处理过的数据推送到邮箱队列中。到目前为止一切顺利。
现在我想更改此代码,使用 FSharp 的 rx 扩展 (FSharp.Control.Reactive) 将处理后的数据发布给各种消费者。这意味着 callback
要么是一个 Observable,要么是一个发布给订阅者的函数。我该怎么做?
我找到了两个选项:
-
使用
Subject.behavior
。这正是我想要的,除了它需要一个初始状态,在这种情况下这在语义上没有意义,并且显然主题不受欢迎(来自 ReactiveX 站点 http://davesexton.com/blog/post/To-Use-Subject-Or-Not-To-Use-Subject.aspx 中的链接)。
从函数式编程的角度来看,什么是更好的方法?有没有更好的办法?
解决方法
这里有一个想法:您可以使用对象表达式来实现 IObservable<_>
,而无需显式类的开销:
let createObservable subscribe =
{
new IObservable<_> with
member __.Subscribe(observer) =
subscribe observer
}
要使用它,请指定类型为 subscribe
的 IObserver<_> -> IDisposable
函数。无需上课。
使用 observe { .. }
计算构建器可以工作,但 FSharp.Control.Reactive
库中有一个函数可以做同样的事情:
open FSharp.Control.Reactive
let obs = Observable.ofSeq [1;2;3;4;5]
如果我使用 observe { .. }
计算构建器,我还会使用它支持 for
循环的事实,这使您的代码更简单:
let Process initialData = observe {
for x in initialData do yield x }
,
明白了。 Fsharp 反应式提供来自模块 observe
的关键字 FSharp.Control.Reactive.Builders
。这允许您创建临时可观察对象:
open FSharp.Control.Reactive.Builders
//In my real case,"initialData" is a byte stream and
//at each step I read a few bytes off of it
let Process initialData =
let rec loop data =
observe {
match data with
| x :: xs ->
yield x
yield! loop xs
| [] -> ()
}
loop initialData
let obs = Process ([1;2;3;4;5])
obs.Subscribe(fun d -> printfn "Consumer A: %A" d) |> ignore
obs.Subscribe(fun d -> printfn "Consumer B: %A" d) |> ignore
Threading.Thread.Sleep 1000
obs.Subscribe(fun d -> printfn "Late consumer: %A" d) |> ignore
重要的是要注意,这会创建一个 cold observable,因此 Late 消费者接收所有事件。