问题描述
我有一个运行长时间运行任务的队列触发器。遇到了锁定持续时间和更新的各种问题,所以我决定在整个过程的一开始就手动完成消息。如果作业遇到错误,则将消息发送回队列。
在下面的日志数据中,您可以看到它获取消息、完成它、启动任务、记录似乎没有任何影响的 lock supplied is invalid
错误,然后完成任务。我想停止这个错误。很烦。
[FunctionName("QueueTrigger")]
public async Task RunQueue([ServiceBusTrigger("queuename",Connection = "cn")] Message message,MessageReceiver messageReceiver,[ServiceBus("queuename",Connection = "cn")] IAsyncCollector<Message> queue)
{
var msg = JsonConvert.DeserializeObject<RequestObj>(Encoding.UTF8.GetString(message.Body));
logger.LogInfo("*** Before complete");
await messageReceiver.CompleteAsync(message.SystemProperties.LockToken);
logger.LogInfo("*** After complete");
try
{
if(msg.Whatever > 1) throw new Exception("Bogus exception thrown for testing.");
var result = await RunTheTask(msg);
if (!result.IsSuccessful)
{
logger.LogError(result.Failure);
throw new ApplicationException(result.Failure);
};
}
catch (Exception ex)
{
logger.LogError(ex.Message);
logger.LogInfo("*** About to requeue");
await queue.AddAsync(new Message(message.Body));
logger.LogSuccess("*** Queued the message again.");
}
}
host.json
文件有 serviceBus
这样的配置(关闭 autoComplete
)
"serviceBus": {
"prefetchCount": 1,"messageHandlerOptions": {
"autoComplete": false,"maxConcurrentCalls": 1
},"SessionHandlerOptions": {
"autoComplete": false
},"BatchOptions": {
"AutoComplete": false
}
在这两种情况下,代码都按预期工作,但在这两种情况下,我总是看到“无效锁定”错误
一些日志内容:
[2021-02-01T20:46:27.248Z] Executing 'QueueTrigger' (Reason='(null)',Id=520a670c-e636-49b8-96b5-7502f6b479ac)
[2021-02-01T20:46:27.251Z] Trigger Details: MessageId: d7af56ed083947e99ce0fc468f111e4d,SequenceNumber: 37,DeliveryCount: 1,EnqueuedTimeUtc: 2021-02-01T20:40:44.5390000Z,LockedUntilUtc: 2021-02-01T20:41:44.5550000Z,SessionId: (null)
[2021-02-01T20:46:28.331Z] *** Before complete
[2021-02-01T20:46:28.471Z] *** After complete
[2021-02-01T20:46:28.498Z] Running the Task
[2021-02-01T20:46:28.826Z] Message processing error (Action=RenewLock,ClientId=whatever,EntityPath=queueName,Endpoint=blah.servicebus.windows.net)
[2021-02-01T20:46:28.830Z] Microsoft.Azure.ServiceBus: The lock supplied is invalid. Either the lock expired,or the message has already been removed from the queue. Reference:30780d70-2887-4923-8725-329ab94dd4b2,TrackingId:e3e85f42-1c62-44da-b858-1c856f972a1f_B14,SystemTracker:x:Queue:queueName,Timestamp:2021-02-01T20:40:46.
[2021-02-01T20:46:38.855Z] Finished the Task
[2021-02-01T20:46:42.221Z] Executed 'QueueTrigger' (Succeeded,Id=520a670c-e636-49b8-96b5-7502f6b479ac,Duration=14992ms)
解决方法
我觉得你应该把'complete'方法移到最后,函数会自动刷新锁过期时间,在正常情况下保持可用。但是一旦你使用了'complete'方法,该函数就会生成最后一个锁,如果你的后续逻辑超过了锁时间,就会抛出一个锁过期异常。可以看下面的代码,比较一下使用complete和不使用complete的区别:
using System;
using System.Text;
using System.Threading;
using System.Threading.Tasks;
using Microsoft.Azure.ServiceBus;
using Microsoft.Azure.ServiceBus.Core;
using Microsoft.Azure.WebJobs;
using Microsoft.Azure.WebJobs.Host;
using Microsoft.Extensions.Logging;
using Newtonsoft.Json;
namespace FunctionApp71
{
public static class Function1
{
[FunctionName("Function1")]
public static async Task Run([ServiceBusTrigger("queuename",Connection = "cn")] Message message,MessageReceiver messageReceiver,[ServiceBus("queuename",Connection = "cn")] IAsyncCollector<Message> queue,ILogger logger)
{
logger.LogInformation("When the message comes in,the expire time is: "+message.SystemProperties.LockedUntilUtc.ToString());//Now the time is 2/4/2021 8:29:00 AM
Thread.Sleep(6000);
logger.LogInformation("After 6 seconds,the expired time is :"+message.SystemProperties.LockedUntilUtc.ToString());//The time of the lock is renewed,so now the time is 2/4/2021 8:29:04 AM
//When you complete the message,there is no way to renew the lock because the lock is not exist(Azure function stop renew the lock.).
await messageReceiver.CompleteAsync(message.SystemProperties.LockToken);
logger.LogInformation("After complete the message,the expired time is :"+message.SystemProperties.LockedUntilUtc.ToString());//Now the time is still 2/4/2021 8:29:04 AM
Thread.Sleep(6 * 1000);//Your try catch is here.
logger.LogInformation("After complete message and 6 seconds later,the expired time is :"+message.SystemProperties.LockedUntilUtc.ToString());
}
}
}
,
记录了一个问题:https://github.com/Azure/azure-functions-servicebus-extension/issues/137 问题有一个指向 repo 的链接以进行 repro。