如何在 AMQP 1.0 和 RabbitMQ 中使用主题和队列实现发布/订阅者?

问题描述

我认为的简单问题:

我想要一个带有路由键“red.dog”的主题,并且我想要有 3 个队列: 1 个获取所有的持久队列(路由键应该是 #,对吗?) 1 个非持久队列获取所有 1 个只有红色的非持久队列(路由键应该是红色的。*,对吗?)

所以我使用 C#、最新的 RabbitMQ 和 AMQP 1.0 以及 amqpnetlite 库。

我的发件人是这样的:

        string address = "amqp://localhost:5672";
        Connection connection = new Connection(new Address(address),SaslProfile.Anonymous,new open() { ContainerId = DEFAULT_CONTAINER_ID },null);
        Session session = new Session(connection);
        SenderLink sender = new SenderLink(session,"test-sender","/topic/red.dogs"); // TOPIC: red.dogs 
        while (true)
        {
            try
            {
                Message message1 = new Message("Hello AMQP!");
                sender.Send(message1);
                Thread.Sleep(5000);
                this._logger.Loginformation("Send!");
            }
            catch (Exception ex)
            {
                this._logger.LogError("ERROR: " + ex.Message);
            }
        }

效果很好。 我可以在 amqp.topics 中看到消息,当我通过网络界面创建一个队列并将其绑定到“#”时,它会正确获取所有消息。到目前为止一切顺利。

现在当我尝试订阅时没有消息:

       string address = "amqp://localhost:5672";
        Connection connection = new Connection(new Address(address),null);
        Session session = new Session(connection);
        Source source = new Source();
        source.Address = "/queue/#";
        source.Durable = 0;
        ReceiverLink rec = new ReceiverLink(session,"test-sub",source,null);
        while (true)
        {
            try
            {
                Message msg = rec.Receive(TimeSpan.FromSeconds(2));
                if (msg == null)
                {
                    this._logger.Loginformation("nothing recieved");
                }
                else
                {
                    this._logger.Loginformation("Recieved: " + msg.Body);
                }
            }
            catch (Exception ex)
            {
                this._logger.Loginformation("ERROR: " + ex.Message);
            }
            
        }

所以我看到在名为“#”的 Web 界面中创建的队列但没有绑定,它是非持久的“持久 = 0”和持久的“持久 = 1”,所以这似乎有效。 我找不到正确的地址命名来运行我想要的 3 个示例(尝试了很多)。

我找到了有关 AMQP 1.0 集成的文档: https://github.com/rabbitmq/rabbitmq-server/tree/master/deps/amqp10_client

所以我的 /queue/# 应该是正确的我认为..

我做错了什么?

解决方法

暂无找到可以解决该程序问题的有效方法,小编努力寻找整理中!

如果你已经找到好的解决方法,欢迎将解决方案带上本链接一起发送给小编。

小编邮箱:dio#foxmail.com (将#修改为@)