问题描述
[编辑:我重新制定并简化了最初的问题]
我在会话中使用Azure服务总线主题/订阅,并且在关闭会话时遇到问题。
有关背景信息,我的应用程序从一个主题订阅会话(FIFO要求)接收数据,该会话通常保持活动状态。我们只需要偶尔偶尔暂停一下数据流。
当请求此数据流“暂停”时,我们退出订阅会话并等待再次要求打开会话。
// pseudo code
public class Test
{
public static async Task Me()
{
var client = new SubscriptionClient(
EndPoint,Path,Name,TokenProvider,TransportType.Amqp,ReceiveMode.PeekLock,new RetryExponential(
minimumBackoff: TimeSpan.FromSeconds(1),maximumBackoff: TimeSpan.FromSeconds(30),maximumRetryCount: 10));
// Setup consumer options
var sessionoptions = new SessionHandlerOptions(OnHandleExceptionReceived)
{
AutoComplete = false,MessageWaitTimeout = TimeSpan.FromSeconds(10),MaxConcurrentSessions = 1,};
// Registration 1 - Start data flow
client.RegisterSessionHandler(OnMessageSessionAsync,sessionoptions);
// Wait 1 - Artificially wait for 'data flow pause' to kick in.
// For the sake of this example,we artificially give plenty
// of time to the message session handler to receive something
// and close the session.
Task.Wait(TimeSpan.FromSeconds(30));
// Registration 2 - Artificially 'unpause' data flow
client.RegisterSessionHandler(OnMessageSessionAsync,sessionoptions);
// Wait 2 - Artificially wait for 'pause' to kick in again
Task.Wait(TimeSpan.FromSeconds(30));
// Finally close client
await client.CloseAsync();
}
private static async Task OnMessageSessionAsync(IMessageSession session,Message message,CancellationToken cancellationToken)
{
try
{
await client.CompleteAsync(message.SystemProperties.LockToken);
// Process message .. It doesn't matter what it is,// just that at some point I want to break away from session
if (bool.TryParse(message.UserProperties["SessionCompleted"] as string,out bool completed) && completed)
await session.CloseAsync(); // <-- This never works
}
catch (Exception e)
{
Console.WriteLine("OnMessageSessionAsync exception: {0}",e);
// Indicates a problem,unlock message in subscription.
await client.AbandonAsync(message.SystemProperties.LockToken);
}
}
private static Task OnHandleExceptionReceived(ExceptionReceivedEventArgs e)
{
var context = e.ExceptionReceivedContext;
Options.Logger?.LogWarning(e.Exception,new StringBuilder()
.AppendLine($"Message handler encountered an exception {e.Exception.GetType().Name}.")
.AppendLine("Exception context for troubleshooting:")
.AppendLine($" - Endpoint: {context.Endpoint}")
.AppendLine($" - Entity Path: {context.EntityPath}")
.Append($" - Executing Action: {context.Action}"));
return Task.CompletedTask;
}
}
问题:
如前所述,由于调用session.CloseAsync()
似乎无效,我在退出会话时遇到了问题。即使我明确要求会话停止,消息仍然不断出现。
ps1::我的代码基于Microsoft的官方sample made available on github.com 。尽管此示例基于队列会话而不是主题会话,但在我看来,行为应该相同是合理的。
ps2::我深入研究了Microsoft.Azure.ServiceBus存储库中的原因,我想知道MessageSession.OwnsConnection
属性的背后是否缺少变量初始化..
解决方法
暂无找到可以解决该程序问题的有效方法,小编努力寻找整理中!
如果你已经找到好的解决方法,欢迎将解决方案带上本链接一起发送给小编。
小编邮箱:dio#foxmail.com (将#修改为@)