为什么Azure QueueTriggered函数应用程序抛出超时异常?

问题描述

我已将QueueTriggered功能应用程序部署到Azure门户。但是似乎有时抛出Timeout超出了异常。我收到此错误,并考虑将functionTimeout值从30分钟更改为2小时。
但是仍然给我同样的错误。我担心的是,为什么QueueTrigger引发此错误。我的工作就是传递消息。
我正在从队列消息中获取Blob名称,并将其作为参数传递给Snowflake过程以加载该Blob。雪花几乎不需要一分钟即可加载该斑点。但是为什么功能应用程序超时?
我遇到的错误是:

Timeout value of 02:00:00 was exceeded by function: Functions.myFunctionAppName
Exception Type: Microsoft.Azure.WebJobs.Host.FunctionTimeoutException
Failed method: Microsoft.Azure.WebJobs.Host.Executors.FunctionExecutor+<TryHandleTimeoutAsync>d__29.MoveNext

调用堆栈:

Microsoft.Azure.WebJobs.Host.FunctionTimeoutException:
at Microsoft.Azure.WebJobs.Host.Executors.FunctionExecutor+<TryHandleTimeoutAsync>d__29.MoveNext (Microsoft.Azure.WebJobs.Host,Version=3.0.17.0,Culture=neutral,PublicKeyToken=31bf3856ad364e35Microsoft.Azure.WebJobs.Host,PublicKeyToken=31bf3856ad364e35: C:\projects\azure-webjobs-sdk-rqm4t\src\Microsoft.Azure.WebJobs.Host\Executors\FunctionExecutor.csMicrosoft.Azure.WebJobs.Host,PublicKeyToken=31bf3856ad364e35: 665)
at System.Runtime.ExceptionServices.ExceptionDispatchInfo.Throw (System.Private.CoreLib,Version=4.0.0.0,PublicKeyToken=7cec85d7bea7798e)
at System.Runtime.CompilerServices.TaskAwaiter.ThrowForNonSuccess (System.Private.CoreLib,PublicKeyToken=7cec85d7bea7798e)
at System.Runtime.CompilerServices.TaskAwaiter.HandleNonSuccessAndDebuggerNotification (System.Private.CoreLib,PublicKeyToken=7cec85d7bea7798e)
at System.Runtime.CompilerServices.TaskAwaiter`1.GetResult (System.Private.CoreLib,PublicKeyToken=7cec85d7bea7798e)
at Microsoft.Azure.WebJobs.Host.Executors.FunctionExecutor+<InvokeAsync>d__27.MoveNext (Microsoft.Azure.WebJobs.Host,PublicKeyToken=31bf3856ad364e35: 576)
at System.Runtime.ExceptionServices.ExceptionDispatchInfo.Throw (System.Private.CoreLib,PublicKeyToken=7cec85d7bea7798e)
at System.Runtime.CompilerServices.TaskAwaiter.GetResult (System.Private.CoreLib,PublicKeyToken=7cec85d7bea7798e)
at Microsoft.Azure.WebJobs.Host.Executors.FunctionExecutor+<ExecuteWithWatchersAsync>d__26.MoveNext (Microsoft.Azure.WebJobs.Host,PublicKeyToken=31bf3856ad364e35: 532)
at System.Runtime.ExceptionServices.ExceptionDispatchInfo.Throw (System.Private.CoreLib,PublicKeyToken=7cec85d7bea7798e)    
at System.Runtime.CompilerServices.TaskAwaiter.ThrowForNonSuccess (System.Private.CoreLib,PublicKeyToken=7cec85d7bea7798e)
at Microsoft.Azure.WebJobs.Host.Executors.FunctionExecutor+<ExecuteWithLoggingAsync>d__25.MoveNext (Microsoft.Azure.WebJobs.Host,PublicKeyToken=31bf3856ad364e35: 470)
at System.Runtime.ExceptionServices.ExceptionDispatchInfo.Throw (System.Private.CoreLib,PublicKeyToken=7cec85d7bea7798e)
at Microsoft.Azure.WebJobs.Host.Executors.FunctionExecutor+<ExecuteWithLoggingAsync>d__19.MoveNext (Microsoft.Azure.WebJobs.Host,PublicKeyToken=31bf3856ad364e35: 278)
at System.Runtime.ExceptionServices.ExceptionDispatchInfo.Throw (System.Private.CoreLib,PublicKeyToken=31bf3856ad364e35: 325)
at System.Runtime.ExceptionServices.ExceptionDispatchInfo.Throw (System.Private.CoreLib,PublicKeyToken=7cec85d7bea7798e)
at Microsoft.Azure.WebJobs.Host.Executors.FunctionExecutor+<TryExecuteAsyncCore>d__16.MoveNext (Microsoft.Azure.WebJobs.Host,PublicKeyToken=31bf3856ad364e35: 117)

我使用Python开发的代码:

import logging
import json
import os
import snowflake.connector
from azure.common.credentials import ServicePrincipalCredentials

# packages for reading blob (private key p8 file)
from azure.storage.blob import BlobServiceClient
from azure.core._match_conditions import MatchConditions

# packages to decrypt private key
from cryptography.hazmat.backends import default_backend
from cryptography.hazmat.primitives.asymmetric import rsa
from cryptography.hazmat.primitives.asymmetric import dsa
from cryptography.hazmat.primitives import serialization
import azure.functions as func


def execute_process(sourcecode,sourcefilename):
try:
    # storage connection string where Private Key p8 file is stored
    storage_conn = os.getenv("RsaStorage")
    # open service client to interact with storage
    blob_service_client = BlobServiceClient.from_connection_string(
        storage_conn)
    # get blob stored in container get_blob_client(container,blob)
    file_client = blob_service_client.get_blob_client(
        "access-files","rsa/rsa_key.p8")
    # download blob to a variable
    f = file_client.download_blob()
    # get content of blob into key variable
    key = f._current_content
    # decrypt private key (obtained above) using private key password
    p_key = serialization.load_pem_private_key(
        key,password=os.environ["PRIVATE_KEY_PASSPHRASE"].encode(),backend=default_backend(),)
    # read private key bytes
    pkb = p_key.private_bytes(
        encoding=serialization.Encoding.DER,format=serialization.PrivateFormat.PKCS8,encryption_algorithm=serialization.NoEncryption(),)
    # create a Snowflake connection string
    conn = snowflake.connector.connect(user=os.getenv("user"),account=os.environ["account"],private_key=pkb,warehouse=os.environ["warehouse"],database=os.environ["database"],schema=os.environ["schema"],role=os.environ["role"])
    queryToExecute = ""
    # execute Snowflake query
    if sourcecode.upper() == "con1":
        queryToExecute = f"CALL Procedure1(NULL,'{sourcecode}','{sourcefilename}');"
    elif sourcecode.upper() == "con2":
        queryToExecute = f"CALL Procedure2(NULL,'{sourcefilename}');"
    elif sourcecode.upper() == "con3":
        queryToExecute = f"CALL procedure3(NULL,'{sourcefilename}');"
    if queryToExecute != "":
        for retMsg in conn.cursor().execute(queryToExecute):
            logging.info(
                "\nMessage returned from procedure : {0}".format(retMsg))
    else:
        logging.info(f"No query to execute for blob: {0}",sourcefilename)
except Exception as ex:
    raise Exception(ex)


def main(msg: func.QueueMessage) -> None:
try:
    result = json.loads(msg.get_body().decode('utf-8'))
    subject = result["subject"]
    blob_name = subject[subject.rindex("/") + 1:]
    sourcefilename = subject[subject.index("incoming/"):]
    sourcecode = blob_name[blob_name.rindex(".") + 1:]
    logging.info('PharmacyDataLoader function started processing: %s',blob_name)
    execute_process(sourcecode,sourcefilename)
    logging.info(f"completed PharmacyDataLoader function for {blob_name}")
except Exception as ex:
    logging.error(
        f"PharmacyDataLoader function failed to process {subject} with error {ex}"
    )

有什么办法解决这个问题?

解决方法

高级计划中的天蓝色功能支持在Premium Plan.中提供60分钟的保证,您可以尝试使用该功能。

但是,我建议您考虑使用“耐用功能”框架,该框架可让您将工作分解为较小的垃圾。

相关问答

错误1:Request method ‘DELETE‘ not supported 错误还原:...
错误1:启动docker镜像时报错:Error response from daemon:...
错误1:private field ‘xxx‘ is never assigned 按Alt...
报错如下,通过源不能下载,最后警告pip需升级版本 Requirem...