问题描述
我有一种情况,我正在从套接字连接中按突发接收事件。突然会有很多事件发生,然后一会儿什么都没有(例如大约100毫秒,所以不会那么长)。
当数据到达时,它来自RabbitMQ回调,我需要尽快从该回调返回。 我以前在回调中处理数据,这引起了缓冲问题,后来变成了连接问题。
因此,我想尽可能快地将到达的数据放入队列并从事件中返回,然后让另一个线程也通过事件来获取数据。
由于这是一种需要在多个地方使用的机制,因此我为此创建了一个Type,但是该代码尚未在主系统中经过测试。集成需要花费一些工作,我首先要解决以下问题:
open System
open System.Collections.Concurrent
open System.Threading
type EventThreadDecoupling<'a>() =
// data queue
let queue = ConcurrentQueue<'a>()
// event called when an element is in the queue
let popEvent = Event<'a>()
// wait handle,triggered when data gets put in the queue
let eventHandle = new EventWaitHandle(false,EventResetMode.ManualReset)
// setup the thread that processes the queue
do
async {
while true do
// there is a 1s timeout just in case there is data
// that was added while I reset the eventHandle
eventHandle.WaitOne(TimeSpan.FromSeconds(1.)) |> ignore
let mutable dataRead = false
while not queue.IsEmpty do
match queue.TryDequeue() with
| true,v -> popEvent.Trigger(v)
dataRead <- true
| _,_ -> ()
if dataRead then
eventHandle.Reset() |> ignore
} |> Async.Start
// event called when data has arrived
member this.OnEvent =
popEvent.Publish
// push data to the queue
member this.Push(data: 'a) =
queue.Enqueue(data)
eventHandle.Set() |> ignore
[<EntryPoint>]
let main _ =
let r = Random()
let e = EventThreadDecoupling<DateTime>()
e.OnEvent.Add(fun d ->
printfn "%A: received %A" DateTime.Now d
)
while true do
Thread.Sleep(r.Next(200))
e. Push(DateTime.Now)
0
解决方法
基本上,您需要创建一个使用F# MailboxProcessor 的代理。在RabbitMQ回调中,除了将接收到的消息(数据)转发(发布)到代理程序外,您什么也不做。 MailboxProcessor将为您完成排队,因此您无需在这里重新发明轮子。
代码为:
type Agent<'Msg> (processMsg) =
let inbox = MailboxProcessor<'Msg>.Start <| fun inbox ->
let rec loop () = async {
let! msg = inbox.Receive ()
processMsg msg
return! loop ()
}
loop ()
member this.Post msg =
inbox.Post msg
[<EntryPoint>]
let main argv =
let evt = Event<_> ()
let agent = Agent<DateTime> (fun msg ->
printfn "Processing msg: %A" msg
Thread.Sleep 3000
printfn "Processed msg: %A" msg
)
evt.Publish.Subscribe (fun msg ->
printfn "Received: %A" msg
// forward msg to the agent:
agent.Post msg)
|> ignore
let rec loop () = async {
do! Async.Sleep 500
evt.Trigger DateTime.Now
return! loop ()
}
let cts = new CancellationTokenSource ()
Async.Start (loop (),cts.Token)
Console.ReadKey true |> ignore
cts.Cancel ()
0
如果运行此代码,则无论处理代理中收到的消息有多长时间,都会每500毫秒定期打印一次文本"Received: xxx"
。
F#具有MailboxProcessor
。听起来这很适合解决这个问题。
https://fsharpforfunandprofit.com/posts/concurrency-actor-model/