问题描述
我在“工作队列”场景中使用Rabbitmq。
我需要例如。一个由5个使用者组成的池(每个使用者都有自己的通道),因此一个执行I / O操作的使用者不会阻止同一队列中的其他使用者。
例如。 如果我要排队: 消息1,消息2,消息3,消息4。(FistConsumerHandler)的每个实例将使用Round Robin(默认Rabbitmq行为)从队列中提取1条消息。
我面临的问题是我需要使用依赖注入来做到这一点。
这是我到目前为止所拥有的:
在Windows服务上启动(我的使用者托管在Windows服务中):
protected override void OnStart(string[] args)
{
BuildConnections();
// Register the consumers. For simplicity only showing FirstConsumerHandler.
AddConsumerHandlers<FistConsumerHandler>(ConstantesProcesos.Exchange,ConstantesProcesos.QueueForFirstHandler);
BuildStartup();
var logger = GetLogger<ServicioProcesos>();
logger.Loginformation("Windows Service Started");
Console.WriteLine("Press [enter] to exit.");
}
protected virtual void BuildConnections(
string notificationHubPath = "notificationhub_path",string rabbitMQHostname = "rabbitmq_hostname",string rabbitMQPort = "rabbitmq_port",string rabbitMQUserName = "rabbitmq_username",string rabbitMQPassword = "rabbitmq_password")
{
ContextHelpers.Setup(ConfigurationManager.ConnectionStrings[appContextConnectionString].ConnectionString);
if (_connection == null)
{
var factory = new ConnectionFactory
{
HostName = ConfigurationManager.AppSettings[rabbitMQHostname],Port = int.Parse(ConfigurationManager.AppSettings[rabbitMQPort]),UserName = ConfigurationManager.AppSettings[rabbitMQUserName],Password = ConfigurationManager.AppSettings[rabbitMQPassword],dispatchConsumersAsync = true,};
// Create a connection
do
{
try
{
_connection = factory.CreateConnection();
}
catch (RabbitMQ.Client.Exceptions.brokerUnreachableException e)
{
Thread.Sleep(5000);
}
} while (_connection == null);
}
_startupBuilder = new StartupBuilder(_connection);
}
protected void AddConsumerHandlers<THandler>(string exchange,string queue)
{
var consumerHandlerItem = new ConsumerHandlerItem
{
ConsumerType = typeof(THandler),Exchange = exchange,Queue = queue
};
_startupBuilder._consumerHandlerItems.Add(consumerHandlerItem);
}
protected void BuildStartup()
{
ServiceProvider = _startupBuilder.Build();
}
Startup Builder:
using Microsoft.Extensions.DependencyInjection;
using RabbitMQ.Client;
using RabbitMQ.Client.Events;
using System;
using System.Collections.Generic;
public class StartupBuilder
{
private static IConnection _connection;
private IModel _channel;
public List<ConsumerHandlerItem> _consumerHandlerItems;
public IServiceCollection Services { get; private set; }
public StartupBuilder(IConnection connection)
{
_connection = connection;
_consumerHandlerItems = new List<ConsumerHandlerItem>();
Services = new ServiceCollection();
}
public IServiceProvider Build()
{
_channel = _connection.CreateModel();
Services.InitSerilog();
// Add channel as singleton (this is not correct as I need 1 channel per ConsumerHandler)
Services.AddSingleton(_channel);
// Register the ConsumerHandler to DI
foreach (var item in _consumerHandlerItems)
{
// Add FirstHandler to DI
Type consumerType = item.ConsumerType;
Services.AddSingleton(consumerType);
}
// Finish DI Setup
var serviceProvider = Services.BuildServiceProvider();
// Bind the consumer handler to the channel and queue
foreach (var item in _consumerHandlerItems)
{
var consumerHandler = (AsyncEventingBasicConsumer)serviceProvider.GetrequiredService(item.ConsumerType);
_channel.AssignNewProcessor(item,consumerHandler);
}
return serviceProvider;
}
}
助手:
public static class QueuesHelpers
{
public static void AssignNewProcessor(this IModel channel,ConsumerHandlerItem item,AsyncEventingBasicConsumer consumerHandler)
{
channel.ExchangeDeclare(item.Exchange,ExchangeType.Topic,durable: true);
channel.QueueDeclare(item.Queue,true,false,null);
channel.QueueBind(item.Queue,item.Exchange,item.Queue,null);
channel.BasicConsume(item.Queue,consumerHandler);
}
}
消费者处理程序:
public class FistConsumerHandler : AsyncEventingBasicConsumer
{
private readonly ILogger<FistConsumerHandler> _logger;
private Guid guid = Guid.NewGuid();
public FistConsumerHandler(
IModel channel,ILogger<FistConsumerHandler> logger) : base(channel)
{
Received += ConsumeMessageAsync;
_logger = logger;
}
private async Task ConsumeMessageAsync(object sender,BasicDeliverEventArgs eventArgs)
{
try
{
// consumer logic to consume the message
}
catch (Exception ex)
{
}
finally
{
Model.AckNowledge(eventArgs);
}
}
}
此代码的问题是:
- 只有1个FistConsumerHandler实例(已注册为单例)。我需要例如5。
- 我只有1个频道,每个实例需要1个频道。
总而言之,使用Microsoft.Extensions.DependencyInjection的预期行为应为:
解决方法
TL;DR;创建您自己的范围
我在我正在开发的应用程序中做了类似的事情,尽管没有我想要的那么干净(这也是我看到这篇文章的原因)。对我来说,关键是使用 IServiceScopeFactory
获取注入的服务并在消费者方法中使用它们。在典型的 HTTP 请求中,API 将分别在请求传入/响应传出时自动为您创建/关闭范围。但由于这不是 HTTP 请求,我们需要创建/关闭使用注入服务的范围。
这是获取注入的数据库上下文的简化示例(但可以是任何内容),假设我已经设置了 RabbitMQ 使用者,将消息反序列化为一个对象(在此示例中为 FooEntity
):
public class RabbitMQConsumer
{
private readonly IServiceProvider _provider;
public RabbitMQConsumer(IServiceProvider serviceProvider)
{
this._serviceProvider = serviceProvider;
}
public async Task ConsumeMessageAsync()
{
// Using statement ensures we close scope when finished,helping avoid memory leaks
using (var scope = this._serviceProvider.CreateScope())
{
// Get your service(s) within the scope
var context = scope.ServiceProvider.GetRequiredService<MyDBContext>();
// Do things with dbContext
}
}
}
务必将 RabbitMQConsumer
注册为单例,而不是 Startup.cs
中的瞬态。