如何使用 F# 的 rx 扩展从简单值创建 Observable?

问题描述

目前,我有一个函数可以从外部接收原始数据,对其进行处理,然后将其发送给回调:

let Process callback rawData =
    let data = rawData //transforming into actual data....
    callback data

回调是一个函数'T -> unit。就我而言,它是 MailBoxProcessor 的 Post 函数(被称为 Process mailBox.Post rawData) process 函数被多次调用,每次我把处理过的数据推送到邮箱队列中。到目前为止一切顺利。

现在我想更改此代码,使用 FSharp 的 rx 扩展 (FSharp.Control.Reactive) 将处理后的数据发布给各种消费者。这意味着 callback 要么是一个 Observable,要么是一个发布给订阅者的函数。我该怎么做?

我找到了两个选项:

  1. 创建一个实现 IObservable 的类,并将该对象传递给 Process 函数。如果可能,我想避免创建类。

  2. 使用 Subject.behavior。这正是我想要的,除了它需要一个初始状态,在这种情况下这在语义上没有意义,并且显然主题不受欢迎(来自 ReactiveX 站点 http://davesexton.com/blog/post/To-Use-Subject-Or-Not-To-Use-Subject.aspx 中的链接)。

函数式编程的角度来看,什么是更好的方法?有没有更好的办法?

解决方法

这里有一个想法:您可以使用对象表达式来实现 IObservable<_>,而无需显式类的开销:

let createObservable subscribe =
    {
        new IObservable<_> with
            member __.Subscribe(observer) =
                subscribe observer
    }

要使用它,请指定类型为 subscribeIObserver<_> -> IDisposable 函数。无需上课。

,

使用 observe { .. } 计算构建器可以工作,但 FSharp.Control.Reactive 库中有一个函数可以做同样的事情:

open FSharp.Control.Reactive
let obs = Observable.ofSeq [1;2;3;4;5]

如果我使用 observe { .. } 计算构建器,我还会使用它支持 for 循环的事实,这使您的代码更简单:

let Process initialData = observe {
  for x in initialData do yield x }
,

明白了。 Fsharp 反应式提供来自模块 observe 的关键字 FSharp.Control.Reactive.Builders。这允许您创建临时可观察对象:

open FSharp.Control.Reactive.Builders

//In my real case,"initialData" is a byte stream and 
//at each step I read a few bytes off of it
let Process initialData =
    let rec loop data =
        observe {
            match data with
            | x :: xs ->
                yield x
                yield! loop xs
            | [] -> ()
        }
    loop initialData

let obs = Process ([1;2;3;4;5])
obs.Subscribe(fun d -> printfn "Consumer A: %A" d) |> ignore
obs.Subscribe(fun d -> printfn "Consumer B: %A" d) |> ignore
Threading.Thread.Sleep 1000
obs.Subscribe(fun d -> printfn "Late consumer: %A" d) |> ignore

重要的是要注意,这会创建一个 cold observable,因此 Late 消费者接收所有事件。

相关问答

Selenium Web驱动程序和Java。元素在(x,y)点处不可单击。其...
Python-如何使用点“。” 访问字典成员?
Java 字符串是不可变的。到底是什么意思?
Java中的“ final”关键字如何工作?(我仍然可以修改对象。...
“loop:”在Java代码中。这是什么,为什么要编译?
java.lang.ClassNotFoundException:sun.jdbc.odbc.JdbcOdbc...