Problem description
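I have two services:
Service 1 hosts a state machine and produces messages to a Kafka topic. Service 2 should consume these messages. How do I configure Service 2 correctly so that it consumes them?
With the following configuration, nothing is consumed: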
services.AddMassTransit(mt =>
{
    mt.UsingInMemory((context, cfg) => cfg.ConfigureEndpoints(context, SnakeCaseEndpointNameFormatter.Instance));
    mt.AddRider(rider =>
    {
        rider.UsingKafka((ctx, kafka) =>
        {
            // Only the host is configured here; no topic endpoint is declared.
            kafka.Host("kafka_url");
        });
        rider.AddConsumer<OrderConsumer>()
            .Endpoint(e =>
            {
                e.Name = "queue_name";
                e.Temporary = false;
                e.ConcurrentMessageLimit = 8;
            });
    });
});
When I configure it like this instead, it throws System.ArgumentException: 'The consumer type was not found: OrderConsumer'
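services.AddMassTransit(mt =>
{
    mt.UsingInMemory((context, cfg) => cfg.ConfigureEndpoints(context, SnakeCaseEndpointNameFormatter.Instance));
    mt.AddRider(rider =>
    {
        // Note: no rider.AddConsumer<OrderConsumer>() call in this version.
        rider.UsingKafka((ctx, kafka) =>
        {
            kafka.Host("kafka_url");
            kafka.TopicEndpoint<Null, OrderMessage>("queue_name", "group_id", cfg =>
            {
                cfg.AutoOffsetReset = AutoOffsetReset.Earliest;
                cfg.ConfigureConsumer<OrderConsumer>(ctx);
            });
        });
    });
});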
Solution
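You need to combine the two samples you posted. The first one registers the consumer on the rider but never declares a Kafka topic endpoint for it; the second one declares the topic endpoint but never registers the consumer, which is why ConfigureConsumer cannot find OrderConsumer. Register the consumer with AddConsumer on the rider, then bind it to the topic inside UsingKafka: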
services.AddMassTransit(mt =>
{
    mt.UsingInMemory((context, cfg) => cfg.ConfigureEndpoints(context, SnakeCaseEndpointNameFormatter.Instance));
    mt.AddRider(rider =>
    {
        // Register the consumer on the rider so ConfigureConsumer can resolve it.
        rider.AddConsumer<OrderConsumer>();
        rider.UsingKafka((ctx, kafka) =>
        {
            kafka.Host("kafka_url");
            // Arguments: topic name, consumer group id, endpoint configuration.
            kafka.TopicEndpoint<Null, OrderMessage>("queue_name", "group_id", cfg =>
            {
                cfg.AutoOffsetReset = AutoOffsetReset.Earliest;
                cfg.ConfigureConsumer<OrderConsumer>(ctx);
            });
        });
    });
});
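For completeness, here is a minimal sketch of what the two types referenced above might look like. The question does not show them, so the OrderId property and the body of Consume are assumptions for illustration only; the part taken from MassTransit itself is the IConsumer<T> interface and its Consume(ConsumeContext<T>) method.

```csharp
using System;
using System.Threading.Tasks;
using MassTransit;

// Hypothetical message contract: the real OrderMessage is not shown in the
// question, so this single property is only an assumption.
public record OrderMessage
{
    public string OrderId { get; init; } = "";
}

// The consumer that AddConsumer<OrderConsumer>() registers and that
// ConfigureConsumer<OrderConsumer>(ctx) binds to the topic endpoint.
public class OrderConsumer : IConsumer<OrderMessage>
{
    public Task Consume(ConsumeContext<OrderMessage> context)
    {
        // Placeholder handling: just log the received order id.
        Console.WriteLine($"Received order {context.Message.OrderId}");
        return Task.CompletedTask;
    }
}
```

The message type given to TopicEndpoint<Null, OrderMessage> has to match the type parameter of IConsumer<OrderMessage>, otherwise the consumer will not be connected to messages arriving on that topic.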