在 dot net core 应用程序中使用 RabbitMQ 在 MassTransit 中跳过队列

问题描述

我有三个项目。一个是Dot net core MVC,两个是API项目。 MVC 正在调用一个 API 来获取用户详细信息。当询问用户详细信息时,我通过 Masstransit 向队列发送消息。我看到跳过队列。第三个项目有消费者,即 API 项目。

我尝试为具有相同配置的演示制作另一个解决方案。运行良好。

以下是 MVC Razor 页面代码..

public async Task<IActionResult> OnPostAsync(string returnUrl = null)
    {
        ReturnUrl = returnUrl;

        if (ModelState.IsValid)
        {
            var user = await AuthenticateUser(Input.Email);

            if (user == null)
            {
                ModelState.AddModelError(string.Empty,"Invalid login attempt.");
                return Page();
            }

            #region snippet1
            var claims = new List<Claim>
            {
                new Claim(ClaimTypes.Name,user.Email),new Claim("FullName",user.FullName),new Claim(ClaimTypes.Role,"Administrator"),};

            var claimsIdentity = new ClaimsIdentity(
                claims,CookieAuthenticationDefaults.AuthenticationScheme);

            var authProperties = new AuthenticationProperties
            {
                ExpiresUtc = DateTimeOffset.UtcNow.AddMinutes(15),IsPersistent = true,};

            await HttpContext.SignInAsync(
                CookieAuthenticationDefaults.AuthenticationScheme,new ClaimsPrincipal(claimsIdentity),authProperties);
            #endregion

            _logger.Loginformation("User {Email} logged in at {Time}.",user.Email,DateTime.UtcNow);

            return LocalRedirect(Url.GetLocalUrl(returnUrl));
        }

        return Page();
    }

    private async Task<ApplicationUser> AuthenticateUser(string email)
    {
        if (!string.IsNullOrEmpty(email))
        {
            using (var client = new System.Net.Http.HttpClient())
            {
                var request = new System.Net.Http.HttpRequestMessage();
                request.RequestUri = new Uri("http://localhost:52043/api/user?uName=" + email); // ASP.NET 3 (VS 2019 only)
                var response = await client.SendAsync(request);
                var customer = Newtonsoft.Json.JsonConvert.DeserializeObject<Customers>(response.Content.ReadAsstringAsync().Result);

                
                return new ApplicationUser()
                {
                    Email = email,FullName = customer.FullName
                };
            }
        }
        else
        {
            return null;
        }
    }

MVC 启动:

services.AddMasstransit(x =>
        {
            x.AddBus(provider => Bus.Factory.CreateUsingRabbitMq(cfg =>
            {
                // configure health checks for this bus instance
                cfg.UseHealthCheck(provider);

                cfg.Host("rabbitmq://localhost");

            }));
        });

        services.AddMasstransitHostedService();

用户 API 代码 - 52043:

[HttpGet]
    public async Task<IActionResult> Get(string uName)
    {
        var customer = _userRepository.GetCustomerByUserName(uName);

        Uri uri = new Uri("rabbitmq://localhost/loginqueue");
        var endpoint = await _bus.GetSendEndpoint(uri);
        await endpoint.Send(new Loginobj() { NoteString = customer.FullName + " has logged in at " + DateTime.Now.ToString() });
        
        return Json(customer);
    }

日志 API - 消费者代码

public class LoginConsumer : IConsumer<Loginobj>
{
    private readonly ILogger<object> _logger;

    public LoginConsumer(ILogger<object> logger)
    {
        _logger = logger;
    }

    public async Task Consume(ConsumeContext<Loginobj> context)
    {
        var data = context.Message;
        _logger.Loginformation(data.ToString());
    }
}

登录 API 启动:

services.AddMasstransit(x =>
        {
            x.AddConsumer<LoginConsumer>();

            x.AddBus(provider => Bus.Factory.CreateUsingRabbitMq(cfg =>
            {
                // configure health checks for this bus instance
                cfg.UseHealthCheck(provider);

                cfg.Host("rabbitmq://localhost");

                cfg.ReceiveEndpoint("loginqueue",ep =>
                {
                    ep.PrefetchCount = 16;
                    ep.UseMessageRetry(r => r.Interval(2,100));

                    ep.ConfigureConsumer<LoginConsumer>(provider);
                });
            }));
        });

        services.AddMasstransitHostedService();

解决方法

根据the documentation

MassTransit 使用完整的类型名称,包括命名空间,用于消息协定。在两个不同的项目中创建相同的消息类型时,命名空间必须匹配,否则消息将不会被消费。

确保您的消息类型在每个项目中具有相同的命名空间/类型。