MassTransit 序列化非 Masstransit 消息

问题描述

作为消费者,我一直在为我的项目使用 Masstransit。 Masstransit 服务将使用来自另一个服务的消息,我构建的另一个服务当前在 nestJS 上运行并使用 RabbitMq 发送消息。当我尝试序列化包含如下基本属性的 Rabbitmq 消息时出现的问题:

properties

但由于 Masstransit 将使用接口包装我的消息,因此我无法获取存储在属性中的值。它是用于回复 RabbitMq 消息的“回复”地址。有什么办法可以获得那个价值吗?或者我如何应用“直接回复功能响应rabbitmq消息?

更新 1 作为 Chris Patterson 的回应,我尝试做这样的事情:

Startup.cs

public void ConfigureServices(IServiceCollection services)
{
        services.AddControllersWithViews();
        services.AddMasstransit(x =>
        {
            x.AddConsumer<NewUserConsumer>();
            x.AddBus(provider => Bus.Factory.CreateUsingRabbitMq(cfg =>
            {
                cfg.UseHealthCheck(provider);
                cfg.Host(new Uri("rabbitmq://localhost"),h =>
                {
                    h.Username("guest");
                    h.Password("guest");
                });
                cfg.ReceiveEndpoint("profile_queue",ep =>
                {
                    ep.PrefetchCount = 16;
                    //ep.UseMessageRetry(r => r.Interval(2,100));
                    ep.ClearMessageDeserializers();
                    ep.UseRawJsonSerializer();
                    ep.ConfigureConsumer<NewUserConsumer>(provider);
                });
            }));
        });
        services.AddMasstransitHostedService();
}

NewUserConsumer.cs

public class NewUserConsumer : IConsumer<MsgContext>
{
    public async Task Consume(ConsumeContext<MsgContext> context)
    {
        var data = context.Message.data;
        var pattern = context.Message.pattern;
        var id = context.Message.id;

        Console.WriteLine("======>  [Message Received]");
        Console.WriteLine($"data:{data}");
        Console.WriteLine($"pattern:{pattern}");

        await context.RespondAsync<MsgContext>(new
        {
            data = $"Received : {id}",pattern = "add-profile"
        });
    }
}

但我收到了这条消息:

 fail: Masstransit.ReceiveTransport[0]
  S-FAULT 
  rabbitmq://localhost/Altsrc.MasstransitSample.EventContracts:MsgContext 
  07f80000-5d0a-0015-c6d0-08d8b1ee0c86Altsrc.MasstransitSample.EventContracts.MsgContext

我注意到当我收到此错误时,rabbitmq 管理显示 Ready: 0,Unacked: 1,Total: 1 但一旦终端关闭,它将显示 Ready: 1,Unacked: 0,Total: 1。不知何故我的配置不正确或者我错过了什么?。

更新 2 我尝试使用 ep.UseMessageRetry(r => r.Interval(2,100)); 并在控制台终端上得到了这个:

======> [Message Received] data:post request body pattern:post request fail: Masstransit.ReceiveTransport[0] S-FAULT rabbitmq://localhost/Altsrc.MasstransitSample.EventContracts:MsgContext 07f80000-5d0a-0015-3b01-08d8b2ba5551 Altsrc.MasstransitSample.EventContracts.MsgContext warn: Masstransit.ReceiveTransport[0] R-RETRY rabbitmq://localhost/profile_queue (null) Masstransit.Context.RetryConsumeContext<Altsrc.MasstransitSample.EventContracts.MsgContext> Masstransit.RabbitMqTransport.MessageNotConfirmedException: exchange:Altsrc.MasstransitSample.EventContracts:MsgContext => The message was not confirmed: Value of type 'UInt64' cannot appear as table value at Masstransit.RabbitMqTransport.Integration.BatchPublisher.<>c__displayClass8_0.<<Publish>g__PublishAsync|0>d.MoveNext() --- End of stack trace from prevIoUs location where exception was thrown --- at GreenPipes.Internals.Extensions.TaskExtensions.<>c__displayClass4_0.<<OrCanceled>g__WaitAsync|0>d.MoveNext() --- End of stack trace from prevIoUs location where exception was thrown --- at Masstransit.RabbitMqTransport.Transport.RabbitMqSendTransport.SendPipe'1.Send(ModelContext modelContext) at Masstransit.RabbitMqTransport.Transport.RabbitMqSendTransport.SendPipe'1.Send(ModelContext modelContext) at GreenPipes.Agents.PipeContextSupervisor'1.GreenPipes.IPipeContextSource<TContext>.Send(IPipe'1 pipe,CancellationToken cancellationToken) at GreenPipes.Agents.PipeContextSupervisor'1.GreenPipes.IPipeContextSource<TContext>.Send(IPipe'1 pipe,CancellationToken cancellationToken) at Masstransit.Initializers.MessageInitializer'2.Send(ISendEndpoint endpoint,InitializeContext'1 context,Object input) at Altsrc.MasstransitSample.EventContracts.NewUserConsumer.Consume(ConsumeContext'1 context) in E:\SampleProjects\Altsrc.MasstransitSample\Altsrc.MasstransitSample.EventContracts\NewUserConsumer.cs:line 23 at Masstransit.scoping.ScopeConsumerFactory'1.Send[TMessage](ConsumeContext'1 context,IPipe'1 next) at Masstransit.Pipeline.Filters.ConsumerMessageFilter'2.GreenPipes.IFilter<Masstransit.ConsumeContext<TMessage>>.Send(ConsumeContext'1 context,IPipe'1 next) at GreenPipes.Filters.RetryFilter'1.GreenPipes.IFilter<TContext>.Send(TContext context,IPipe'1 next)

解决方法

MassTransit 内置了对 reply_to 的支持,应该直接发送响应。

您可以在接收端点上配置 RawJsonMessageSerializer 来处理消息。

configurator.ClearMessageDeserializers();
configurator.UseRawJsonSerializer();

这将删除所有标准序列化程序并默认使用 JSON,因为您没有将 content_type 属性设置为 application/json