问题描述
假设我有一个 MailBoxProcessor
,它接收 AsyncReplyChannel
条消息并异步完成它们。
是否有一种简单的方法可以从这样的 IObservable
构建 MailBoxProcessor
?
let actor = MailBoxProcessor.Start (fun inBox ->
async {
let mutable x = 0
while true do
let! ch : AsyncReplyChannel<int> = inBox.Receive ()
ch.Reply x
x <- x + 1
})
let obs : IObservable<int> = Observable.ofActor actor
actor.PostAndReply (fun ch -> ch) // Fires obs too
我想有一些决定要围绕热/冷 observables 等做出。
此处涉及:Does MailboxProcessor just duplicate IObservable?
解决方法
如果您想获得一个现有的 MailboxProcessor
并获得一个 IObservable
,只要邮箱处理器响应消息就会被触发,那么我认为没有办法做到这一点 - 有没有钩子可以让您检测到这一点。
这样做的方法是在 MailboxProcessor
上定义某种包装器。例如,您可以定义 NotifyingMailboxProcessor<'Msg,'Evt>
,它在每次处理 'Evt
类型的消息时触发 'Msg
类型的事件。你可以从这样的事情开始:
type NotifyingMailboxProcessor<'Msg,'Evt>(mbox:MailboxProcessor<'T>) =
let evt = Event<'E>()
member x.OnPostAndReply = evt.Publish
member x.PostAndReply<'R>(f:AsyncReplyChannel<'R> -> 'T,g:'T -> 'R -> 'E) =
let mutable msg = Unchecked.defaultof<_>
let res = mbox.PostAndReply(fun ch -> msg <- f(ch); msg)
evt.Trigger(g msg res)
res
type NotifyingMailboxProcessor =
static member Start<'Msg,'Evt>(f) =
NotifyingMailboxProcessor<'Msg,'Evt>(MailboxProcessor.Start(f))
这模拟了标准界面,因此您可以使用 NotifyingMailboxProcessor.Start
创建一个。一个微妙的问题是,要包装 PostAndReply
,您需要给它另一个函数,该函数从发送到邮箱的消息和回复(这些可以有不同的类型,因此,您必须将结果包装到类型 'Evt
中,就像您必须将多种消息类型包装到单个可区分联合中一样,通常)。
一个受你激励的例子是:
'Evt