在 Durable Functions 中等待外部事件

问题描述

我有一个 orchestrator 功能,如下所示:

[FunctionName("O_orchestratorFunction)]
        public async Task<object> Process(
            [orchestrationTrigger] IDurableorchestrationContext context,ILogger log)
        {
            await context.CallActivityAsync("A_SendApprovalRequestemail","email");

            var _approvalResult = await context.WaitForExternalEvent<string>("ApprovalResult");

            if (_approvalResult == "Approved")
            {
                await context.CallActivityAsync("A_ResumeJobSync","email");
            }
            else
            {
                await context.CallActivityAsync("A_PauseJobSync","email");
            }
            return _approvalResult;
        }
    }

这是我的 A_SendApprovalRequestemail 活动函数

    [FunctionName("A_SendApprovalRequestemail")]
    public async Task Run([ActivityTrigger] string job,TraceWriter log)
    {
            var functionAddress = $"http://localhost:7071/api/";
            var approvedLink = functionAddress + "approve?id=MYID";
            var rejectedLink = functionAddress + "reject?id=MYID";
            var content = $"{approvedLink}  {rejectedLink}";
            await _mailRepository.SendMail("Subject",content,"<recipient-address>");
            log.Info($"Requesting approval for {job}");
            await Task.Delay(1000);
    }

如何修改代码以将 2 个链接连接到协调器 - 一个用于批准,一个用于拒绝(返回结果“已批准”或“已拒绝”)?

更新:

添加了 2 个启动函数

        [FunctionName("Approve")]
        public static async Task<IActionResult> Approve(
        [HttpTrigger(AuthorizationLevel.Anonymous,"get",Route = null)]
        HttpRequest req,ILogger log,[DurableClient] IDurableorchestrationClient client)
        {
            string orchestratorId = req.Query["id"];                 
            log.Loginformation($"Approval request for {orchestratorId}"); 
            var status = await client.GetStatusAsync(orchestratorId);
            if (status == null || status.RuntimeStatus != orchestrationRuntimeStatus.Running)
                return new NotFoundResult();

            await client.RaiseEventAsync(orchestratorId,"ApprovalResult","Approve");

            return new OkObjectResult("Approval successfull");
        }

        [FunctionName("Reject")]
        public static async Task<IActionResult> Reject(
           [HttpTrigger(AuthorizationLevel.Anonymous,Route = null)]
            HttpRequest req,[DurableClient] IDurableorchestrationClient client)
        {
            string orchestratorId = req.Query["id"];

            log.Loginformation($"Reject request for {orchestratorId}");

            var status = await client.GetStatusAsync(orchestratorId);

            if (status == null || status.RuntimeStatus != orchestrationRuntimeStatus.Running)
                return new NotFoundResult();


            await client.RaiseEventAsync(orchestratorId,"Reject");

            return new OkObjectResult("Rejected");
        }

这是我在点击批准/拒绝链接时看到的:

enter image description here

enter image description here

关于将断点置于

var status = await client.GetStatusAsync(orchestratorId);

我认为状态为空。

解决方法

您需要为拒绝和批准创建 2 个 HTTP 函数,就像您使用 IDurableOrchestratorClient 创建启动函数一样。这些函数需要通过某种方式来获取用于启动编排的唯一编排器 ID。这存储在编排器的 context.InstanceId 属性中。最简单的方法是在查询 www.test.com/api/approve?id=myid 中发送它,但还有其他解决方案,您可以通过 DB 或 jwt 令牌将查询值映射到哪个协调器。此 HTTP 触发功能是您电子邮件中的链接。然后,HTTP 触发函数可以使用此 ID 在编排器中引发事件。

        [FunctionName("Approve")]
        public static async Task<IActionResult> Approve(
           [HttpTrigger(AuthorizationLevel.Anonymous,"get",Route = null)]
            HttpRequest req,ILogger log,[DurableClient] IDurableOrchestrationClient client)
        {
            // Takes the id parameter in the url e.g. www.test.com/api/approve?id=SomeID would return SomeID
            string orchestratorId = req.Query["id"];

            log.LogInformation($"Approval request for {orchestratorId}");

            // Makes sure there is a running instance with this orchestrator 
            var status = await client.GetStatusAsync(orchestratorId);
            if (status == null || status.RuntimeStatus != OrchestrationRuntimeStatus.Running)
                return new NotFoundResult();

            //Raises event ApprovalResult with data Approve
            await client.RaiseEventAsync(orchestratorId,"ApprovalResult","Approve");

            return new OkObjectResult("Approval successfull");
        }

        [FunctionName("Reject")]
        public static async Task<IActionResult> Reject(
           [HttpTrigger(AuthorizationLevel.Anonymous,[DurableClient] IDurableOrchestrationClient client)
        {
            string orchestratorId = req.Query["id"];

            log.LogInformation($"Reject request for {orchestratorId}");

            var status = await client.GetStatusAsync(orchestratorId);

            if (status == null || status.RuntimeStatus != OrchestrationRuntimeStatus.Running)
                return new NotFoundResult();


            await client.RaiseEventAsync(orchestratorId,"Reject");

            return new OkObjectResult("Rejected");
        }

[FunctionName("O_OrchestratorFunction)]
        public async Task<object> Process(
            [OrchestrationTrigger] IDurableOrchestrationContext context,ILogger log)
        {
           TimeSpan timeout = TimeSpan.FromMinutes(30);
           DateTime deadline = context.CurrentUtcDateTime.Add(timeout);
           using (var cts = new CancellationTokenSource())
           {
            await context.CallActivityAsync("A_SendApprovalRequestEmail",new Tuple<string,string>("email",context.InstanceId));
            Task timeoutTask = context.CreateTimer(deadline,cts.Token);

            var approvalTask = context.WaitForExternalEvent<string("ApprovalResult");
            Task task = await Task.WhenAny(activityTask,timeoutTask);
            if (task == approvalTask)
            {
                cts.cancel();
                if(task.Result == "Approve")
                   await context.CallActivityAsync("A_ResumeJobSync","email");
                else 
                    await context.CallActivityAsync("A_PauseJobSync","email");
            }
            else
            {
                // Code for timeout scenario (maybe execute A_PauseJobSync)
            }
            return _approvalResult;
        }
    }
}

[FunctionName("A_SendApprovalRequestEmail")]
    public async Task Run([ActivityTrigger] Tuple<string,string> data,TraceWriter log)
    { // data.Item1 = "email" & data.Item2 = context.InstanceId
            var functionAddress = $"http://localhost:7071/api/";
            var approvedLink = functionAddress + $"approve?id={data.Item2}";
            var rejectedLink = functionAddress + $"reject?id={data.Item2}";
            var content = $"{approvedLink}  {rejectedLink}";
            await _mailRepository.SendMail("Subject",content,"<recipient-address>");
            log.Info($"Requesting approval for {job}");
            await Task.Delay(1000);
    }

以下是两个 HTTP 触发函数,您可以将其用作批准和拒绝函数,然后当有人点击该链接时,您将在电子邮件链接中的内容中发送到 www.myfuncapp.com/api/approve?id=MYID,批准函数将运行并引发事件 ApprovalResult数据批准。