Kaska Producer with MassTransit-IBusInstance未注册

问题描述

我正在尝试使用MassTransit

建立一个Kafka用户

我有这段代码

var services = new ServiceCollection();
services.AddMasstransit(x =>
{   
    x.AddRider(rider =>
    {
        rider.AddProducer<string,Request>("request",m => m.Message.RequestId);
        
        rider.UsingKafka((context,k) =>
        {
            k.Host("localhost:9092");
        });
    });
});

var provider = services.BuildServiceProvider();
var producer = provider.GetrequiredService<ITopicProducer<Request>>();

await producer.Produce(new Request()
{
    RequestId = "abc123",RequestedAt = DateTime.UtcNow
});

这是here

中制作人的最简单示例

但是当我尝试运行它时,出现此异常

Unhandled exception. system.invalidOperationException: No service for type 'Masstransit.Registration.IBusInstance' has been registered.

查看他们网站上的示例,我发现这可能与我尚未注册RabbitMQ的事实有关

x.UsingRabbitMq((context,cfg) => cfg.ConfigureEndpoints(context));

但是我没有RabbitMQ,在这种情况下我只使用Kafka。

是否有必要向其他消息代理注册总线以生成到Kafka?

解决方法

从文档中:

MassTransit v7引入的

车手提供了一种将消息从任何来源传递到总线的新方法。车手与总线一起配置,并在启动时登上总线。

要添加骑手,必须有一个巴士实例。如果您不需要具有持久性传输的总线(例如RabbitMQ),则可以使用内存中的传输。

var services = new ServiceCollection();
services.AddMassTransit(x =>
{  
    x.UsingInMemory((context,cfg) => cfg.ConfigureEndpoints(context));

    x.AddRider(rider =>
    {
        rider.AddProducer<string,Request>("request",m => m.Message.RequestId);
        
        rider.UsingKafka((context,k) =>
        {
            k.Host("localhost:9092");
        });
    });
});

公交需要启动和停止,这也将启动/停止公交上的所有骑手。您可以通过IBusControl进行此操作:

var provider = services.BuildServiceProvider();
var busControl = provider.GetRequiredService<IBusControl>();

await busControl.StartAsync(cancellationToken);

或者如果您使用的是ASP.NET Core通用主机,则通过添加MassTransit托管服务。

services.AddMassTransitHostedService(); // in MassTransit.AspNetCore