使用ConcurrentBag的并行ForEach无法正常工作

问题描述

我有这段代码可以处理列表中的项目:

    static readonly object _Lock = new object();

    public class Item
    {
        public string Name;
        public string ID;
    }

    static void Main(string[] args)
    {
        var items = new List<Item>
        {
            new Item { Name = "One",ID = "123" },new Item { Name = "Two",ID = "234" },new Item { Name = "Three",ID = "123" }
        };

        var itemsProcess = new ConcurrentBag<Item>();
        Parallel.ForEach(items,(item) =>
        {
            Item itemProcess = null;
            // lock (_Lock)
            {
                itemProcess = itemsProcess.FirstOrDefault(a => a.ID == item.ID);
            }
            if (itemProcess != null)
            {
                Console.WriteLine($"Item [{item.Name}] was already processed as [{itemProcess.Name}]");
            }
            else
            {
                itemsProcess.Add(item);
                Console.WriteLine($"Processing item [{item.Name}]");
                Thread.Sleep(1000); // do some work...
            }
        });

        Console.ReadKey();
      }

我基本上是根据几个条件使用ConcurrentBag来检查对象是否存在。
期望总是得到这样的输出(顺序可能有所不同):

Processing item [One]
Item [Three] was already processed as [One]
Processing item [Two]

但是我有时会得到一个输出,表明我的代码不是线程安全的:

Processing item [Three]
Processing item [One]
Processing item [Two]

所以我对itemsProcess.FirstOrDefault()会阻止的假设是错误的。
使用lock不会更改任何内容。显然,这里有问题,我真的不明白为什么吗?

我知道我可以通过其他方式“解决”此问题(一种方法是在输入Parallel.ForEach()之前准备列表),但是我真的很想知道为什么? >

解决方法

并行循环中有2个独立的操作:FirstOrDefaultAdd

ConcurrentBag无法确保这两个操作之间的线程安全。

另一种选择是ConcurrentDictionary,它具有GetOrAdd方法,该方法仅在不存在键时才添加项目:

var itemsProcess = new ConcurrentDictionary<string,Item>();
Parallel.ForEach(items,item =>
{
    // Returns existing item with same ID or adds this item
    var itemProcess = itemsProcess.GetOrAdd(item.Id,item);
    if (!object.ReferenceEquals(item,itemProcess))
    {
        Console.WriteLine($"Item [{item.Name}] was already processed as [{itemProcess.Name}]");
    }
    else
    {
        Console.WriteLine($"Processing item [{item.Name}]");
        // do some work...
    }
});

如果您随后需要将经过处理的项目作为ICollection,则可以通过itemsProcess.Values访问它们。

,

之所以这样,是因为仍然存在数据争用 ...... 2个线程仍可以在非线程安全的环境下读取并添加到ConcurrentBag > em>方式。使用任何并发集合仅表示您具有自洽的结构,但并不能保护您免于编写其他非线程安全代码

使用lock

具有正确的想法
var itemsProcess = new Dictionary<string,(item) =>
{

   lock (_Lock)
   {
      if (itemsProcess.TryGetValue(item.ID,out var val))
      {
         Console.WriteLine($"Item [{item.Name}] was already processed as [{val.Name}]");
         return;
      }

      itemsProcess.TryAdd(item.ID,item);
   }

   Console.WriteLine($"Processing item [{item.Name}]");
   Thread.Sleep(1000); // do some work...

});

注意:在并行处理重复项之前,您还可以过滤列表,以免根本不需要锁定或收集

,

如果不求助于锁,则可以“滥用” ConcurrentDictionary,并避免在此处进行所有锁定以确保唯一性。

通过ID将项目添加到字典中,数据结构将保持一致,一旦完成,您就可以使用dictionary.Values字段来获取唯一项。

P.S .:我觉得您的示例涉及更多,因为没有人使用Distinct()Parallel.ForEach(),这正是您的代码所要达到的目的。

最后,要解决发生这种情况的原因,就并发而言,这几乎总是一种反模式,并且不符合作者在此处的意思。

if(!collection.Contains(item))
      collection.Add(item);

Contains()执行并返回false之前,另一个线程可能已经执行了相同的程序,因此竞速并添加了相同的项目。

这种竞态条件是为什么几乎所有集合修改操作都具有两种形式的原因:您拥有collection.TryAdd(),它将尝试自动添加项目并返回true / false以告诉您结果,或者您遇到类似问题GetOrAdd()AddOrUpdate()再次自动插入项目并随后获取/更新。

相关问答

错误1:Request method ‘DELETE‘ not supported 错误还原:...
错误1:启动docker镜像时报错:Error response from daemon:...
错误1:private field ‘xxx‘ is never assigned 按Alt...
报错如下,通过源不能下载,最后警告pip需升级版本 Requirem...