问题描述
我一直在尝试让 NServiceBus.Router 工作以允许使用 AmazonSQS 传输和 AzureServiceBus 传输的端点相互通信。到目前为止,我能够通过路由器获取从 ASB 端点发送并由 SQS 端点处理的命令。但是,当我从 SQS 端点发布事件时,即使我已将 SQS 端点注册为发布者,它也不会由 ASB 端点处理。我不知道我做错了什么,但是查看我从 from the docs 中找到的每个示例,它似乎应该可以工作。
我已经尝试在下面的反向路由(SQS 到 ASB)中添加另一个转发路由,但这并没有解决问题。
端点和路由器均在 .net 5 工作服务中运行。
我制作了一个示例项目来重现问题 here,但这里有一些显示相关设置的快速概览片段:
路由器设置
var routerConfig = new RouterConfiguration("ASBToSQS.Router");
var azureInterface = routerConfig.AddInterface<AzureServiceBusTransport>("ASB",t =>
{
t.ConnectionString(Environment.GetEnvironmentvariable("ASB_CONNECTION_STRING"));
t.Transactions(TransportTransactionMode.ReceiveOnly);
t.SubscriptionRuleNamingConvention((entityType) =>
{
var entityPathOrName = entityType.Name;
if (entityPathOrName.Length >= 50)
{
return entityPathOrName.Split('.').Last();
}
return entityPathOrName;
});
});
var sqsInterface = routerConfig.AddInterface<SqsTransport>("SQS",t =>
{
t.UnrestrictedDurationDelayedDelivery();
t.Transactions(TransportTransactionMode.ReceiveOnly);
var settings = t.GetSettings();
// Avoids a missing setting error
//https://github.com/SzymonPobiega/NServiceBus.Raw/blob/master/src/AcceptanceTests.SQS/Helper.cs#L18
bool isMessageType(Type t) => true;
var ctor = typeof(MessageMetadataRegistry).GetConstructor(
BindingFlags.NonPublic | BindingFlags.Public | BindingFlags.Instance,null,new[] {typeof(Func<Type,bool>)},null);
#pragma warning disable CS0618 // Type or member is obsolete
settings.Set<MessageMetadataRegistry>(ctor.Invoke(new object[] {(Func<Type,bool>) isMessageType}));
#pragma warning restore CS0618 // Type or member is obsolete
});
var staticRouting = routerConfig.UseStaticRoutingProtocol();
staticRouting.AddForwardRoute("ASB","SQS");
routerConfig.autocreateQueues();
ASB 端点设置
var endpointConfiguration = new EndpointConfiguration("ASBToSQSRouter.ASBEndpoint");
var transport = endpointConfiguration.UseTransport<AzureServiceBusTransport>();
transport.SubscriptionRuleNamingConvention((entityType) =>
{
var entityPathOrName = entityType.Name;
if (entityPathOrName.Length >= 50)
{
return entityPathOrName.Split('.').Last();
}
return entityPathOrName;
});
transport.Transactions(TransportTransactionMode.ReceiveOnly);
transport.ConnectionString(Environment.GetEnvironmentvariable("ASB_CONNECTION_STRING"));
var bridge = transport.Routing().ConnectToRouter("ASBToSQS.Router");
bridge.RoutetoEndpoint(typeof(ASBToSQSCommand),"ASBToSQSRouter.SQSEndpoint");
bridge.RegisterPublisher(typeof(ASBToSQSEvent),"ASBToSQSRouter.SQSEndpoint");
endpointConfiguration.EnableInstallers();
SQS 端点设置(没什么特别的,因为它不需要了解路由器)
var endpointConfiguration = new EndpointConfiguration("ASBToSQSRouter.SQSEndpoint");
var transport = endpointConfiguration.UseTransport<SqsTransport>();
transport.UnrestrictedDurationDelayedDelivery();
transport.Transactions(TransportTransactionMode.ReceiveOnly);
endpointConfiguration.EnableInstallers();
任何帮助将不胜感激!
解决方法
不幸的是,最近的 SQS 传输版本之一包含一项更改,该更改使订阅仅在完整 NServiceBus 端点的上下文中默认工作。此功能是订阅批处理。
为了让 Router 正常工作(Router 不运行完整的端点,只运行 NServiceBus 传输),您需要在 SQS 接口配置中添加这条神奇的线:
settings.Set("NServiceBus.AmazonSQS.DisableSubscribeBatchingOnStart",true);
这是一个未公开的标志,用于禁用订阅批处理并允许路由器正常完成订阅操作。
很抱歉给您带来不便。