问题描述
我有一个需要并行运行的循环,因为每次迭代都很慢且处理器密集型,但我还需要调用异步方法作为循环中每次迭代的一部分。
我已经看到有关如何在循环中处理异步方法的问题,但没有看到有关如何处理异步和同步的组合的问题,这就是我所知道的。
我的(简化)代码如下 - 我知道由于异步操作被传递给 foreach,这将无法正常工作。
protected IDictionary<int,ReportData> GetReportData()
{
var results = new ConcurrentDictionary<int,ReportData>();
Parallel.ForEach(requestData,async data =>
{
// process data synchronously
var processedData = ProcessData(data);
// get some data async
var reportRequest = await buildrequestAsync(processedData);
// synchronous building
var report = reportRequest.buildreport();
results.TryAdd(data.ReportId,report);
});
// This needs to be populated before returning
return results;
}
当操作必须是异步的以等待单个异步调用时,有什么方法可以并行执行操作。
我不想拆分动作并有一个 Parallel.ForEach 后跟一个带有 WhenAll 和另一个 Parallel.ForEach 的异步调用,因为每个阶段的速度在不同的迭代之间可能会有很大差异,因此拆分它会效率低下因为速度快的会在继续之前等待速度慢的。
我确实想知道是否可以使用 PLINQ ForAll 代替 Parallel.ForEach,但从未使用过 PLINQ 并且不确定它是否会在返回之前等待所有迭代完成,即任务是否仍在运行过程结束。
解决方法
当操作必须是异步的以等待单个异步调用时,有什么方法可以并行执行操作。
是的,但您需要了解 Parallel
给您带来的好处,而当您采取其他方法时,您会失去什么。具体来说,Parallel
会自动确定合适的线程数并根据使用情况进行调整。
将同步函数转换为异步函数不是一个实用的选择。
对于 CPU 密集型方法,您不应该转换它们。
我不想拆分动作并有一个 Parallel.ForEach 后跟一个带有 WhenAll 和另一个 Parallel.ForEach 的异步调用,因为每个阶段的速度在不同的迭代之间可能会有很大差异,因此拆分它会效率低下因为速度快的会在继续之前等待速度慢的。
我要提出的第一个建议是研究TPL Dataflow。它允许您定义各种“管道”,以保持数据流过,同时限制每个阶段的并发性。
我确实想知道是否可以使用 PLINQ ForAll 代替 Parallel.ForEach
没有。 PLINQ 的工作方式与 Parallel
非常相似。它们在 CPU 利用率方面的积极程度存在一些差异,以及一些 API 差异 - 例如,如果您有一个最终结果的集合,PLINQ 通常比 Parallel
更清晰 - 但在高级视图中他们非常相似。两者都只适用于同步代码。
但是,您可以将简单的 Task.Run
与 Task.WhenAll
一起使用:
protected async Task<IDictionary<int,ReportData>> GetReportDataAsync()
{
var tasks = requestData.Select(async data => Task.Run(() =>
{
// process data synchronously
var processedData = ProcessData(data);
// get some data async
var reportRequest = await BuildRequestAsync(processedData);
// synchronous building
var report = reportRequest.BuildReport();
return (Key: data.ReportId,Value: report);
})).ToList();
var results = await Task.WhenAll(tasks);
return results.ToDictionary(x => x.Key,x => x.Value);
}
您可能需要应用并发限制(Parallel
会为您完成)。在异步世界中,这看起来像:
protected async Task<IDictionary<int,ReportData>> GetReportDataAsync()
{
var throttle = new SemaphoreSlim(10);
var tasks = requestData.Select(data => Task.Run(async () =>
{
await throttle.WaitAsync();
try
{
// process data synchronously
var processedData = ProcessData(data);
// get some data async
var reportRequest = await BuildRequestAsync(processedData);
// synchronous building
var report = reportRequest.BuildReport();
return (Key: data.ReportId,Value: report);
}
finally
{
throttle.Release();
}
})).ToList();
var results = await Task.WhenAll(tasks);
return results.ToDictionary(x => x.Key,x => x.Value);
}