包含异步和同步的并行循环

问题描述

我有一个需要并行运行的循环,因为每次迭代都很慢且处理器密集型,但我还需要调用异步方法作为循环中每次迭代的一部分。

我已经看到有关如何在循环中处理异步方法的问题,但没有看到有关如何处理异步和同步的组合的问题,这就是我所知道的。

我的(简化)代码如下 - 我知道由于异步操作被传递给 foreach,这将无法正常工作。

protected IDictionary<int,ReportData> GetReportData()
{
    var results = new ConcurrentDictionary<int,ReportData>();
      
    Parallel.ForEach(requestData,async data =>
    {
        // process data synchronously
        var processedData = ProcessData(data);

        // get some data async
        var reportRequest = await buildrequestAsync(processedData);

        // synchronous building
        var report = reportRequest.buildreport();

        results.TryAdd(data.ReportId,report);
     });

     // This needs to be populated before returning
     return results;
}

当操作必须是异步的以等待单个异步调用时,有什么方法可以并行执行操作。

将同步函数转换为异步函数不是一个实用的选择。

我不想拆分动作并有一个 Parallel.ForEach 后跟一个带有 WhenAll 和另一个 Parallel.ForEach 的异步调用,因为每个阶段的速度在不同的迭代之间可能会有很大差异,因此拆分它会效率低下因为速度快的会在继续之前等待速度慢的。

我确实想知道是否可以使用 PLINQ ForAll 代替 Parallel.ForEach,但从未使用过 PLINQ 并且不确定它是否会在返回之前等待所有迭代完成,即任务是否仍在运行过程结束。

解决方法

当操作必须是异步的以等待单个异步调用时,有什么方法可以并行执行操作。

是的,但您需要了解 Parallel 给您带来的好处,而当您采取其他方法时,您会失去什么。具体来说,Parallel 会自动确定合适的线程数并根据使用情况进行调整。

将同步函数转换为异步函数不是一个实用的选择。

对于 CPU 密集型方法,您不应该转换它们。

我不想拆分动作并有一个 Parallel.ForEach 后跟一个带有 WhenAll 和另一个 Parallel.ForEach 的异步调用,因为每个阶段的速度在不同的迭代之间可能会有很大差异,因此拆分它会效率低下因为速度快的会在继续之前等待速度慢的。

我要提出的第一个建议是研究TPL Dataflow。它允许您定义各种“管道”,以保持数据流过,同时限制每个阶段的并发性。

我确实想知道是否可以使用 PLINQ ForAll 代替 Parallel.ForEach

没有。 PLINQ 的工作方式与 Parallel 非常相似。它们在 CPU 利用率方面的积极程度存在一些差异,以及一些 API 差异 - 例如,如果您有一个最终结果的集合,PLINQ 通常比 Parallel 更清晰 - 但在高级视图中他们非常相似。两者都只适用于同步代码。

但是,您可以将简单的 Task.RunTask.WhenAll 一起使用:

protected async Task<IDictionary<int,ReportData>> GetReportDataAsync()
{
  var tasks = requestData.Select(async data => Task.Run(() =>
  {
    // process data synchronously
    var processedData = ProcessData(data);

    // get some data async
    var reportRequest = await BuildRequestAsync(processedData);

    // synchronous building
    var report = reportRequest.BuildReport();

    return (Key: data.ReportId,Value: report);
  })).ToList();
  var results = await Task.WhenAll(tasks);
  return results.ToDictionary(x => x.Key,x => x.Value);
}

您可能需要应用并发限制(Parallel 会为您完成)。在异步世界中,这看起来像:

protected async Task<IDictionary<int,ReportData>> GetReportDataAsync()
{
  var throttle = new SemaphoreSlim(10);
  var tasks = requestData.Select(data => Task.Run(async () =>
  {
    await throttle.WaitAsync();
    try
    {
      // process data synchronously
      var processedData = ProcessData(data);

      // get some data async
      var reportRequest = await BuildRequestAsync(processedData);

      // synchronous building
      var report = reportRequest.BuildReport();

      return (Key: data.ReportId,Value: report);
    }
    finally
    {
      throttle.Release();
    }
  })).ToList();
  var results = await Task.WhenAll(tasks);
  return results.ToDictionary(x => x.Key,x => x.Value);
}