接收方未通过大众运输接收消息-订阅

问题描述

我正在使用下面的代码来设置Masstransit以使用ServiceBus

private static ServiceProvider SetupServiceCollection()
{
    var connectionString = ConfigurationManager.AppSettings["AzureServiceBusConnectionString"];
    var services = new ServiceCollection()
        .AddMasstransit(x =>
        {
            x.UsingAzureServiceBus((context,cfg) =>
            {
                cfg.Host(connectionString);
                cfg.ConfigureEndpoints(context);
                cfg.Message<MyMessage>(x =>
                {
                    x.SetEntityName("my-topic");
                });
            });
        });

    return services.BuildServiceProvider(); 
}

我使用以下代码发送消息

var message = new MyMessage()
{
    MessageIdentifier = Guid.NewGuid().ToString(),};

await _busControl.Publish(message);

我希望将邮件仅发送到我的主题

但是,Masstransit正在创建主题名称似乎是使用类型名称生成的。我要如何完全停止呢?

我正在如下设置接收器

public static void SetupMasstransit(this ServiceCollection services,string connectionString)
{
    services.AddMasstransit(x =>
    {
        x.UsingAzureServiceBus((context,cfg) =>
        {
            cfg.Host(connectionString);
            cfg.ConfigureEndpoints(context);
            x.UsingAzureServiceBus((context,cfg) =>
            {
                cfg.Host(connectionString);
                cfg.SubscriptionEndpoint<MyMessage>("low",e =>
                {
                    e.Consumer<MyMessageConsumer>(context);
                    e.PrefetchCount = 100;
                    e.MaxConcurrentCalls = 100;
                    e.LockDuration = TimeSpan.FromMinutes(5);
                    e.MaxAutoRenewDuration = TimeSpan.FromMinutes(30);
                    e.UseMessageRetry(r => r.Intervals(100,200,500,800,1000));
                    e.UseInMemoryOutBox();
                    e.ConfigureConsumetopology = false;
                });
        });
    });
}

我可以看到该消息已正确发送,如Service Bus Explorer订阅中所示。但是,接收器没有接听吗?没有错误或其他任何事情吗?真令人沮丧

保罗

解决方法

您正在呼叫ConfigureEndpoints,默认情况下,这将为已添加的使用者,sagas等创建接收端点。但是,您的代码示例未显示任何.AddConsumer方法。如果您没有任何消费者,请不要致电ConfigureEndpoints

对于接收者,您应该使用:

public static void SetupMassTransit(this ServiceCollection services,string connectionString)
{
    services.AddMassTransit(x =>
    {
        x.AddConsumer<MyMessageConsumer>();
        
        x.UsingAzureServiceBus((context,cfg) =>
        {
            cfg.Host(connectionString);

            cfg.SubscriptionEndpoint("your-topic-name","your-subscription-name",e =>
            {
                e.PrefetchCount = 100;
                e.MaxConcurrentCalls = 100;
                e.LockDuration = TimeSpan.FromMinutes(5);
                e.MaxAutoRenewDuration = TimeSpan.FromMinutes(30);

                e.UseMessageRetry(r => r.Intervals(100,200,500,800,1000));
                e.UseInMemoryOutbox();

                e.ConfigureConsumer<MyMessageConsumer>(context);
            });
        });
    });
}

对于生产者,您可以简单地使用:

private static ServiceProvider SetupServiceCollection()
{
    var connectionString = ConfigurationManager.AppSettings["AzureServiceBusConnectionString"];
    var services = new ServiceCollection()
        .AddMassTransit(x =>
        {
            x.UsingAzureServiceBus((context,cfg) =>
            {
                cfg.Host(connectionString);
            });
        });

    return services.BuildServiceProvider(); 
}

然后使用上面创建的IServiceProvider进行发布:

var bus = serviceProvider.GetRequiredService<IBus>();
var endpoint = await bus.GetSendEndpoint(new Uri("topic:your-topic-name"));
await endpoint.Send(new MyMessage());

这应该满足您所需的绝对最低要求。

相关问答

Selenium Web驱动程序和Java。元素在(x,y)点处不可单击。其...
Python-如何使用点“。” 访问字典成员?
Java 字符串是不可变的。到底是什么意思?
Java中的“ final”关键字如何工作?(我仍然可以修改对象。...
“loop:”在Java代码中。这是什么,为什么要编译?
java.lang.ClassNotFoundException:sun.jdbc.odbc.JdbcOdbc...