实施.net核心工作者服务协调器

问题描述

我有一个.net核心控制台应用程序,当前有1个IHostedService,我需要在应用程序启动时产生并管理此服务的多个实例。

我希望实现某种协调器,负责启动,监视和重新启动工作人员。我知道这个概念,但是我不确定该如何实施。

工作人员自己将在实例化时通过ICallProgService打开一个到专有数据库的会话并进行维护。然后,工作人员将使用Masstransit库通过RabbitMq在请求/响应模式下侦听消息队列。

这里的细微差别是,我相信(不是100%肯定)他们都需要连接到同一个IBusControl才能进行大众运输

工人的代码在这里

public class MessageQueueWorker : BackgroundService
{
    private readonly ICallProgService _callProgService;
    private readonly IBusControl _bus;

    public MessageQueueWorker(ICallProgService callProgService)
    {
        _callProgService = callProgService;
        _bus = Bus.Factory.CreateUsingRabbitMq(cfg =>
        {
            cfg.Host(new Uri("rabbitmq://localhost/"),h => { });

            cfg.ReceiveEndpoint("callprog-service",e =>
            {
                e.Handler<ICallProgRequest>(context =>
                {
                    Console.WriteLine($"Calling Program: {context.Message.ProgramName}");
                    return context.RespondAsync<ICallProgResponse>(new
                    {
                        Result = _callProgService.CallProgProgramAsync(
                            new CallProgRequest
                            {
                                ProgramName = context.Message.ProgramName,ProgramParams = context.Message.ProgramParams
                            })
                    });
                });
            });
        });
    }

    protected override Task ExecuteAsync(CancellationToken stoppingToken)
    {
        return _bus.StartAsync(stoppingToken);
    }

    public override Task StopAsync(CancellationToken cancellationToken)
    {
        return Task.WhenAll(base.StopAsync(cancellationToken),_bus.StopAsync(cancellationToken));
    }
}

程序启动

static async Task Main(string[] args)
    {
        var config = new ConfigurationBuilder()
            .SetBasePath(System.IO.Directory.GetCurrentDirectory())
            .AddJsonFile("appsettings2.json",optional: true,reloadOnChange: true)
            .Build();

        var builder = new HostBuilder()
            .ConfigureServices((hostcontext,services) =>
            {
                services.Configure<QmClientOptions>(config);
                services.AddSingleton<IQmClient,QmClient>();
                services.AddSingleton<ICallProgService,QmCallProgService>()
                .AddHostedService<MessageQueueWorker>();
            });

        await builder.runconsoleAsync();
    }

所需解决方案的最佳实施方法是什么?

解决方法

由于您要使用Microsoft的依赖注入和通用主机,因此我强烈建议您遵循ASP.NET Core documentation,并让MassTransit设置和管理总线。这还将为您提供健康监控,以确保服务正常运行(当然,健康监控是可选的)。

我还建议使用real consumer,并使用ConfigureConsumer(context)对其进行配置,以便您可以为每个消息利用容器作用域。除了简单的测试外,不鼓励使用处理程序,因为它们不会创建容器范围。它们工作正常,但鼓励分离出消费者功能。