你如何连接到 AMQP 1.0 主题而不是在 Golang 中排队

问题描述

我一直在尝试 go-amp package README 上的示例代码,但我想连接到一个主题,而不是截至今天该 README 上的示例代码中所示的队列。

我所做的只是将主题名称放在“队列名称”这样的位置。

package main

import (
    "context"
    "fmt"
    "log"
    "time"

    "github.com/Azure/go-amqp"
)

const host = "example.com"
const topic = "/topic/my_topic"
const port = "5672"
const username = "my_username"
const password = "my_password"

// A hleper function to handle errors
func failOnError(err error,msg string){
    if err != nil {
        log.Fatalf("%s %s",msg,err)
    }
}


func main(){
    // connect to remote amqp server
    host_address := fmt.Sprintf("amqps://%s:%s",host,port)
    log.Println("Connecting to ",host_address)

    client,err := amqp.Dial(host_address,amqp.ConnSASLPlain(username,password),)
    failOnError(err,"Failed to connect to Server")
    defer client.Close()

    // Open a session
    session,err := client.NewSession()
    failOnError(err,"Failed to create AMQP session")

    ctx := context.Background()

        // Continuously read messages
        {
            // Create a receiver
            receiver,err := session.NewReceiver(
                amqp.LinkSourceAddress(topic),amqp.LinkCredit(10),)
            failOnError(err,"Failed creating receiver link")
            defer func() {
                ctx,cancel := context.WithTimeout(ctx,1*time.Second)
                receiver.Close(ctx)
                cancel()
            }()

            log.Printf(" [*] Waiting for messages. To exit press CTRL+C")
    
            for {
                // Receive next message
                msg,err := receiver.Receive(ctx)
                failOnError(err,"Failed reading message from AMQP:")
    
                // Accept message
                msg.Accept(context.Background())    
                fmt.Printf("Message received: Body: %s\n",msg.Value)
            }
        }
}

我一直收到这个错误

Failed creating receiver link *Error{Condition: amqp:unauthorized-access,Description: User my_username is not authorized to read from: queue:///topic/my_topic,Info: map[]}

它似乎将我的主题视为队列。如何设置接收器以尝试附加到主题而不是队列?

编辑
我正在使用使用 AMQP 1.0 的 ActiveMQ 代理。它由其他人管理,所以我只需要使用 AMQP 1.0。 这意味着我不能使用更流行的 go amqp 包,因为它不支持 AMQP 1.0。感谢 Tim Bish 提醒我添加内容

解决方法

经过反复试验,我认为可以将主题更改为“topic://my_topic”

const topic = "topic://my_topic"

其余代码保持不变。我创建了一个 gist,用于发送和接收主题。

我希望这能帮助像我这样的新手在键盘上敲打头。