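Issue throttling TPL Dataflow with SemaphoreSlim

Problem description

Scope:

  • I want to process a large file (1 GB+) by splitting it into smaller, manageable chunks (partitions), persisting them on some storage infrastructure (local disk, blob, network, etc.), and processing them one by one, in memory.
  • I want to do this with the TPL Dataflow library, so I have created several processing blocks, each performing a specific action on an in-memory file partition.
  • In addition, I use a SemaphoreSlim object to limit the maximum number of in-memory partitions being processed at any given time; a partition holds the semaphore from the moment it is loaded until it is fully processed.
  • I also use the MaxDegreeOfParallelism configuration property at block level to limit the degree of parallelism of each block.

From a technical perspective, the goal is to use a semaphore to limit how many partitions are processed in parallel across several consecutive pipeline steps, and so avoid overloading memory.

Issue description: when MaxDegreeOfParallelism is set to a value greater than 1 for all Dataflow blocks except the first one, the process hangs and appears to deadlock. When MaxDegreeOfParallelism is set to 1, everything works as expected. Code sample below...

Do you have any idea why this happens?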

using System;
using System.Collections.Generic;
using System.IO;
using System.Threading;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

namespace DemoConsole
{
    class Program
    {
        private static readonly SemaphoreSlim _localSemaphore = new(1);

        static async Task Main(string[] args)
        {
            Console.WriteLine("Configuring pipeline...");

            var dataflowLinkOptions = new DataflowLinkOptions() { PropagateCompletion = true };

            var filter1 = new TransformManyBlock<string, PartitionInfo>(CreatePartitionsAsync, new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = 1 });

            // When MaxDegreeOfParallelism on the line below is set to 1, everything works as expected; any value greater than 1 causes issues.
            var blockOptions = new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = 5 };

            var filter2 = new TransformBlock<PartitionInfo, PartitionInfo>(ReadPartitionAsync, blockOptions);
            var filter3 = new TransformBlock<PartitionInfo, PartitionInfo>(MapPartitionAsync, blockOptions);
            var filter4 = new TransformBlock<PartitionInfo, PartitionInfo>(ValidatePartitionAsync, blockOptions);

            var actionBlock = new ActionBlock<PartitionInfo>(async (x) => { await Task.CompletedTask; });

            filter1.LinkTo(filter2, dataflowLinkOptions);
            filter2.LinkTo(filter3, dataflowLinkOptions);
            filter3.LinkTo(filter4, dataflowLinkOptions);
            filter4.LinkTo(actionBlock, dataflowLinkOptions);

            await filter1.SendAsync("my-file.csv");

            filter1.Complete();

            await actionBlock.Completion;

            Console.WriteLine("Pipeline completed.");
            Console.ReadKey();
            Console.WriteLine("Done");
        }

        private static async Task<IEnumerable<PartitionInfo>> CreatePartitionsAsync(string input)
        {
            var partitions = new List<PartitionInfo>();
            const int noOfPartitions = 10;

            Log($"Creating {noOfPartitions} partitions from raw file on Thread [{Thread.CurrentThread.ManagedThreadId}] ...");

            for (short i = 1; i <= noOfPartitions; i++)
            {
                partitions.Add(new PartitionInfo { FileName = $"{Path.GetFileNameWithoutExtension(input)}-p{i}-raw.json", Current = i });
            }

            await Task.CompletedTask;

            Log($"Creating {noOfPartitions} partitions from raw file completed on Thread [{Thread.CurrentThread.ManagedThreadId}].");

            return partitions;
        }

        private static async Task<PartitionInfo> ReadPartitionAsync(PartitionInfo input)
        {
            Log($"Semaphore - trying to enter for partition [{input.Current}] - Current count is [{_localSemaphore.CurrentCount}]; client thread [{Thread.CurrentThread.ManagedThreadId}]");
            await _localSemaphore.WaitAsync();
            Log($"Semaphore - entered for partition [{input.Current}] - Current count is [{_localSemaphore.CurrentCount}]; client thread [{Thread.CurrentThread.ManagedThreadId}]");

            Log($"Reading partition [{input.Current}] on Thread [{Thread.CurrentThread.ManagedThreadId}] ...");
            await Task.Delay(1000);
            Log($"Reading partition [{input.Current}] completed on Thread [{Thread.CurrentThread.ManagedThreadId}].");

            return input;
        }

        private static async Task<PartitionInfo> MapPartitionAsync(PartitionInfo input)
        {
            Log($"Mapping partition [{input.Current}] on Thread [{Thread.CurrentThread.ManagedThreadId}] ...");
            await Task.Delay(1000);
            Log($"Mapping partition [{input.Current}] completed on Thread [{Thread.CurrentThread.ManagedThreadId}].");

            return input;
        }

        private static async Task<PartitionInfo> ValidatePartitionAsync(PartitionInfo input)
        {
            Log($"Validating partition [{input.Current}] on Thread [{Thread.CurrentThread.ManagedThreadId}] ...");
            await Task.Delay(1000);
            Log($"Validating partition [{input.Current}] completed on Thread [{Thread.CurrentThread.ManagedThreadId}].");

            Log($"Semaphore - releasing - Current count is [{_localSemaphore.CurrentCount}]; client thread [{Thread.CurrentThread.ManagedThreadId}]");
            _localSemaphore.Release();
            Log($"Semaphore - released - Current count is [{_localSemaphore.CurrentCount}]; client thread [{Thread.CurrentThread.ManagedThreadId}]");

            return input;
        }

        private static void Log(string message) => Console.WriteLine($"{DateTime.Now:HH:mm:ss.fff} : {message}");
    }

    class PartitionInfo
    {
        public string FileName { get; set; }
        public short Current { get; set; }
    }
}

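Solution

Before implementing this solution, take a look at the comments, because there is a fundamental architectural problem in your code.

That said, the issue you posted is reproducible, and it can be solved with the following change to the ExecutionDataflowBlockOptions: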

var blockOptions = new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = 5, EnsureOrdered = false };

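The EnsureOrdered property defaults to true. When the degree of parallelism is greater than 1, there is no guarantee which message finishes processing first. If the message that finishes first is not the first one the block received, its result waits in a reordering buffer until the first received message completes. In your pipeline that buffered result never reaches filter4, so the semaphore is never released, while the earlier message is still waiting in ReadPartitionAsync for that same semaphore. Because filter1 is a TransformManyBlock, I'm not sure it is even possible to know the order in which each message is sent to filter2.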
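To see the reordering buffer in isolation, here is a minimal sketch that is separate from the pipeline in the question. The class name ReorderDemo, the two-item input, and the delay values are assumptions chosen only for illustration. Item 1 is deliberately slow and item 2 is fast, so item 2 finishes first. With EnsureOrdered = true the block still emits 1 before 2, which means the finished result for item 2 is held back until item 1 is done; with EnsureOrdered = false the output typically arrives as 2 then 1.

```csharp
using System;
using System.Collections.Generic;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

static class ReorderDemo
{
    // Pushes two items through a TransformBlock and returns them in the
    // order the block emitted them. Item 1 is slow, item 2 is fast.
    public static async Task<List<int>> RunAsync(bool ensureOrdered)
    {
        var block = new TransformBlock<int, int>(
            async x =>
            {
                // Illustrative delays: item 1 takes far longer than item 2.
                await Task.Delay(x == 1 ? 500 : 10);
                return x;
            },
            new ExecutionDataflowBlockOptions
            {
                MaxDegreeOfParallelism = 2,
                EnsureOrdered = ensureOrdered
            });

        block.Post(1);
        block.Post(2);
        block.Complete();

        // Drain the block's output in emission order.
        var output = new List<int>();
        while (await block.OutputAvailableAsync())
        {
            while (block.TryReceive(out var item))
            {
                output.Add(item);
            }
        }
        return output;
    }

    public static async Task Main()
    {
        Console.WriteLine("ordered:   " + string.Join(",", await RunAsync(ensureOrdered: true)));
        Console.WriteLine("unordered: " + string.Join(",", await RunAsync(ensureOrdered: false)));
    }
}
```

In the question's pipeline, the item that is "held back" is the one that owns the semaphore, which is why the hang shows up only when the parallelism is greater than 1.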

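If you run your code enough times, you will eventually get lucky: the first message sent to filter2 is also the first one processed, in which case it moves on, releases the semaphore, and the pipeline makes progress. But you will hit the same issue on the very next message processed; if it is not the second message received, it will wait in the reordering buffer.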
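The following is a reduced sketch of the same acquire-in-one-block, release-in-a-later-block pattern with EnsureOrdered = false applied. It is not the original program: the class name ThrottledPipelineSketch, the int payload standing in for PartitionInfo, the two-stage layout, and the short delays are all assumptions made to keep the example small. It only illustrates that, once ordering is disabled, whichever message acquires the semaphore can flow straight through to the releasing block. It does not address the architectural concern mentioned above.

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

static class ThrottledPipelineSketch
{
    // Runs a two-stage pipeline where the first stage acquires the semaphore
    // and the second stage releases it. Returns the number of items that
    // reached the end of the pipeline.
    public static async Task<int> RunAsync(int partitionCount)
    {
        var semaphore = new SemaphoreSlim(1);
        var processed = 0;

        var options = new ExecutionDataflowBlockOptions
        {
            MaxDegreeOfParallelism = 5,
            EnsureOrdered = false // results are forwarded as soon as they are ready
        };
        var link = new DataflowLinkOptions { PropagateCompletion = true };

        // Stand-in for ReadPartitionAsync: acquire, then simulate work.
        var read = new TransformBlock<int, int>(async p =>
        {
            await semaphore.WaitAsync();
            await Task.Delay(20);
            return p;
        }, options);

        // Stand-in for ValidatePartitionAsync: simulate work, then release.
        var validate = new TransformBlock<int, int>(async p =>
        {
            await Task.Delay(20);
            semaphore.Release();
            return p;
        }, options);

        var sink = new ActionBlock<int>(_ => { Interlocked.Increment(ref processed); });

        read.LinkTo(validate, link);
        validate.LinkTo(sink, link);

        for (var i = 1; i <= partitionCount; i++)
        {
            read.Post(i);
        }
        read.Complete();

        await sink.Completion;
        return processed;
    }

    public static async Task Main()
    {
        var count = await RunAsync(10);
        Console.WriteLine($"Processed {count} partitions.");
    }
}
```

Because the semaphore is released in a different block from the one that acquires it, an exception between the two stages would leave it held; that is one reason to treat this as a demonstration of the ordering effect rather than a recommended design.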