并行AddOrUpdate的ConcurrentDictionary和ConcurrentBag 更新:假设地讲...

问题描述

使用ConcurrentDictionary和ConcurrentBag来添加或更新值是否正确?

基本上尝试执行以下操作,

  1. 具有包含数百万条记录的文件,并试图处理并提取到对象。

  2. 条目类似于键值对, Key = WBAN和值作为对象

     var cd = new ConcurrentDictionary<String,ConcurrentBag<Data>>();
     int count = 0;
    
     foreach (var line in File.ReadLines(path).AsParallel().WithDegreeOfParallelism(5))
     {
         var sInfo = line.Split(new char[] { ',' });
         cd.AddOrUpdate(sInfo[0],new ConcurrentBag<Data>(){ new Data()
         {
             WBAN =  sInfo[0],Date = string.IsNullOrEmpty(sInfo[1]) ? "" : sInfo[1],time = string.IsNullOrEmpty(sInfo[2]) ? "" : sInfo[2]
     }
         },(oldKey,oldValue) =>
         {
             oldValue.Add(new Data()
             {
                 WBAN = sInfo[0],time = string.IsNullOrEmpty(sInfo[2]) ? "" : sInfo[2]
             });
    
             return oldValue;
         }
         );
     }
    

解决方法

  • 您的程序是IO绑定的,而不是CPU绑定的,因此并行处理没有任何优势。
    • 它受IO限制,因为您的程序必须先从文件中读取该行,然后才能处理一行数据,并且通常说来,计算机从存储中读取数据的速度总是比其慢得多。处理它。
    • 由于您的程序在每次读取的行上仅执行琐碎的字符串操作,因此我可以放心地以99.9%的把握说将Data元素添加到Dictionary<String,List<Data>>所花费的时间很小您的计算机从文本文件中读取一行的时间只占一小部分。
  • 此外,避免对此类程序使用File.ReadLines,因为这将首先将整个文件读入内存。
    • 如果查看我的解决方案,您会看到它使用StreamReader来逐行读取每一行,这意味着它不需要等到它首先将所有内容读取到内存中即可。

因此,要以最佳性能解析该文件,就不需要任何并发集合。

就这样:

private static readonly Char[] _sep = new Char[] { ',' }; // Declared here to ensure only a single array allocation.

public static async Task< Dictionary<String,List<Data>> > ReadFileAsync( FileInfo file )
{
    const Int32 ONE_MEGABYTE = 1 * 1024 * 1024; // Use 1MB+ sized buffers for async IO. Not smaller buffers like 1024 or 4096 as those are for synchronous IO.

    Dictionary<String,List<Data>> dict = new Dictionary<String,List<Data>>( capacity: 1024 );


    using( FileStream fs = new FileStream( path,FileAccess.Read,FileMode.Open,FileShare.Read,ONE_MEGABYTE,FileOptions.Asynchronous | FileOptions.SequentialScan ) )
    using( StreamReader rdr = new StreamReader( fs ) )
    {
        String line;
        while( ( line = await rdr.ReadLineAsync().ConfigureAwait(false) ) != null )
        {
            String[] values = line.Split( sep );
            if( values.Length < 3 ) continue;

            Data d = new Data()
            {
                WBAN = values[0],Date = values[1],time = values[2]
            };

            if( !dict.TryGetValue( d.WBAN,out List<Data> list ) )
            {
                dict[ d.WBAN ] = list = new List<Data>();
            }

            list.Add( d );
        }
    }
}

更新:假设地讲...

从理论上讲,由于文件IO(尤其是异步FileStream IO)使用大缓冲区(在这种情况下为ONE_MEGABYTE大小的缓冲区),因此程序可以传递每个缓冲区(按顺序读取)进入并行处理器。

但是问题在于,该缓冲区内的数据无法被微不足道地分配给各个线程:在这种情况下,因为行的长度不是固定的,所以单个线程仍然需要读取整个缓冲区以找出换行符的位置(技术上 可以进行一些并行化,这会增加大量的复杂性(因为您还需要处理跨越缓冲区边界的行) ,或仅包含一行的缓冲区等)。

在这种小规模的情况下,由于程序仍很大程度上受IO限制,因此使用线程池和并发收集类型的开销将消除并行处理的速度。

现在,如果您有一个大小为GB的文件,并且Data条记录的大小约为1KB,那么我将详细说明如何做到这一点,因为在这种规模下,您可能会看到适度的性能增强。

,

您的想法基本上是正确的,但是在实现上存在缺陷。用ParallelQuery语句枚举foreach不会导致循环内的代码并行运行。在这一阶段,并行化阶段已经完成。在您的代码中,实际上没有并行的工作,因为.AsParallel().WithDegreeOfParallelism(5)之后没有附加运算符。要并行执行循环,必须将foreach替换为ForAll运算符,如下所示:

File.ReadLines(path)
    .AsParallel()
    .WithDegreeOfParallelism(5)
    .ForAll(line => { /* Process each line in parallel */ });

了解此处并行化的内容很重要。每行的处理是并行的,而文件系统中的每行的加载则不是。加载已序列化。并行LINQ引擎使用的工作线程(其中一个是当前线程)在访问源IEnumerable(在这种情况下为File.ReadLines(path))时是同步的。

使用嵌套的ConcurrentDictionary<String,ConcurrentBag<Data>>结构来存储处理过的行不是很有效。您可以相信PLINQ在对数据进行分组方面比在并发集合等中手动完成的工作更好。 通过使用ToLookup运算符,您可以获得ILookup<string,Data>,它实际上是一个只读字典,每个键具有多个值。

var separators = new char[] { ',' };

var lookup = File.ReadLines(path)
    .AsParallel()
    .WithDegreeOfParallelism(5)
    .Select(line => line.Split(separators))
    .ToLookup(sInfo => sInfo[0],sInfo => new Data()
    {
        WBAN =  sInfo[0],Date = string.IsNullOrEmpty(sInfo[1]) ? "" : sInfo[1],time = string.IsNullOrEmpty(sInfo[2]) ? "" : sInfo[2]
    });

关于性能和内存效率,这应该是一个更好的选择,除非出于某种原因您特别希望所得结构可变且线程安全。

另外两个注释:

  1. 对并行度进行硬编码(在这种情况下为5)是可以的,只要您知道程序将在其上运行的硬件即可。否则,可能会因超额订购(具有比机器的实际核心更多的线程数)而引起摩擦。提示:虚拟机通常配置为单线程。

  2. ConcurrentBag非常的专门收藏。在大多数情况下,使用ConcurrentQueue可以获得更好的性能。这两个类都提供类似的API。人们可能更喜欢ConcurrentBag,因为它的Add方法比Enqueue更熟悉。

相关问答

错误1:Request method ‘DELETE‘ not supported 错误还原:...
错误1:启动docker镜像时报错:Error response from daemon:...
错误1:private field ‘xxx‘ is never assigned 按Alt...
报错如下,通过源不能下载,最后警告pip需升级版本 Requirem...