问题描述
有几个持久函数可以相互调用。 主编排->子编排->Activity->Helper异步方法
每个 func 都有 ILogger 依赖项,并在函数开始和函数结束时登录。 由于某种原因,两个协调器都复制了“开始时”消息。 (见图) Activity 没有这种效果。 (见图) 多次运行下面的示例 - 相同的故事。
我也确定整个过程被触发了一次。
这是协调器中的错误还是预期行为?
using System;
using System.Net.Http;
using System.Threading.Tasks;
using Microsoft.AspNetCore.Mvc;
using Microsoft.Azure.WebJobs;
using Microsoft.Azure.WebJobs.Extensions.DurableTask;
using Microsoft.Azure.WebJobs.Extensions.Http;
using Microsoft.Extensions.Logging;
namespace Issues
{
public static class Log_Issue
{
[FunctionName("Main")]
public static async Task Runorchestrator(
[orchestrationTrigger] IDurableorchestrationContext context,ILogger log)
{
try
{
log.LogWarning("Main Start");
await context.CallSuborchestratorAsync("Sub",null);
log.LogWarning("Main End");
}
catch (Exception e)
{
Console.WriteLine(e);
throw;
}
}
[FunctionName("Sub")]
public static async Task RunSuborchestrator(
[orchestrationTrigger] IDurableorchestrationContext context,ILogger log)
{
log.LogWarning("Sub Start");
var data = await context.CallActivityAsync<string>("Activity",null);
log.LogWarning("Sub End");
}
[FunctionName("Activity")]
public static async Task<string> GetDataActivity([ActivityTrigger] string name,ILogger log)
{
log.LogWarning("Activity Start");
var data = await GetDataAsync("https://www.google.com");
log.LogWarning("Activity End");
return data;
}
[FunctionName("Start")]
public static async Task<IActionResult> HttpStart(
[HttpTrigger(AuthorizationLevel.Anonymous,"get","post")]
HttpRequestMessage req,[DurableClient] IDurableorchestrationClient starter,ILogger log)
{
var instanceId = await starter.StartNewAsync("Main",null);
log.LogWarning($"Started orchestration with ID = '{instanceId}'.");
return new OkResult();
}
private static async Task<string> GetDataAsync(string url)
{
var httpClient = new HttpClient();
using var request = new HttpRequestMessage
{
RequestUri = new Uri(url),Method = HttpMethod.Get,};
var response = await httpClient.SendAsync(request);
response.EnsureSuccessstatusCode();
return await response.Content.ReadAsstringAsync();
}
}
}
解决方法
这是预料之中的。例如在 await context.CallActivityAsync("Activity",null);代码会自行暂停,甚至可能会被加载出内存(为了节省成本)。
然后协调器等待将事件放置在活动创建的另一个 Azure 存储表中,这可能会在许多天后发生。对于活动,它们通常是非常即时的,但它仍会等待此事件发生。
当发生这种情况时,代码需要从上次停止的地方开始,但没有办法做到这一点。因此,代码从头开始重新运行,但不是等待活动再次完成,而是首先查看表并看到我们已经完成了该活动并且可以继续运行。如果活动函数返回某个值,它将从 await 调用中返回。在协调器的两次运行期间,它都会记录,但因为我们只进入那些只会被记录的活动。
这就是编排器必须具有确定性的原因,例如第一次运行的随机值与第二次运行的随机值不同。相反,我们会将 random.Next() 放入活动函数中,以便将值保存到 Azure 表存储以供后续重新运行时使用。协调器也可能正在等待正常函数创建的一些外部事件。例如,有人必须验证他们的电子邮件帐户,这可能需要几天的时间,这就是为什么持久函数可以在事件触发时自行卸载并重新启动。
,@FilipB 所说的都是真的。它只是缺少解决它的实际代码;)
[FunctionName("Main")]
public static async Task RunOrchestrator(
[OrchestrationTrigger] IDurableOrchestrationContext context,ILogger log)
{
log = context.CreateReplaySafeLogger(log); // this is what you should use at the start of every Orchestrator