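Problem description
I have been using MassTransit as the consumer side of my project. The MassTransit service consumes messages from another service that I built, which currently runs on NestJS and uses RabbitMQ to send messages. The problem arises when I try to deserialize a RabbitMQ message that carries basic properties such as the reply-to address.
Because MassTransit wraps my message in an interface, I cannot get at the value stored in those properties. The value is the "reply-to" address used to reply to the RabbitMQ message. Is there any way to get that value? Or how can I use the "direct reply-to" feature to respond to a RabbitMQ message?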
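One way to look at the raw reply-to value is to ask the consume context for the RabbitMQ transport payload. The sketch below assumes MassTransit 7.x, where `RabbitMqBasicConsumeContext` lives in the `MassTransit.RabbitMqTransport` namespace (the namespace may differ in other versions). The `ReplyToExtensions` class and `GetReplyTo` method are hypothetical names introduced here for illustration.

```csharp
using MassTransit;
using MassTransit.RabbitMqTransport;

// Hypothetical helper (not part of MassTransit): reads the AMQP reply_to
// basic property from the transport-level payload, if one is present.
public static class ReplyToExtensions
{
    public static string GetReplyTo(this ConsumeContext context)
    {
        // The payload is only available when the message arrived over RabbitMQ.
        if (context.TryGetPayload(out RabbitMqBasicConsumeContext basicContext))
            return basicContext.Properties.ReplyTo;

        return null;
    }
}
```

Inside a consumer this could be called as `var replyTo = context.GetReplyTo();`. A null result means either the transport is not RabbitMQ or the sender did not set reply_to.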
Update 1
Following Chris Patterson's answer, I tried something like this:
Startup.cs
public void ConfigureServices(IServiceCollection services)
{
    services.AddControllersWithViews();
    services.AddMassTransit(x =>
    {
        x.AddConsumer<NewUserConsumer>();
        x.AddBus(provider => Bus.Factory.CreateUsingRabbitMq(cfg =>
        {
            cfg.UseHealthCheck(provider);
            cfg.Host(new Uri("rabbitmq://localhost"), h =>
            {
                h.Username("guest");
                h.Password("guest");
            });
            cfg.ReceiveEndpoint("profile_queue", ep =>
            {
                ep.PrefetchCount = 16;
                //ep.UseMessageRetry(r => r.Interval(2, 100));
                // Drop the default deserializers and accept raw JSON from NestJS
                ep.ClearMessageDeserializers();
                ep.UseRawJsonSerializer();
                ep.ConfigureConsumer<NewUserConsumer>(provider);
            });
        }));
    });
    services.AddMassTransitHostedService();
}
NewUserConsumer.cs
public class NewUserConsumer : IConsumer<MsgContext>
{
    public async Task Consume(ConsumeContext<MsgContext> context)
    {
        // Fields of the incoming NestJS message
        var data = context.Message.data;
        var pattern = context.Message.pattern;
        var id = context.Message.id;
        Console.WriteLine("======> [Message Received]");
        Console.WriteLine($"data:{data}");
        Console.WriteLine($"pattern:{pattern}");
        // Reply to the sender with the same message shape
        await context.RespondAsync<MsgContext>(new
        {
            data = $"Received : {id}",
            pattern = "add-profile"
        });
    }
}
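The question does not show the MsgContext contract itself. A minimal sketch of what it might look like follows, assuming the message carries string `id`, `pattern`, and `data` fields, as the consumer above reads them. The property types are assumptions; the namespace is taken from the log output in this thread.

```csharp
namespace Altsrc.MasstransitSample.EventContracts
{
    // Assumed shape of the contract; the real definition is not shown in the
    // question. Lowercase property names mirror the members the consumer reads.
    public class MsgContext
    {
        public string id { get; set; }
        public string pattern { get; set; }
        public string data { get; set; }
    }
}
```

If `data` holds a nested JSON object rather than plain text, a dedicated type would be a better fit than `string`.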
But I got this message:
fail: MassTransit.ReceiveTransport[0]
S-FAULT
rabbitmq://localhost/Altsrc.MasstransitSample.EventContracts:MsgContext
07f80000-5d0a-0015-c6d0-08d8b1ee0c86 Altsrc.MasstransitSample.EventContracts.MsgContext
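I noticed that while this error occurs, the RabbitMQ management UI shows Ready: 0, Unacked: 1, Total: 1, but once the terminal is closed it shows Ready: 1, Unacked: 0, Total: 1. Is my configuration wrong, or am I missing something?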
Update 2
I tried enabling ep.UseMessageRetry(r => r.Interval(2, 100));
and got this on the console:
======> [Message Received]
data:post request body
pattern:post request
fail: MassTransit.ReceiveTransport[0]
      S-FAULT rabbitmq://localhost/Altsrc.MasstransitSample.EventContracts:MsgContext 07f80000-5d0a-0015-3b01-08d8b2ba5551 Altsrc.MasstransitSample.EventContracts.MsgContext
warn: MassTransit.ReceiveTransport[0]
      R-RETRY rabbitmq://localhost/profile_queue (null) MassTransit.Context.RetryConsumeContext<Altsrc.MasstransitSample.EventContracts.MsgContext>
MassTransit.RabbitMqTransport.MessageNotConfirmedException: exchange:Altsrc.MasstransitSample.EventContracts:MsgContext => The message was not confirmed: Value of type 'UInt64' cannot appear as table value
   at MassTransit.RabbitMqTransport.Integration.BatchPublisher.<>c__DisplayClass8_0.<<Publish>g__PublishAsync|0>d.MoveNext()
--- End of stack trace from previous location where exception was thrown ---
   at GreenPipes.Internals.Extensions.TaskExtensions.<>c__DisplayClass4_0.<<OrCanceled>g__WaitAsync|0>d.MoveNext()
--- End of stack trace from previous location where exception was thrown ---
   at MassTransit.RabbitMqTransport.Transport.RabbitMqSendTransport.SendPipe`1.Send(ModelContext modelContext)
   at MassTransit.RabbitMqTransport.Transport.RabbitMqSendTransport.SendPipe`1.Send(ModelContext modelContext)
   at GreenPipes.Agents.PipeContextSupervisor`1.GreenPipes.IPipeContextSource<TContext>.Send(IPipe`1 pipe, CancellationToken cancellationToken)
   at GreenPipes.Agents.PipeContextSupervisor`1.GreenPipes.IPipeContextSource<TContext>.Send(IPipe`1 pipe, CancellationToken cancellationToken)
   at MassTransit.Initializers.MessageInitializer`2.Send(ISendEndpoint endpoint, InitializeContext`1 context, Object input)
   at Altsrc.MasstransitSample.EventContracts.NewUserConsumer.Consume(ConsumeContext`1 context) in E:\SampleProjects\Altsrc.MasstransitSample\Altsrc.MasstransitSample.EventContracts\NewUserConsumer.cs:line 23
   at MassTransit.Scoping.ScopeConsumerFactory`1.Send[TMessage](ConsumeContext`1 context, IPipe`1 next)
   at MassTransit.Pipeline.Filters.ConsumerMessageFilter`2.GreenPipes.IFilter<MassTransit.ConsumeContext<TMessage>>.Send(ConsumeContext`1 context, IPipe`1 next)
   at GreenPipes.Filters.RetryFilter`1.GreenPipes.IFilter<TContext>.Send(TContext context, IPipe`1 next)
Solution
MassTransit has built-in support for reply_to and should send the response directly.
You can configure the RawJsonMessageSerializer on the receive endpoint to handle the message.
configurator.ClearMessageDeserializers();
configurator.UseRawJsonSerializer();
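This removes all of the standard deserializers and makes raw JSON the default, which is needed because you are not setting the content_type property to application/json.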
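To check whether a reply address was picked up for an incoming message, one option is to log the response address that MassTransit exposes on the consume context. This is a diagnostic sketch only. It assumes the reply_to property surfaces through `ConsumeContext.ResponseAddress`, which the answer implies but this thread does not confirm. The `ResponseAddressDiagnostics` class and `LogResponseAddress` method are hypothetical names.

```csharp
using System;
using MassTransit;

// Hypothetical diagnostic helper (not part of MassTransit).
public static class ResponseAddressDiagnostics
{
    public static void LogResponseAddress(this ConsumeContext context)
    {
        // ResponseAddress is null when the message carried no reply address.
        Uri responseAddress = context.ResponseAddress;

        Console.WriteLine(responseAddress == null
            ? "No response address on this message"
            : $"Response address: {responseAddress}");
    }
}
```

Calling `context.LogResponseAddress();` just before `RespondAsync` in the consumer shows whether there is an address for the response to go to.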