mangos的问题——Golang包提供的nanomsg总线协议

问题描述

我想用nanomsg/nng作为完全分布式对等多节点网络的通信基础,帮助构建拓扑发现和维护的动态能力。现在我陷入了它的 Golang 包 mangos

在python和pynng(nanomsg的python绑定)中做了同样的工作,但是当我使用go并通过mangos调用相应的方法时,它们的行为完全不同。谜题主要有三方面:

  1. bus-type-Socket 的 Recv() 认以阻塞模式运行,似乎无法配置为非阻塞模式。文件说:

OptionRecvDeadline 是下一次 Recv 超时之前的时间。该值是一个时间。持续时间。可以传递零值以指示不应应用超时。负值表示非阻塞操作。认情况下没有超时。

我相应地尝试了负值,但 Recv() 仍然阻塞。我还应该做什么?以及如何理解“零超时”和“非阻塞”的区别?

  1. dialer 返回的 (s *socket) NewDialer(...) 似乎在调用 dialer.Close() 后仍然存在,因为调用一个 dialer.Dial() 时会发生错误,报告它仍然是“正在使用的地址” .但是当我再次尝试 Close() dialer 时,发生错误并报告它已经关闭。我还尝试了以下选项的不同组合,但所有尝试都失败了
opts := make(map[string]interface{})
opts[mangos.OptionDialAsynch] = true                    // or false
opts[mangos.OptionMaxReconnectTime] = time.Millisecond  // or zero 
opts[mangos.OptionKeepAliveTime] = time.Millisecond     // or even smaller
opts[mangos.OptionKeepAlive] = false                    // or true

如果我想彻底杀死拨号器,或者想在一段时间后重新使用“伪封闭”拨号器,我该怎么办?

  1. bus-type-Socket 的 Send() 很奇怪。通常每个节点都应该定期在我的代码中发送一条消息。我从网络中关闭一个节点(比如“Node-X”)的物理连接,让它离线一段时间,然后重新连接到网络。我发现 Node-X 在重新连接时会立即重新发送大量消息。但我真正期望的是,即使 Node-X 没有邻居,它也可以将这些消息发送到空中。

不知道有没有什么办法可以解决这些问题。我想它可能缺少一些选项或配置,但我没能弄明白。

以下代码用于重现重拨和重合错误

package main

import (
    "fmt"
    "os"
    "time"

    "go.nanomsg.org/mangos/v3"
    "go.nanomsg.org/mangos/v3/protocol/bus"

    // register transports
    _ "go.nanomsg.org/mangos/v3/transport/all"
)

var (
    sock      mangos.socket
    DialerMap map[string]*mangos.Dialer
    opts      map[string]interface{}
)

func main() {
    var err error
    opts = make(map[string]interface{})
    opts[mangos.OptionDialAsynch] = true
    opts[mangos.OptionMaxReconnectTime] = time.Millisecond
    // opts[mangos.OptionKeepAliveTime] = time.Millisecond
    opts[mangos.OptionKeepAlive] = false
    DialerMap = make(map[string]*mangos.Dialer)

    if sock,err = bus.NewSocket(); err != nil {
        fmt.Println("bus.NewSocket error. ",err)
        os.Exit(1)
    }
    TargetUUID := "node-A"
    TargetAddr := "tcp://192.168.0.172:60000"   // this should be changed to a available address
    MyDial(TargetUUID,TargetAddr)
    time.Sleep(time.Second * 2)
    MyClose(TargetUUID,TargetAddr)
    time.Sleep(time.Second * 2)
    MyDial(TargetUUID,TargetAddr)
    time.Sleep(100 * time.Second)

}
func MyDial(TargetUUID string,TargetAddr string) (mangos.Dialer,error) {
    _,is_exist := DialerMap[TargetUUID]
    var err error
    var dialer mangos.Dialer
    if !is_exist {
        dialer,err = sock.NewDialer(TargetAddr,opts)
        if err != nil {
        } else {
            DialerMap[TargetUUID] = &dialer
        }
    }
    dialer = *DialerMap[TargetUUID]
    err = dialer.Dial()
    if err != nil {
        fmt.Println("Dialer fails to dial()",err)
    } else {
        fmt.Println("Dialer succeeds to dial()")
    }

    return dialer,err
}

func MyClose(TargetUUID string,TargetAddr string) {
    dialerAddr,is_exist := DialerMap[TargetUUID]
    if !is_exist {
        fmt.Println("Dialer does not exist")
    }
    dialer := *dialerAddr
    err := dialer.Close()

    if err != nil {
        fmt.Println("dialer fails to close.",err)
    } else {
        fmt.Println("dialer succeeds to close")
    }

}

和控制台输出

Dialer succeeds to dial()
dialer succeeds to close
Dialer fails to dial() address in use
dialer fails to close. object closed

解决方法

对于此类问题,我通常不会监控 stackoverflow 或 reddit——我们确实有一个不和谐频道(来自 mangos 和 NNG 主页的链接)以及一个邮件列表。

话虽如此,让我看看能不能帮上忙(我是 NNG 和 mangos 的作者):

  1. 总线支持OptionRecvDeadline。但是,您是正确的,它不支持具有负值的非阻塞模式,相反,负值被视为零,并充当阻塞。这是一个文档错误。要实现逻辑非阻塞,请使用值“1”,这意味着一纳秒,这在逻辑上等同于非阻塞,尽管粒度可能受到调度程序延迟的限制。 (在这种情况下,它就像执行“go close(channel);

我会看看如何修复文档。

  1. 在拨号器上调用 Close() 是正确的做法。它会一直存在,直到管道关闭,它会自动关闭。您使用非常的重拨时间可能会混淆这一点——老实说,我没有考虑过微小的重拨时间——通常这样做是不好的形式,因为它意味着如果对等点不可用,您的代码将在试图重新连接的处理器上剧烈旋转。我通常建议至少 10 毫秒的重试间隔上限。 (mangos.OptionMaxReconnectTime)

  2. 我认为您看到了排队的效果,但我不能 100% 确定 - 我需要查看一个测试用例来重现这一点。总线协议肯定是尽力而为的传递,如果没有连接的对等点,则消息会被丢弃在地板上。 (只是重新检查了一下。)

,

感谢@Garrett D'Amore 的回复,我现在可以用另一种方式解决我的问题,而且我(作为一个对底层通信层知之甚少的新 Golang 粉丝)很抱歉用这样一个基本而愚蠢的问题来打扰您。

问题(1)作者很好地回答了。

问题(3)可能与问题(2)相结合,因为作者将机制描述如下,从而消除了发送缓冲累积的可能性。

当然,总线协议是尽力而为的传递,如果没有连接的对等点,则消息会被丢弃在地板上。 (只是重新检查了一下。)

问题(2),我第一次尝试将mangos.OptionMaxReconnectTime设置为100 ms,但问题依然存在。第二次,我尝试了各种options组合来配置socket和拨号器,但都失败了。

最后,既然作者指出

在拨号器上调用 Close() 是正确的做法。它会一直存在,直到管道关闭,它会自动关闭。您使用非常短的重拨时间可能会混淆这一点。

我转向另一种关闭旧拨号器的方法,即明确关闭它拥有的所有管道。为了实现这一点,可以定义一个回调处理程序,如

var pipe_c chan
func callbackHandler(event mangos.PipeEvent,pipe mangos.Pipe) {
    pAddr := &pipe
    pipe_c <- pAddr
}

然后将 callbackHandler 附加到套接字

sock.SetPipeEventHook(callbackHandler)

通过这样做,用户可以获得(私有变量)管道。当一个人想关闭拨号连接时,他或她可以这样做

dialer.Close()                    // try best to close a dialer automatically
for pAddr,num := range pipeSet {
    (*pAddr).Close()              // explicitly close all the pipes of the dialer
}

然后将“伪封闭”拨号器放在一边。当您想再次连接到远程地址时,可以创建并使用一个新的拨号程序。

我不知道旧的“伪封闭”拨号器是否会累积在内存中。但这已经是我能找到的唯一解决方案了。