即使 autoComplete = false,Azure Function v3 .NET Core 3.1 ServiceBusTrigger “消息处理错误Action=RenewLock”

问题描述

我有一个运行长时间运行任务的队列触发器。遇到了锁定持续时间和更新的各种问题,所以我决定在整个过程的一开始就手动完成消息。如果作业遇到错误,则将消息发送回队列。

在下面的日志数据中,您可以看到它获取消息、完成它、启动任务、记录似乎没有任何影响的 lock supplied is invalid 错误,然后完成任务。我想停止这个错误。很烦。

[FunctionName("QueueTrigger")]
public async Task RunQueue([ServiceBusTrigger("queuename",Connection = "cn")] Message message,MessageReceiver messageReceiver,[ServiceBus("queuename",Connection = "cn")] IAsyncCollector<Message> queue)
{
    var msg = JsonConvert.DeserializeObject<RequestObj>(Encoding.UTF8.GetString(message.Body));
    logger.LogInfo("*** Before complete");
    await messageReceiver.CompleteAsync(message.SystemProperties.LockToken);
    logger.LogInfo("*** After complete");
    try
    {
        if(msg.Whatever > 1)   throw new Exception("Bogus exception thrown for testing.");

        var result = await RunTheTask(msg);
        if (!result.IsSuccessful)
        {
            logger.LogError(result.Failure);
            throw new ApplicationException(result.Failure);
        };
    }
    catch (Exception ex)
    {
        logger.LogError(ex.Message);
        logger.LogInfo("*** About to requeue");
        await queue.AddAsync(new Message(message.Body));
        logger.LogSuccess("*** Queued the message again.");
    }
}

host.json 文件serviceBus 这样的配置(关闭 autoComplete

"serviceBus": {
      "prefetchCount": 1,"messageHandlerOptions": {
        "autoComplete": false,"maxConcurrentCalls": 1
      },"SessionHandlerOptions": {
        "autoComplete": false
      },"BatchOptions": {
        "AutoComplete": false
      }

在这两种情况下,代码都按预期工作,但在这两种情况下,我总是看到“无效锁定”错误

一些日志内容

[2021-02-01T20:46:27.248Z] Executing 'QueueTrigger' (Reason='(null)',Id=520a670c-e636-49b8-96b5-7502f6b479ac)
[2021-02-01T20:46:27.251Z] Trigger Details: MessageId: d7af56ed083947e99ce0fc468f111e4d,SequenceNumber: 37,DeliveryCount: 1,EnqueuedTimeUtc: 2021-02-01T20:40:44.5390000Z,LockedUntilUtc: 2021-02-01T20:41:44.5550000Z,SessionId: (null)
[2021-02-01T20:46:28.331Z] *** Before complete
[2021-02-01T20:46:28.471Z] *** After complete
[2021-02-01T20:46:28.498Z] Running the Task
[2021-02-01T20:46:28.826Z] Message processing error (Action=RenewLock,ClientId=whatever,EntityPath=queueName,Endpoint=blah.servicebus.windows.net)
[2021-02-01T20:46:28.830Z] Microsoft.Azure.ServiceBus: The lock supplied is invalid. Either the lock expired,or the message has already been removed from the queue. Reference:30780d70-2887-4923-8725-329ab94dd4b2,TrackingId:e3e85f42-1c62-44da-b858-1c856f972a1f_B14,SystemTracker:x:Queue:queueName,Timestamp:2021-02-01T20:40:46.
[2021-02-01T20:46:38.855Z] Finished the Task
[2021-02-01T20:46:42.221Z] Executed 'QueueTrigger' (Succeeded,Id=520a670c-e636-49b8-96b5-7502f6b479ac,Duration=14992ms)

解决方法

我觉得你应该把'complete'方法移到最后,函数会自动刷新锁过期时间,在正常情况下保持可用。但是一旦你使用了'complete'方法,该函数就会生成最后一个锁,如果你的后续逻辑超过了锁时间,就会抛出一个锁过期异常。可以看下面的代码,比较一下使用complete和不使用complete的区别:

using System;
using System.Text;
using System.Threading;
using System.Threading.Tasks;
using Microsoft.Azure.ServiceBus;
using Microsoft.Azure.ServiceBus.Core;
using Microsoft.Azure.WebJobs;
using Microsoft.Azure.WebJobs.Host;
using Microsoft.Extensions.Logging;
using Newtonsoft.Json;

namespace FunctionApp71
{
    public static class Function1
    {
        [FunctionName("Function1")]
        public static async Task Run([ServiceBusTrigger("queuename",Connection = "cn")] Message message,MessageReceiver messageReceiver,[ServiceBus("queuename",Connection = "cn")] IAsyncCollector<Message> queue,ILogger logger)
        {
            logger.LogInformation("When the message comes in,the expire time is: "+message.SystemProperties.LockedUntilUtc.ToString());//Now the time is 2/4/2021 8:29:00 AM

            Thread.Sleep(6000);

            logger.LogInformation("After 6 seconds,the expired time is :"+message.SystemProperties.LockedUntilUtc.ToString());//The time of the lock is renewed,so now the time is 2/4/2021 8:29:04 AM

            //When you complete the message,there is no way to renew the lock because the lock is not exist(Azure function stop renew the lock.).
            await messageReceiver.CompleteAsync(message.SystemProperties.LockToken);
            logger.LogInformation("After complete the message,the expired time is :"+message.SystemProperties.LockedUntilUtc.ToString());//Now the time is still 2/4/2021 8:29:04 AM

            Thread.Sleep(6 * 1000);//Your try catch is here.

            logger.LogInformation("After complete message and 6 seconds later,the expired time is :"+message.SystemProperties.LockedUntilUtc.ToString());
        }
    }
}

enter image description here

,

记录了一个问题:https://github.com/Azure/azure-functions-servicebus-extension/issues/137 问题有一个指向 repo 的链接以进行 repro。