此代码的某些部分是否会导致 NetMQ 抛出 SocketException?

问题描述

我为分布式应用程序创建了一个基于 NetMQ 的简单通信系统。我有一个 Hub,多个 Node 可以通过它传递消息,还有一个 Bus,多个 BusNode 可以用于发布/订阅。

我遇到了一个很少出现的问题,我希望我今天已经解决了。应用程序每周只会崩溃几次,并且在 Windows 事件日志中发布了一个异常。我显然无法弄清楚我的来源的哪一部分导致了这种情况。到目前为止,我还无法在我的开发机器上重现该问题。抱歉,我只有这个截图,没有文字。

Crash log

我认为解决方案是使用 NetMQQueue,因为事实证明消息是从多个线程发送的,而我认为不是。所以我在 NodeBusNode 中实现了 NetMQQueue 的使用,这很简单明了。

但是,还有两个地方我不确定东西是否是线程安全的,或者我需要修复一些东西。即在 HubBus 中。这就是两人的样子。

我的问题是:这两个是理智的,还是线程问题在这里也是一个危险?

中心

type Hub(connectionStrings: string list) =
    let poller = new NetMQPoller()
    let router = new RouterSocket()
    do
        router.ReceiveReady.Add (fun x ->
            let message = router.ReceiveMultipartMessage()
            if message.FrameCount = 4 then
                // incoming is 0:source 1:destination 2:timestamp 3:msg
                let m = NetMQMessage()
                m.Append message.[1] // destination (for routing - will be removed by router)
                m.Append message.[0] // source
                m.Append message.[1] // destination
                m.Append message.[2] // timestamp
                m.Append message.[3] // msg
                router.SendMultipartMessage m
            else
                log.Warning "Unexpected frame size in Hub."
        )
        connectionStrings |> List.iter router.Bind
        poller.Add router
        poller.RunAsync()
    member _.Stop() =
        poller.Stop()
    interface IDisposable with
        member _.Dispose() =
            if not poller.IsDisposed then
                poller.Stop()
                poller.RemoveAndDispose router
                dispose poller

公交车

type Bus(xpubs: string list,xsubs: string list) =
    let forward (inSocket: NetMQSocket) (outSocket: NetMQSocket) =
        let mutable more = inSocket.HasIn
        while more do
            let mutable msg = new Msg()
            msg.InitEmpty()
            inSocket.Receive &msg
            more <- msg.HasMore
            outSocket.Send (&msg,more)
    let poller = new NetMQPoller()
    let xpub = new XPublisherSocket()
    let xsub = new XSubscriberSocket()
    do
        xpubs |> List.iter xpub.Bind
        xsubs |> List.iter xsub.Bind
        xpub.ReceiveReady.Add (fun x -> forward xpub xsub)
        xsub.ReceiveReady.Add (fun x -> forward xsub xpub)
        poller.Add xpub
        poller.Add xsub
        poller.RunAsync()
    member _.Stop() =
        poller.Stop()
    interface IDisposable with
        member _.Dispose() =
            if not poller.IsDisposed then
                poller.Stop()
                poller.RemoveAndDispose xpub
                poller.RemoveAndDispose xsub
                dispose poller

为了完整起见,我还向您展示了 Node 和 BusNode,它们现在使用 NetMQQueue,我认为至少在这里可以解决问题。

节点

type Node(nodeId: NodeId,connectionString: string) =
    let receiveEvent = new Event<NodeReceiveEventArgs>()
    let queue = new NetMQQueue<NetMQMessage>(0)
    let dealer = new DealerSocket()
    let poller = new NetMQPoller()
    do
        queue.ReceiveReady.Add (fun x ->
            let mutable isDone = false
            while not isDone do
                let mutable m: NetMQMessage = null
                if x.Queue.TryDequeue(&m,TimeSpan.Zero) then
                    dealer.SendMultipartMessage m
                else
                    isDone <- true
        )
        dealer.Options.Identity <- NodeId.crack nodeId
        dealer.ReceiveReady.Add (fun x ->
            let m = dealer.ReceiveMultipartMessage()
            match messageToArgs m with
            | Ok args ->
                // log.Debug $"Rcv ({args.Source.asText} → {nodeId.asText}) {Misc.getDuCaseName args.Message}"
                receiveEvent.Trigger args
            | Error s -> log.Warning $"Rcv : {s}"
        )
        dealer.Connect connectionString
        poller.Add queue
        poller.Add dealer
        poller.RunAsync()
    interface IDisposable with
        member _.Dispose() =
            if not poller.IsDisposed then
                poller.Stop()
                poller.RemoveAndDispose dealer
                poller.RemoveAndDispose queue
                dispose poller
    member _.OnReceive = receiveEvent.Publish
    member _.Send(destination: NodeId,message: IMessage) =
        // log.Debug $"Snd ({nodeId.asText} → {destination.asText}) {Misc.getDuCaseName message}"
        let m = createMqMessage destination DateTime.Now message
        queue.Enqueue m

总线节点

type BusNode(nodeId: NodeId,xpub: string,xsub: string) =
    let receiveEvent = new Event<BusNodeReceiveEventArgs>()
    let queue = new NetMQQueue<NetMQMessage>(0)
    let pub = new PublisherSocket()
    let sub = new SubscriberSocket()
    let poller = new NetMQPoller()
    do
        queue.ReceiveReady.Add (fun x ->
            let mutable isDone = false
            while not isDone do
                let mutable m: NetMQMessage = null
                if x.Queue.TryDequeue(&m,TimeSpan.Zero) then
                    pub.SendMultipartMessage m
                else
                    isDone <- true
        )
        pub.Options.Identity <- NodeId.crack nodeId
        pub.Connect xsub
        sub.ReceiveReady.Add (fun x ->
            let m = sub.ReceiveMultipartMessage()
            if m.FrameCount = 4 then
                let topic = m.[0].ConvertToString(Text.Encoding.ASCII) |> Topic.create
                let source: NodeId = m.[1].ToByteArray() |> NodeId.pack
                let timeStamp: DateTime = m.[2].ToByteArray() |> ZeroSerialization.deserializeDateTime
                let broadcast = m.[3].ToByteArray() |> ZeroSerialization.deserializeBroadcast
                let args = BusNodeReceiveEventArgs(topic,source,nodeId,timeStamp,broadcast)
                log.Debug $"Bus ({args.Source.asText} → {args.Destination.asText}) [{topic.asText}] {Misc.getDuCaseName broadcast}"
                receiveEvent.Trigger args
            else
                log.Warning $"Bus ({nodeId.asText}) : Unexpected frame size."
        )
        sub.Connect xpub
        poller.Add queue
        poller.Add sub
        poller.RunAsync()
    interface IDisposable with
        member _.Dispose() =
            if not poller.IsDisposed then
                poller.Stop()
                poller.RemoveAndDispose sub
                poller.RemoveAndDispose pub
                poller.RemoveAndDispose queue
                dispose poller
            ()
    interface IBusNode with
        member _.OnReceive = receiveEvent.Publish
        member _.Publish(topic: Topic,broadcast: IBroadcast) =
            let m = NetMQMessage()
            m.Append (Topic.crack topic)
            m.Append (NodeId.crack nodeId)
            m.Append (ZeroSerialization.serializeDateTime DateTime.Now)
            m.Append (ZeroSerialization.serializeBroadcast broadcast)
            log.Debug $"Bus ({nodeId.asText} →) {Misc.getDuCaseName broadcast}"
            queue.Enqueue m
            ()
        member _.Subscribe (topic: Topic) =
            log.Debug $"Bus ({nodeId.asText}) subscribe [{topic.asText}]"
            Topic.crack topic |> sub.Subscribe
        member _.SubscribeAll () =
            log.Debug $"Bus ({nodeId.asText}) subscribe all"
            sub.SubscribeToAnyTopic ()

解决方法

暂无找到可以解决该程序问题的有效方法,小编努力寻找整理中!

如果你已经找到好的解决方法,欢迎将解决方案带上本链接一起发送给小编。

小编邮箱:dio#foxmail.com (将#修改为@)

相关问答

错误1:Request method ‘DELETE‘ not supported 错误还原:...
错误1:启动docker镜像时报错:Error response from daemon:...
错误1:private field ‘xxx‘ is never assigned 按Alt...
报错如下,通过源不能下载,最后警告pip需升级版本 Requirem...