问题描述
我正在查看 Masstransit,我使用 Masstransit dotnet 模板生成了一个工作线程,按照 https://masstransit-project.com/getting-started/(直到 RabbitMQ 的所有内容)
然后我对让内置中介处理响应很感兴趣,因此根据 https://masstransit-project.com/articles/mediator.html
更改了代码所以它的设置看起来像......
Host.CreateDefaultBuilder(args)
.ConfigureServices((hostContext,services) =>
{
services.AddMediator(x =>
{
x.AddConsumer<MessageConsumer>();
x.AddRequestClient<Message>();
});
services.AddMasstransit(x =>
{
x.AddConsumersFromNamespaceContaining<MessageConsumer>();
x.UsingInMemory((context,cfg) =>
{
cfg.ConfigureEndpoints(context);
});
});
services.AddMasstransitHostedService(true);
services.AddHostedService<Worker>();
});
消费者/合约现在看起来像
public class Message { public string Text { get; set; } }
public class MessageResult { public string Text { get; set; } }
public class MessageConsumer : IConsumer<Message>
{
readonly ILogger<MessageConsumer> _logger;
public MessageConsumer(ILogger<MessageConsumer> logger)
{
_logger = logger;
}
public Task Consume(ConsumeContext<Message> context)
{
_logger.Loginformation("Received Text: {Text}",context.Message.Text);
return context.RespondAsync(new MessageResult() {Text = $"Got {context.Message.Text}"});
}
}
工人看起来像
public class Worker : BackgroundService
{
private readonly ILogger<Worker> _logger;
private readonly IBus _bus;
private readonly IRequestClient<Message> _request;
public Worker(ILogger<Worker> logger,IBus bus,IRequestClient<Message> request)
{
_logger = logger;
_bus = bus;
_request = request;
}
protected override async Task ExecuteAsync(CancellationToken stoppingToken)
{
while (!stoppingToken.IsCancellationRequested)
{
var response = await _request.GetResponse<MessageResult>(new Message {Text = $"The time is {DateTimeOffset.Now}"},stoppingToken);
await Task.Delay(1000,stoppingToken);
}
}
}
但是,当我运行它时,IRequestClient 的注入似乎失败了(由于某种原因它没有注册?),但有异常
Error while validating the service descriptor 'ServiceType: Microsoft.Extensions.Hosting.IHostedService Lifetime: Singleton ImplementationType: MTGettingStarted.Worker': Cannot consume scoped service 'Masstransit.IRequestClient`1[MTGettingStarted.Message]' from singleton 'Microsoft.Extensions.Hosting.IHostedService'.
我认为 x.AddRequestClient<Message>();
应该这样做。也许文档不完整?还是我错过了什么?
如果我将工作人员更改为手动获取请求,那么它确实有效
var client = _mediator.CreateRequestClient<Message>();
var response = await client.GetResponse<MessageResult>(new Message {Text = $"The time is {DateTimeOffset.Now}"},stoppingToken);
但仍然好奇为什么 DI 不起作用?
解决方法
IRequestClient<T>
在容器中注册为作用域,您发布的错误表明:
Cannot consume scoped service
您的托管服务是单例。