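Problem description
Scope:
- I want to process a large file (1 GB+) by splitting it into smaller, manageable chunks (partitions), persisting them on some storage infrastructure (local disk, blob storage, network, etc.), and then processing them one by one in memory.
- I want to do this with the TPL Dataflow library, and I have created several processing blocks, each performing a specific operation on an in-memory file partition.
- In addition, I use a SemaphoreSlim object to limit the maximum number of in-memory partitions being processed at any given time. A partition holds its slot from the moment it is loaded until it has been fully processed.
- I also use the MaxDegreeOfParallelism configuration property at block level to limit the degree of parallelism of each block.
From a technical perspective, the goal is to use a semaphore to limit the parallel processing of multiple partitions across several consecutive pipeline steps, and so avoid overloading memory.
Issue: when MaxDegreeOfParallelism is set to a value greater than 1 for every dataflow block except the first, the process hangs and appears to be deadlocked. When MaxDegreeOfParallelism is set to 1, everything works as expected. Code sample below...
Do you have any idea why this happens?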
using System;
using System.Collections.Generic;
using System.IO;
using System.Threading;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

namespace DemoConsole
{
    class Program
    {
        // A single slot: only one partition may be "in flight" between read and validate.
        private static readonly SemaphoreSlim _localSemaphore = new(1);

        static async Task Main(string[] args)
        {
            Console.WriteLine("Configuring pipeline...");
            var dataflowLinkOptions = new DataflowLinkOptions() { PropagateCompletion = true };
            var filter1 = new TransformManyBlock<string, PartitionInfo>(CreatePartitionsAsync, new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = 1 });
            // when MaxDegreeOfParallelism on the below line is set to 1, everything works as expected; any value greater than 1 causes issues
            var blockOptions = new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = 5 };
            var filter2 = new TransformBlock<PartitionInfo, PartitionInfo>(ReadPartitionAsync, blockOptions);
            var filter3 = new TransformBlock<PartitionInfo, PartitionInfo>(MapPartitionAsync, blockOptions);
            var filter4 = new TransformBlock<PartitionInfo, PartitionInfo>(ValidatePartitionAsync, blockOptions);
            var actionBlock = new ActionBlock<PartitionInfo>(async (x) => { await Task.CompletedTask; });
            filter1.LinkTo(filter2, dataflowLinkOptions);
            filter2.LinkTo(filter3, dataflowLinkOptions);
            filter3.LinkTo(filter4, dataflowLinkOptions);
            filter4.LinkTo(actionBlock, dataflowLinkOptions);
            await filter1.SendAsync("my-file.csv");
            filter1.Complete();
            await actionBlock.Completion;
            Console.WriteLine("Pipeline completed.");
            Console.ReadKey();
            Console.WriteLine("Done");
        }

        private static async Task<IEnumerable<PartitionInfo>> CreatePartitionsAsync(string input)
        {
            var partitions = new List<PartitionInfo>();
            const int noOfPartitions = 10;
            Log($"Creating {noOfPartitions} partitions from raw file on Thread [{Thread.CurrentThread.ManagedThreadId}] ...");
            for (short i = 1; i <= noOfPartitions; i++)
            {
                partitions.Add(new PartitionInfo { FileName = $"{Path.GetFileNameWithoutExtension(input)}-p{i}-raw.json", Current = i });
            }
            await Task.CompletedTask;
            Log($"Creating {noOfPartitions} partitions from raw file completed on Thread [{Thread.CurrentThread.ManagedThreadId}].");
            return partitions;
        }

        private static async Task<PartitionInfo> ReadPartitionAsync(PartitionInfo input)
        {
            Log($"Semaphore - trying to enter for partition [{input.Current}] - Current count is [{_localSemaphore.CurrentCount}]; client thread [{Thread.CurrentThread.ManagedThreadId}]");
            // The slot is acquired here and only released in ValidatePartitionAsync.
            await _localSemaphore.WaitAsync();
            Log($"Semaphore - entered for partition [{input.Current}] - Current count is [{_localSemaphore.CurrentCount}]; client thread [{Thread.CurrentThread.ManagedThreadId}]");
            Log($"Reading partition [{input.Current}] on Thread [{Thread.CurrentThread.ManagedThreadId}] ...");
            await Task.Delay(1000);
            Log($"Reading partition [{input.Current}] completed on Thread [{Thread.CurrentThread.ManagedThreadId}].");
            return input;
        }

        private static async Task<PartitionInfo> MapPartitionAsync(PartitionInfo input)
        {
            Log($"Mapping partition [{input.Current}] on Thread [{Thread.CurrentThread.ManagedThreadId}] ...");
            await Task.Delay(1000);
            Log($"Mapping partition [{input.Current}] completed on Thread [{Thread.CurrentThread.ManagedThreadId}].");
            return input;
        }

        private static async Task<PartitionInfo> ValidatePartitionAsync(PartitionInfo input)
        {
            Log($"Validating partition [{input.Current}] on Thread [{Thread.CurrentThread.ManagedThreadId}] ...");
            await Task.Delay(1000);
            Log($"Validating partition [{input.Current}] completed on Thread [{Thread.CurrentThread.ManagedThreadId}].");
            Log($"Semaphore - releasing - Current count is [{_localSemaphore.CurrentCount}]; client thread [{Thread.CurrentThread.ManagedThreadId}]");
            _localSemaphore.Release();
            Log($"Semaphore - released - Current count is [{_localSemaphore.CurrentCount}]; client thread [{Thread.CurrentThread.ManagedThreadId}]");
            return input;
        }

        private static void Log(string message) => Console.WriteLine($"{DateTime.Now:HH:mm:ss.fff} : {message}");
    }

    class PartitionInfo
    {
        public string FileName { get; set; }
        public short Current { get; set; }
    }
}
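Solution
Before implementing this solution, please read the comments, because there is a fundamental architectural problem in your code.
That said, the problem you posted is reproducible, and it can be resolved with the following change to the ExecutionDataflowBlockOptions:
var blockOptions = new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = 5, EnsureOrdered = false };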
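The EnsureOrdered property defaults to true. When the degree of parallelism is greater than 1, there is no guarantee which message finishes processing first. If the message that finishes first is not the first message the block received, its output waits in a reordering buffer until the first received message has completed. In your pipeline that is fatal: the waiting output holds the only semaphore slot but never reaches filter4, so the slot is never released, and the first received message can never acquire it. Because filter1 is a TransformManyBlock, I am not sure it is even possible to know the order in which each message is sent to filter2.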
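The reordering behavior described above can be observed in isolation. The following is a minimal sketch, not part of the original code: the names OrderingDemo and RunAsync are hypothetical, and the delays are arbitrary values chosen so that the first item received is the slowest to finish.

```csharp
using System;
using System.Collections.Generic;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

// Hypothetical demo class, introduced only for illustration.
public static class OrderingDemo
{
    // Sends 1, 2, 3 through a parallel TransformBlock and returns
    // the order in which the outputs become available.
    public static async Task<List<int>> RunAsync(bool ensureOrdered)
    {
        var options = new ExecutionDataflowBlockOptions
        {
            MaxDegreeOfParallelism = 3,
            EnsureOrdered = ensureOrdered
        };

        // Item 1 takes the longest, item 3 the shortest (arbitrary delays).
        var block = new TransformBlock<int, int>(async n =>
        {
            await Task.Delay((4 - n) * 200);
            return n;
        }, options);

        for (var n = 1; n <= 3; n++)
        {
            block.Post(n);
        }
        block.Complete();

        // Drain the block's output in the order it is released.
        var outputs = new List<int>();
        while (await block.OutputAvailableAsync())
        {
            while (block.TryReceive(out var item))
            {
                outputs.Add(item);
            }
        }
        return outputs;
    }

    public static async Task Main()
    {
        Console.WriteLine("ordered:   " + string.Join(",", await RunAsync(true)));
        Console.WriteLine("unordered: " + string.Join(",", await RunAsync(false)));
    }
}
```

With EnsureOrdered = true the outputs come out in input order (1,2,3), even though items 2 and 3 finish earlier and have to wait for item 1. With EnsureOrdered = false they are released as they complete, which with these delays is typically 3,2,1.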
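If you run your code enough times, you will eventually get lucky: the first message sent to filter2 will also be the first one processed, in which case it releases the semaphore and the pipeline makes progress. But you will hit the same problem with the next message: if it is not the second message received, it will wait in the reordering buffer.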
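To illustrate the change, here is a reduced sketch of the same pipeline shape with EnsureOrdered = false. It is an assumption-laden miniature rather than your actual code: UnorderedPipelineDemo, RunAsync and gate are hypothetical names, partitions are plain integers, and the delays are shortened. It only shows that the hang goes away; it does not address the architectural concern mentioned at the top of this answer.

```csharp
using System;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

// Hypothetical demo class, introduced only for illustration.
public static class UnorderedPipelineDemo
{
    // Pushes partitionCount items through read -> map -> validate
    // and returns how many reached the end of the pipeline.
    public static async Task<int> RunAsync(int partitionCount)
    {
        var gate = new SemaphoreSlim(1); // at most one partition in flight
        var link = new DataflowLinkOptions { PropagateCompletion = true };
        var options = new ExecutionDataflowBlockOptions
        {
            MaxDegreeOfParallelism = 5,
            EnsureOrdered = false // outputs are released as soon as they complete
        };

        // Acquires the slot, like ReadPartitionAsync in the question.
        var read = new TransformBlock<int, int>(async p =>
        {
            await gate.WaitAsync();
            await Task.Delay(20);
            return p;
        }, options);

        var map = new TransformBlock<int, int>(async p =>
        {
            await Task.Delay(20);
            return p;
        }, options);

        // Releases the slot, like ValidatePartitionAsync in the question.
        var validate = new TransformBlock<int, int>(async p =>
        {
            try
            {
                await Task.Delay(20);
                return p;
            }
            finally
            {
                gate.Release();
            }
        }, options);

        // Default options: this block handles one message at a time,
        // so the plain increment is not racing with itself.
        var processed = 0;
        var sink = new ActionBlock<int>(_ => { processed++; });

        read.LinkTo(map, link);
        map.LinkTo(validate, link);
        validate.LinkTo(sink, link);

        foreach (var p in Enumerable.Range(1, partitionCount))
        {
            await read.SendAsync(p);
        }
        read.Complete();

        await sink.Completion;
        return processed;
    }

    public static async Task Main()
    {
        var count = await RunAsync(6);
        Console.WriteLine($"Processed {count} partitions.");
    }
}
```

Whichever message wins the semaphore in the first block now flows straight through to the last block and frees the slot for the next one. Note that the slot is still leaked if an earlier stage throws before the release is reached, so this pattern remains fragile.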