问题描述
我正在尝试使用MassTransit
建立一个Kafka用户var services = new ServiceCollection();
services.AddMasstransit(x =>
{
x.AddRider(rider =>
{
rider.AddProducer<string,Request>("request",m => m.Message.RequestId);
rider.UsingKafka((context,k) =>
{
k.Host("localhost:9092");
});
});
});
var provider = services.BuildServiceProvider();
var producer = provider.GetrequiredService<ITopicProducer<Request>>();
await producer.Produce(new Request()
{
RequestId = "abc123",RequestedAt = DateTime.UtcNow
});
这是here
中制作人的最简单示例但是当我尝试运行它时,出现此异常
Unhandled exception. system.invalidOperationException: No service for type 'Masstransit.Registration.IBusInstance' has been registered.
查看他们网站上的示例,我发现这可能与我尚未注册RabbitMQ的事实有关
x.UsingRabbitMq((context,cfg) => cfg.ConfigureEndpoints(context));
但是我没有RabbitMQ,在这种情况下我只使用Kafka。
解决方法
从文档中:
MassTransit v7引入的车手提供了一种将消息从任何来源传递到总线的新方法。车手与总线一起配置,并在启动时登上总线。
要添加骑手,必须有一个巴士实例。如果您不需要具有持久性传输的总线(例如RabbitMQ),则可以使用内存中的传输。
var services = new ServiceCollection();
services.AddMassTransit(x =>
{
x.UsingInMemory((context,cfg) => cfg.ConfigureEndpoints(context));
x.AddRider(rider =>
{
rider.AddProducer<string,Request>("request",m => m.Message.RequestId);
rider.UsingKafka((context,k) =>
{
k.Host("localhost:9092");
});
});
});
公交需要启动和停止,这也将启动/停止公交上的所有骑手。您可以通过IBusControl
进行此操作:
var provider = services.BuildServiceProvider();
var busControl = provider.GetRequiredService<IBusControl>();
await busControl.StartAsync(cancellationToken);
或者如果您使用的是ASP.NET Core通用主机,则通过添加MassTransit托管服务。
services.AddMassTransitHostedService(); // in MassTransit.AspNetCore