同一项目内的多个虚拟主机连接订阅/发布

问题描述

我们的要求如下:-

  1. Exchange 1是一个主题交换,并且队列1已绑定到它。它在VHOST 1上。
  2. 应用程序已订阅队列1。它处理队列1的消息。在处理队列1消息后,我们要将下一条消息发布到VHOST 2(不同的兔子连接)上的其他交换机。

我有以下问题:-
a)是否可以在没有联盟的情况下实现?
b)在同一应用程序中,我可以维持2个不同的Rabbit连接吗?

我们正在使用EasynetQ作为客户端来连接Rabbitmq。

可以请您分享一些示例。

谢谢。

解决方法

a)是的,您还可以在虚拟主机之间创建一个铲子,这比联合会更简单

b)是的,只要您为每个总线实例使用不同的DI(子)容器,创建多个IBus实例就不会出现问题,因此会增加复杂性。

,

这是我处理多个连接的方式。我无法直接从 EasyNetQ 找到解决方案。我不使用 MS DI 的默认 DI 适配器。而且我只使用高级 API 并手动注入我需要的服务。到目前为止,它似乎有效,但确实需要更多测试。

在startup.cs/ConfigureServices

            services.AddBusStation(busStationBuilder =>
            {
                // inject IBusStation and get the bus thru name
                appSettings.RabbitMQSettings.Connections.ForEach(c =>
                {
                    var taskQueueBus = RabbitHutch.CreateBus(c.ConnectionString,CustomServiceRegister.ServiceRegisterAction());
                    c.Exchanges.ForEach(async e =>
                    {
                        await taskQueueBus.Advanced.ExchangeDeclareAsync(e.Name,e.Type,e.Durable,e.AutoDelete);
                    });
                    busStationBuilder.Add(c.Name,taskQueueBus.Advanced);
                    busStationBuilder.AddDefaultBus(taskQueueBus);
                });
            });
    public interface IBusStation
    {
        IBus DefualtBus { get; }
        IAdvancedBus Get(string busName);
        void Add(string busName,IAdvancedBus advancedBus);
        void Add(IBus bus);
    }

    public class BusStation : IBusStation
    {
        private Dictionary<string,IAdvancedBus> BusList { get; set; } = new Dictionary<string,IAdvancedBus>();
        public IBus DefualtBus { get; private set; }
        public IAdvancedBus Get(string busName)
        {
            if (BusList.TryGetValue(busName,out IAdvancedBus advancedBus))
            {
                return advancedBus;
            }
            return null;
        }
        public void Add(string busName,IAdvancedBus advancedBus)
        {
            BusList.Add(busName,advancedBus);
        }
        public void Add(IBus bus)
        {
            this.DefualtBus = bus;
        }
    }

    public class BusStationBuilder
    {
        private readonly IBusStation _BusStation;

        public BusStationBuilder(IServiceCollection services,IBusStation busStation)
        {
            this._BusStation = busStation;
            services.AddSingleton(busStation);
        }
        public BusStationBuilder Add(string busName,IAdvancedBus advancedBus)
        {
            _BusStation.Add(busName,advancedBus);
            return this;
        }
        public BusStationBuilder AddDefaultBus(IBus bus)
        {
            _BusStation.Add(bus);
            return this;
        }
    }

    public static class DependencyExtensions
    {
        public static IServiceCollection AddBusStation(this IServiceCollection services,Action<BusStationBuilder> builder)
        {
            var busStationBuilder = new BusStationBuilder(services,new BusStation());
            builder(busStationBuilder);
            return services;
        }
    }

appsettings.json


    "RabbitMQSettings": {
        "DefaultQueue": "task.main","Connections": [
            {
                "Name": "Task_Queue","ConnectionString": "host=192.168.123.123;virtualHost=/;username=admin;password=password123;prefetchCount=1;persistentMessages=true;publisherConfirms=true","Exchanges": [
                    {
                        "Name": "Direct_Task_Queue","Type": "direct","Passive": false,"Durable": true,"AutoDelete": false,"Internal": false,"AlternateExchange": null,"Delayed": false
                    }
                ]
            }
        ]
    },