如何使用状态机通过 MassTransit 将消费者连接到 Kafka 主题

问题描述

当有两个服务时:

服务 1 托管状态机并生成有关主题的消息。服务 2 应该使用此消息。如何正确设置服务 2 以使用消息?

代码变成这样时,它不起作用:

services.AddMasstransit(mt =>
{
    mt.UsingInMemory((context,cfg) => cfg.ConfigureEndpoints(context,SnakeCaseEndpointNameFormatter.Instance));

    mt.AddRider(rider =>
    {
        rider.UsingKafka((ctx,kafka) =>
        {
            kafka.Host("kafka_url");
        });

        rider.AddConsumer<OrderConsumer>()
            .Endpoint(e =>
            {
                e.Name = "queue_name";
                e.Temporary = false;
                e.ConcurrentMessageLimit = 8;
            });
    });
});

当我这样做时,它抛出 System.ArgumentException: 'The consumer type was not found: OrderConsumer'

services.AddMasstransit(mt =>
{
    mt.UsingInMemory((context,kafka) =>
        {
            kafka.Host("kafka_url");

            kafka.TopicEndpoint<Null,OrderMessage>("queue_name","group_id",cfg =>
            {
                cfg.AutoOffsetReset = AutoOffsetReset.Earliest;
                cfg.ConfigureConsumer<OrderConsumer>(ctx);
            });
        });
    });
});

解决方法

您需要混合发布的两个样本:

services.AddMassTransit(mt =>
{
    mt.UsingInMemory((context,cfg) => cfg.ConfigureEndpoints(context,SnakeCaseEndpointNameFormatter.Instance));

    mt.AddRider(rider =>
    {
        rider.AddConsumer<OrderConsumer>()
        
        rider.UsingKafka((ctx,kafka) =>
        {
            kafka.Host("kafka_url");

            kafka.TopicEndpoint<Null,OrderMessage>("queue_name","group_id",cfg =>
            {
                cfg.AutoOffsetReset = AutoOffsetReset.Earliest;
                cfg.ConfigureConsumer<OrderConsumer>(ctx);
            });
        });
    });
});