问题描述
我有一个.net核心控制台应用程序,当前有1个IHostedService,我需要在应用程序启动时产生并管理此服务的多个实例。
我希望实现某种协调器,负责启动,监视和重新启动工作人员。我知道这个概念,但是我不确定该如何实施。
工作人员自己将在实例化时通过ICallProgService
打开一个到专有数据库的会话并进行维护。然后,工作人员将使用Masstransit库通过RabbitMq在请求/响应模式下侦听消息队列。
这里的细微差别是,我相信(不是100%肯定)他们都需要连接到同一个IBusControl
才能进行大众运输
public class MessageQueueWorker : BackgroundService
{
private readonly ICallProgService _callProgService;
private readonly IBusControl _bus;
public MessageQueueWorker(ICallProgService callProgService)
{
_callProgService = callProgService;
_bus = Bus.Factory.CreateUsingRabbitMq(cfg =>
{
cfg.Host(new Uri("rabbitmq://localhost/"),h => { });
cfg.ReceiveEndpoint("callprog-service",e =>
{
e.Handler<ICallProgRequest>(context =>
{
Console.WriteLine($"Calling Program: {context.Message.ProgramName}");
return context.RespondAsync<ICallProgResponse>(new
{
Result = _callProgService.CallProgProgramAsync(
new CallProgRequest
{
ProgramName = context.Message.ProgramName,ProgramParams = context.Message.ProgramParams
})
});
});
});
});
}
protected override Task ExecuteAsync(CancellationToken stoppingToken)
{
return _bus.StartAsync(stoppingToken);
}
public override Task StopAsync(CancellationToken cancellationToken)
{
return Task.WhenAll(base.StopAsync(cancellationToken),_bus.StopAsync(cancellationToken));
}
}
程序启动
static async Task Main(string[] args)
{
var config = new ConfigurationBuilder()
.SetBasePath(System.IO.Directory.GetCurrentDirectory())
.AddJsonFile("appsettings2.json",optional: true,reloadOnChange: true)
.Build();
var builder = new HostBuilder()
.ConfigureServices((hostcontext,services) =>
{
services.Configure<QmClientOptions>(config);
services.AddSingleton<IQmClient,QmClient>();
services.AddSingleton<ICallProgService,QmCallProgService>()
.AddHostedService<MessageQueueWorker>();
});
await builder.runconsoleAsync();
}
解决方法
由于您要使用Microsoft的依赖注入和通用主机,因此我强烈建议您遵循ASP.NET Core documentation,并让MassTransit设置和管理总线。这还将为您提供健康监控,以确保服务正常运行(当然,健康监控是可选的)。
我还建议使用real consumer,并使用ConfigureConsumer(context)
对其进行配置,以便您可以为每个消息利用容器作用域。除了简单的测试外,不鼓励使用处理程序,因为它们不会创建容器范围。它们工作正常,但鼓励分离出消费者功能。