Azure服务总线主题订阅会话未按要求关闭

问题描述

[编辑:我重新制定并简化了最初的问题]

我在会话中使用Azure服务总线主题/订阅,并且在关闭会话时遇到问题。

有关背景信息,我的应用程序从一个主题订阅会话(FIFO要求)接收数据,该会话通常保持活动状态。我们只需要偶尔偶尔暂停一下数据流。

当请求此数据流“暂停”时,我们退出订阅会话并等待再次要求打开会话。

// pseudo code
public class Test
{
    public static async Task Me()
    {
        var client = new SubscriptionClient(
          EndPoint,Path,Name,TokenProvider,TransportType.Amqp,ReceiveMode.PeekLock,new RetryExponential(
            minimumBackoff: TimeSpan.FromSeconds(1),maximumBackoff: TimeSpan.FromSeconds(30),maximumRetryCount: 10));
            
        // Setup consumer options
        var sessionoptions = new SessionHandlerOptions(OnHandleExceptionReceived)
        {
            AutoComplete = false,MessageWaitTimeout = TimeSpan.FromSeconds(10),MaxConcurrentSessions = 1,};

        // Registration 1 - Start data flow
        client.RegisterSessionHandler(OnMessageSessionAsync,sessionoptions);

        // Wait 1 - Artificially wait for 'data flow pause' to kick in.
        //          For the sake of this example,we artificially give plenty 
        //          of time to the message session handler to receive something
        //          and close the session.
        Task.Wait(TimeSpan.FromSeconds(30));

        // Registration 2 - Artificially 'unpause' data flow
        client.RegisterSessionHandler(OnMessageSessionAsync,sessionoptions);

        // Wait 2 - Artificially wait for 'pause' to kick in again
        Task.Wait(TimeSpan.FromSeconds(30));

        // Finally close client
        await client.CloseAsync();
    }

    private static async Task OnMessageSessionAsync(IMessageSession session,Message message,CancellationToken cancellationToken)
    {
        try
        {
            await client.CompleteAsync(message.SystemProperties.LockToken);

            // Process message .. It doesn't matter what it is,// just that at some point I want to break away from session
            if (bool.TryParse(message.UserProperties["SessionCompleted"] as string,out bool completed) && completed)
                await session.CloseAsync(); // <-- This never works
        }
        catch (Exception e)
        {
            Console.WriteLine("OnMessageSessionAsync exception: {0}",e);

            // Indicates a problem,unlock message in subscription.
            await client.AbandonAsync(message.SystemProperties.LockToken);
        }
    }

    private static Task OnHandleExceptionReceived(ExceptionReceivedEventArgs e)
    {
        var context = e.ExceptionReceivedContext;

        Options.Logger?.LogWarning(e.Exception,new StringBuilder()
            .AppendLine($"Message handler encountered an exception {e.Exception.GetType().Name}.")
            .AppendLine("Exception context for troubleshooting:")
            .AppendLine($" - Endpoint: {context.Endpoint}")
            .AppendLine($" - Entity Path: {context.EntityPath}")
            .Append($" - Executing Action: {context.Action}"));

        return Task.CompletedTask;
    }
}

问题:

如前所述,由于调用session.CloseAsync()似乎无效,我在退出会话时遇到了问题。即使我明确要求会话停止,消息仍然不断出现。

  • 是不能直接关闭主题会话的正常行为吗?如果是这样,为什么还要公开通话session.CloseAsync()
  • 实际上可以独立于订阅连接关闭会话吗?

ps1::我的代码基于Microsoft的官方sample made available on github.com 。尽管此示例基于队列会话而不是主题会话,但在我看来,行为应该相同是合理的。

ps2::我深入研究了Microsoft.Azure.ServiceBus存储库中的原因,我想知道MessageSession.OwnsConnection属性的背后是否缺少变量初始化..

解决方法

暂无找到可以解决该程序问题的有效方法,小编努力寻找整理中!

如果你已经找到好的解决方法,欢迎将解决方案带上本链接一起发送给小编。

小编邮箱:dio#foxmail.com (将#修改为@)