在 Amazon SQS 传输上发布的 NServiceBus 路由器事件不由 Azure 服务总线传输端点处理

问题描述

我一直在尝试让 NServiceBus.Router 工作以允许使用 AmazonSQS 传输和 AzureServiceBus 传输的端点相互通信。到目前为止,我能够通过路由器获取从 ASB 端点发送并由 SQS 端点处理的命令。但是,当我从 SQS 端点发布事件时,即使我已将 SQS 端点注册为发布者,它也不会由 ASB 端点处理。我不知道我做错了什么,但是查看我从 from the docs 中找到的每个示例,它似乎应该可以工作。

我已经尝试在下面的反向路由(SQS 到 ASB)中添加一个转发路由,但这并没有解决问题。

端点和路由器均在 .net 5 工作服务中运行。

我制作了一个示例项目来重现问题 here,但这里有一些显示相关设置的快速概览片段:

路由器设置

var routerConfig = new RouterConfiguration("ASBToSQS.Router");

var azureInterface = routerConfig.AddInterface<AzureServiceBusTransport>("ASB",t =>
{
    t.ConnectionString(Environment.GetEnvironmentvariable("ASB_CONNECTION_STRING"));

    t.Transactions(TransportTransactionMode.ReceiveOnly);
    t.SubscriptionRuleNamingConvention((entityType) =>
    {
        var entityPathOrName = entityType.Name;
        if (entityPathOrName.Length >= 50)
        {
            return entityPathOrName.Split('.').Last();
        }

        return entityPathOrName;
    });
});

var sqsInterface = routerConfig.AddInterface<SqsTransport>("SQS",t =>
{
    t.UnrestrictedDurationDelayedDelivery();

    t.Transactions(TransportTransactionMode.ReceiveOnly);

    var settings = t.GetSettings();

    // Avoids a missing setting error
    //https://github.com/SzymonPobiega/NServiceBus.Raw/blob/master/src/AcceptanceTests.SQS/Helper.cs#L18
    bool isMessageType(Type t) => true;
    var ctor = typeof(MessageMetadataRegistry).GetConstructor(
        BindingFlags.NonPublic | BindingFlags.Public | BindingFlags.Instance,null,new[] {typeof(Func<Type,bool>)},null);
#pragma warning disable CS0618 // Type or member is obsolete
    settings.Set<MessageMetadataRegistry>(ctor.Invoke(new object[] {(Func<Type,bool>) isMessageType}));
#pragma warning restore CS0618 // Type or member is obsolete

});

var staticRouting = routerConfig.UseStaticRoutingProtocol();

staticRouting.AddForwardRoute("ASB","SQS");

routerConfig.autocreateQueues();

ASB 端点设置

var endpointConfiguration = new EndpointConfiguration("ASBToSQSRouter.ASBEndpoint");

var transport = endpointConfiguration.UseTransport<AzureServiceBusTransport>();

transport.SubscriptionRuleNamingConvention((entityType) =>
{
    var entityPathOrName = entityType.Name;
    if (entityPathOrName.Length >= 50)
    {
        return entityPathOrName.Split('.').Last();
    }

    return entityPathOrName;
});

transport.Transactions(TransportTransactionMode.ReceiveOnly);
transport.ConnectionString(Environment.GetEnvironmentvariable("ASB_CONNECTION_STRING"));

var bridge = transport.Routing().ConnectToRouter("ASBToSQS.Router");

bridge.RoutetoEndpoint(typeof(ASBToSQSCommand),"ASBToSQSRouter.SQSEndpoint");
bridge.RegisterPublisher(typeof(ASBToSQSEvent),"ASBToSQSRouter.SQSEndpoint");

endpointConfiguration.EnableInstallers();

SQS 端点设置(没什么特别的,因为它不需要了解路由器)

var endpointConfiguration = new EndpointConfiguration("ASBToSQSRouter.SQSEndpoint");

var transport = endpointConfiguration.UseTransport<SqsTransport>();

transport.UnrestrictedDurationDelayedDelivery();

transport.Transactions(TransportTransactionMode.ReceiveOnly);

endpointConfiguration.EnableInstallers();

任何帮助将不胜感激!

解决方法

不幸的是,最近的 SQS 传输版本之一包含一项更改,该更改使订阅仅在完整 NServiceBus 端点的上下文中默认工作。此功能是订阅批处理。

为了让 Router 正常工作(Router 不运行完整的端点,只运行 NServiceBus 传输),您需要在 SQS 接口配置中添加这条神奇的线:

settings.Set("NServiceBus.AmazonSQS.DisableSubscribeBatchingOnStart",true);

这是一个未公开的标志,用于禁用订阅批处理并允许路由器正常完成订阅操作。

很抱歉给您带来不便。

相关问答

Selenium Web驱动程序和Java。元素在(x,y)点处不可单击。其...
Python-如何使用点“。” 访问字典成员?
Java 字符串是不可变的。到底是什么意思?
Java中的“ final”关键字如何工作?(我仍然可以修改对象。...
“loop:”在Java代码中。这是什么,为什么要编译?
java.lang.ClassNotFoundException:sun.jdbc.odbc.JdbcOdbc...