在 F# 中将 MailboxProcessor 包装到 IObservable 中的好方法是什么?

问题描述

假设我有一个 MailBoxProcessor,它接收 AsyncReplyChannel 条消息并异步完成它们。

是否有一种简单的方法可以从这样的 IObservable 构建 MailBoxProcessor

let actor = MailBoxProcessor.Start (fun inBox ->
  async {
    let mutable x = 0

    while true do
      let! ch : AsyncReplyChannel<int> = inBox.Receive ()

      ch.Reply x

      x <- x + 1
  })

let obs : IObservable<int> = Observable.ofActor actor 

actor.PostAndReply (fun ch -> ch) // Fires obs too

我想有一些决定要围绕热/冷 observables 等做出。

此处涉及:Does MailboxProcessor just duplicate IObservable?

解决方法

如果您想获得一个现有的 MailboxProcessor 并获得一个 IObservable,只要邮箱处理器响应消息就会被触发,那么我认为没有办法做到这一点 - 有没有钩子可以让您检测到这一点。

这样做的方法是在 MailboxProcessor 上定义某种包装器。例如,您可以定义 NotifyingMailboxProcessor<'Msg,'Evt>,它在每次处理 'Evt 类型的消息时触发 'Msg 类型的事件。你可以从这样的事情开始:

type NotifyingMailboxProcessor<'Msg,'Evt>(mbox:MailboxProcessor<'T>) = 
  let evt = Event<'E>()
  member x.OnPostAndReply = evt.Publish
  member x.PostAndReply<'R>(f:AsyncReplyChannel<'R> -> 'T,g:'T -> 'R -> 'E) = 
    let mutable msg = Unchecked.defaultof<_>
    let res = mbox.PostAndReply(fun ch -> msg <- f(ch); msg)
    evt.Trigger(g msg res)
    res

type NotifyingMailboxProcessor =
  static member Start<'Msg,'Evt>(f) = 
    NotifyingMailboxProcessor<'Msg,'Evt>(MailboxProcessor.Start(f))

这模拟了标准界面,因此您可以使用 NotifyingMailboxProcessor.Start 创建一个。一个微妙的问题是,要包装 PostAndReply,您需要给它另一个函数,该函数从发送到邮箱的消息和回复(这些可以有不同的类型,因此,您必须将结果包装到类型 'Evt 中,就像您必须将多种消息类型包装到单个可区分联合中一样,通常)。

一个受你激励的例子是:

'Evt

相关问答

Selenium Web驱动程序和Java。元素在(x,y)点处不可单击。其...
Python-如何使用点“。” 访问字典成员?
Java 字符串是不可变的。到底是什么意思?
Java中的“ final”关键字如何工作?(我仍然可以修改对象。...
“loop:”在Java代码中。这是什么,为什么要编译?
java.lang.ClassNotFoundException:sun.jdbc.odbc.JdbcOdbc...