如何在F#中尽快释放事件

问题描述

我有一种情况,我正在从套接字连接中按突发接收事件。突然会有很多事件发生,然后一会儿什么都没有(例如大约100毫秒,所以不会那么长)。

当数据到达时,它来自RabbitMQ回调,我需要尽快从该回调返回。 我以前在回调中处理数据,这引起了缓冲问题,后来变成了连接问题。

因此,我想尽可能快地将到达的数据放入队列并从事件中返回,然后让另一个线程也通过事件来获取数据。

由于这是一种需要在多个地方使用的机制,因此我为此创建了一个Type,但是该代码尚未在主系统中经过测试。集成需要花费一些工作,我首先要解决以下问题:

  • .NET中是否有任何现有机制可以实现这一目标。
  • 代码是否有意义(在某种程度上,这可能是有关代码审阅的问题,而不是纯粹的问题,但上面的问题值得在此处而不是代码审阅网站上发布)

代码及其测试在这里

open System
open System.Collections.Concurrent
open System.Threading


type EventThreadDecoupling<'a>() =

    // data queue
    let queue = ConcurrentQueue<'a>()

    // event called when an element is in the queue
    let popEvent = Event<'a>()

    // wait handle,triggered when data gets put in the queue
    let eventHandle = new EventWaitHandle(false,EventResetMode.ManualReset)

    // setup the thread that processes the queue
    do
        async {
            while true do
                // there is a 1s timeout just in case there is data
                // that was added while I reset the eventHandle
                eventHandle.WaitOne(TimeSpan.FromSeconds(1.)) |> ignore
                let mutable dataRead = false
                while not queue.IsEmpty do
                    match queue.TryDequeue() with
                    | true,v -> popEvent.Trigger(v)
                                 dataRead <- true
                    | _,_    -> ()

                if dataRead then
                    eventHandle.Reset() |> ignore
        } |> Async.Start

    // event called when data has arrived
    member this.OnEvent =
        popEvent.Publish

    // push data to the queue
    member this.Push(data: 'a) =
        queue.Enqueue(data)
        eventHandle.Set() |> ignore


[<EntryPoint>]
let main _ =

    let r = Random()

    let e = EventThreadDecoupling<DateTime>()

    e.OnEvent.Add(fun d ->
        printfn "%A: received %A" DateTime.Now d
    )

    while true do
        Thread.Sleep(r.Next(200))
        e. Push(DateTime.Now)

    0

解决方法

基本上,您需要创建一个使用F# MailboxProcessor 的代理。在RabbitMQ回调中,除了将接收到的消息(数据)转发(发布)到代理程序外,您什么也不做。 MailboxProcessor将为您完成排队,因此您无需在这里重新发明轮子。

代码为:

type Agent<'Msg> (processMsg) =
    let inbox = MailboxProcessor<'Msg>.Start <| fun inbox ->
        let rec loop () = async {
            let! msg = inbox.Receive ()
            processMsg msg
            return! loop ()
        }
        loop ()
    member this.Post msg =
        inbox.Post msg

[<EntryPoint>]
let main argv =    
    let evt = Event<_> ()
    let agent = Agent<DateTime> (fun msg ->
        printfn "Processing msg: %A" msg
        Thread.Sleep 3000
        printfn "Processed msg: %A" msg
    )

    evt.Publish.Subscribe (fun msg ->
        printfn "Received: %A" msg
        // forward msg to the agent:
        agent.Post msg)
    |> ignore

    let rec loop () = async {
        do! Async.Sleep 500
        evt.Trigger DateTime.Now
        return! loop ()
    }
    let cts = new CancellationTokenSource ()
    Async.Start (loop (),cts.Token)
    Console.ReadKey true |> ignore
    cts.Cancel ()

    0

如果运行此代码,则无论处理代理中收到的消息有多长时间,都会每500毫秒定期打印一次文本"Received: xxx"

,

F#具有MailboxProcessor。听起来这很适合解决这个问题。

https://fsharpforfunandprofit.com/posts/concurrency-actor-model/