问题描述
我们的要求如下:-
- Exchange 1是一个主题交换,并且队列1已绑定到它。它在VHOST 1上。
- 应用程序已订阅队列1。它处理队列1的消息。在处理队列1消息后,我们要将下一条消息发布到VHOST 2(不同的兔子连接)上的其他交换机。
我有以下问题:-
a)是否可以在没有联盟的情况下实现?
b)在同一应用程序中,我可以维持2个不同的Rabbit连接吗?
我们正在使用EasynetQ作为客户端来连接Rabbitmq。
可以请您分享一些示例。
谢谢。
解决方法
a)是的,您还可以在虚拟主机之间创建一个铲子,这比联合会更简单
b)是的,只要您为每个总线实例使用不同的DI(子)容器,创建多个IBus实例就不会出现问题,因此会增加复杂性。
,这是我处理多个连接的方式。我无法直接从 EasyNetQ 找到解决方案。我不使用 MS DI 的默认 DI 适配器。而且我只使用高级 API 并手动注入我需要的服务。到目前为止,它似乎有效,但确实需要更多测试。
在startup.cs/ConfigureServices
services.AddBusStation(busStationBuilder =>
{
// inject IBusStation and get the bus thru name
appSettings.RabbitMQSettings.Connections.ForEach(c =>
{
var taskQueueBus = RabbitHutch.CreateBus(c.ConnectionString,CustomServiceRegister.ServiceRegisterAction());
c.Exchanges.ForEach(async e =>
{
await taskQueueBus.Advanced.ExchangeDeclareAsync(e.Name,e.Type,e.Durable,e.AutoDelete);
});
busStationBuilder.Add(c.Name,taskQueueBus.Advanced);
busStationBuilder.AddDefaultBus(taskQueueBus);
});
});
public interface IBusStation
{
IBus DefualtBus { get; }
IAdvancedBus Get(string busName);
void Add(string busName,IAdvancedBus advancedBus);
void Add(IBus bus);
}
public class BusStation : IBusStation
{
private Dictionary<string,IAdvancedBus> BusList { get; set; } = new Dictionary<string,IAdvancedBus>();
public IBus DefualtBus { get; private set; }
public IAdvancedBus Get(string busName)
{
if (BusList.TryGetValue(busName,out IAdvancedBus advancedBus))
{
return advancedBus;
}
return null;
}
public void Add(string busName,IAdvancedBus advancedBus)
{
BusList.Add(busName,advancedBus);
}
public void Add(IBus bus)
{
this.DefualtBus = bus;
}
}
public class BusStationBuilder
{
private readonly IBusStation _BusStation;
public BusStationBuilder(IServiceCollection services,IBusStation busStation)
{
this._BusStation = busStation;
services.AddSingleton(busStation);
}
public BusStationBuilder Add(string busName,IAdvancedBus advancedBus)
{
_BusStation.Add(busName,advancedBus);
return this;
}
public BusStationBuilder AddDefaultBus(IBus bus)
{
_BusStation.Add(bus);
return this;
}
}
public static class DependencyExtensions
{
public static IServiceCollection AddBusStation(this IServiceCollection services,Action<BusStationBuilder> builder)
{
var busStationBuilder = new BusStationBuilder(services,new BusStation());
builder(busStationBuilder);
return services;
}
}
appsettings.json
"RabbitMQSettings": {
"DefaultQueue": "task.main","Connections": [
{
"Name": "Task_Queue","ConnectionString": "host=192.168.123.123;virtualHost=/;username=admin;password=password123;prefetchCount=1;persistentMessages=true;publisherConfirms=true","Exchanges": [
{
"Name": "Direct_Task_Queue","Type": "direct","Passive": false,"Durable": true,"AutoDelete": false,"Internal": false,"AlternateExchange": null,"Delayed": false
}
]
}
]
},