如何确保队列消息已在Azure Functions中成功处理?

问题描述

我有一个使用HTTP触发器和队列触发器构建的C#Azure Functions(根据App Service计划)应用程序。该应用程序通过在客户端计算机上安装脚本来工作,该脚本使用SQL查询将客户端数据库中的各种文件移至临时Azure Blob存储中,以从客户端数据库提取各种文件。每个文件完成后,将调用HTTP触发器,该触发器将为队列触发器创建队列消息,以拾取该消息并将文件从临时Blob存储区移至Blob存储区中的永久位置。 HTTP触发器完成并将消息放入队列后,执行将返回到客户端脚本以开始处理下一个SQL查询

我担心的是,当队列触发器实际上仍在工作或可能失败时,尤其是在并行处理多个客户端时,这些队列消息将堆积起来,并且客户端脚本将以错误的成功消息完成。在继续下一个SQL查询之前,是否有办法确保队列消息已成功处理?

编辑:添加代码示例

我可能有3个客户端,并且在其计算机上安装了应用程序,每个客户端都设置为在12AM执行这些脚本,并且由于它们托管在客户端计算机上,因此可以并行运行。 客户端脚本

// perform sql query to extract data from client database
// move extracted data to temporary Storage Blob hosted on the App Service storage account
return await httpClient.PostAsync(uri of the file in temporary blob storage)

文件准备好处理时,此前await会发布到HTTP。
Azure函数HTTP触发器

// get storage account credentials
// write message to storage queue "job-submissions'
return new OkResult();

现在,“职位提交”队列中有来自多个客户端的文件
Azure功能队列触发器

// pick up message from "job-submissions" queue
// use the Microsoft.Azure.Storage.Blob library to move files
// to a permanent spot in the data lake
// create Meta file with info about the file 
// Meta file contains info for when the extraction started and completed
// delete the temporary file
// job completed and the next queue message can be picked up  

所以问题是,当HTTP触发器将消息写入队列时,我无法得知队列已完成文件的处理。现在这不是什么大问题,因为该过程是如此迅速,以至于我在HTTP触发器中向队列中发送消息时,队列最多只需要几秒钟即可处理文件。我想知道各个作业何时完成的原因是,我在客户端脚本中有最后一步:
客户端脚本

// after all jobs for a client have been submitted by HTTP
// get storage account credentials
// write message to a queue "client-tasks-completed" 
// queue message contains client name in the message 
// initialVisibilityDelay set to 2 minutes 
// this ensures queue has finished processing the files

然后,一个单独的Python Azure函数在该队列上进行侦听以进行进一步处理:
Python QueueTrigger

# pick up message from "client-tasks-completed" queue
if 'client1' == queue_msg['ClientName']:
    # standardize @R_744_4045@ion within the files and write to our Azure sql database
elif 'client2' == queue_msg['ClientName']:
    # standardize @R_744_4045@ion within the files and write to our Azure sql database
elif 'client3' == queue_msg['ClientName']:
    # standardize @R_744_4045@ion within the files and write to our Azure sql database

Python Azure函数处于消耗计划中,并且batchSize设置为1,因为客户端文件有时可能很大,并且我不想超过1.5GB的内存限制。所以我有两个问题,第一个是我如何知道第一个队列触发器已完成工作?第二个是,如何确保Python QueueTrigger不开始累积消息?我认为通过为侦听相同队列的两个队列触发器创建单独的Azure函数,可以潜在地解决这两个问题。这样可以减轻双方的负担,但是我不确定这是否是最佳实践。在这里我对问题2寻求更多指导的地方请参见我的问题:Using multiple Azure Functions QueueTriggers to listen on the same storage queue

解决方法

更新

using System;
using System.IO;
using System.Threading.Tasks;
using Microsoft.AspNetCore.Mvc;
using Microsoft.Azure.WebJobs;
using Microsoft.Azure.WebJobs.Extensions.Http;
using Microsoft.AspNetCore.Http;
using Microsoft.Extensions.Logging;
using Newtonsoft.Json;
using System.Threading;

namespace FunctionApp31
{
    public static class Function1
    {
        [FunctionName("Function1")]
        public static async Task<IActionResult> Run(
            [HttpTrigger(AuthorizationLevel.Function,"get","post",Route = null)] HttpRequest req,ILogger log)
        {

            string a = "111";

            a=XX(a).Result;

            return new OkObjectResult(a);
        }

        public static async Task<string> XX(string x)
        {
            await Task.Run(()=>{
                Thread.Sleep(3000);
                x = x + "222";
                Console.WriteLine(x);
                }
                );

            return x;
        } 
    }
}

原始答案:

我建议您顺序执行处理逻辑,而不是异步执行。或者,您可以等待异步操作完成后再返回,这样可以确保执行成功后再返回成功。(这样可以避免在队列仍按注释中所述处理时返回结果。)

我注意到您问了一个新问题。我认为您可以扩展实例,而不用创建多个功能的应用程序。 (当然,创建多个功能的应用程序没有问题。)如果您基于消耗计划,则实例将根据负载自动扩展。