c# – 与BlockingCollection集成时,Parallel.ForEach停滞不前

我根据 this question中的代码采用了并行/消费者的实现
class ParallelConsumer<T> : Idisposable
{
    private readonly int _maxParallel;
    private readonly Action<T> _action;
    private readonly TaskFactory _factory = new TaskFactory();
    private CancellationTokenSource _tokenSource;
    private readonly BlockingCollection<T> _entries = new BlockingCollection<T>();
    private Task _task;

    public ParallelConsumer(int maxParallel,Action<T> action)
    {
        _maxParallel = maxParallel;
        _action = action;
    }

    public void Start()
    {
        try
        {
            _tokenSource = new CancellationTokenSource();
            _task = _factory.StartNew(
                () =>
                {
                    Parallel.ForEach(
                        _entries.GetConsumingEnumerable(),new ParallelOptions { MaxDegreeOfParallelism = _maxParallel,CancellationToken = _tokenSource.Token },(item,loopState) =>
                        {
                            Log("Taking" + item);
                            if (!_tokenSource.IsCancellationRequested)
                            {
                                _action(item);
                                Log("Finished" + item);
                            }
                            else
                            {
                                Log("Not Taking" + item);
                                _entries.CompleteAdding();
                                loopState.Stop();
                            }
                        });
                },_tokenSource.Token);
        }
        catch (OperationCanceledException oce)
        {
            System.Diagnostics.Debug.WriteLine(oce);
        }
    }

    private void Log(string message)
    {
        Console.WriteLine(message);
    }

    public void Stop()
    {
        dispose();
    }

    public void Enqueue(T entry)
    {
        Log("Enqueuing" + entry);
        _entries.Add(entry);
    }

    public void dispose()
    {
        if (_task == null)
        {
            return;
        }

        _tokenSource.Cancel();
        while (!_task.IsCanceled)
        {
        }

        _task.dispose();
        _tokenSource.dispose();
        _task = null;
    }
}

这是一个测试代码

class Program
{
    static void Main(string[] args)
    {
        TestRepeatedEnqueue(100,1);
    }

    private static void TestRepeatedEnqueue(int itemCount,int parallelCount)
    {
        bool[] flags = new bool[itemCount];
        var consumer = new ParallelConsumer<int>(parallelCount,(i) =>
                                              {
                                                  flags[i] = true;
                                              }
            );
        consumer.Start();
        for (int i = 0; i < itemCount; i++)
        {
            consumer.Enqueue(i);
        }
        Thread.Sleep(1000);
        Debug.Assert(flags.All(b => b == true));



    }
}

测试总是失败 – 它总是停留在测试的100个项目的第93项.知道我的代码的哪一部分引起了这个问题,以及如何解决它?

解决方法

正如您所发现的,您不能将Parallel.Foreach()与BlockingCollection.GetConsumingEnumerable()一起使用.

有关解释,请参阅此博客文章

http://blogs.msdn.com/b/pfxteam/archive/2010/04/06/9990420.aspx

博客还提供了一个名为GetConsumingPartitioner()的方法的源代码,您可以使用它来解决问题.

摘自博客

BlockingCollection’s GetConsumingEnumerable implementation is using BlockingCollection’s internal synchronization which already supports multiple consumers concurrently,but ForEach doesn’t kNow that,and its enumerable-partitioning logic also needs to take a lock while accessing the enumerable.

As such,there’s more synchronization here than is actually necessary,resulting in a potentially non-negligable performance hit.

[Also] the partitioning algorithm employed by default by both Parallel.ForEach and PLINQ use chunking in order to minimize synchronization costs: rather than taking the lock once per element,it’ll take the lock,grab a group of elements (a chunk),and then release the lock.

While this design can help with overall throughput,for scenarios that are focused more on low latency,that chunking can be prohibitive.

相关文章

原文地址:http://msdn.microsoft.com/en-us/magazine/cc163...
前言 随着近些年微服务的流行,有越来越多的开发者和团队所采...
最近因为比较忙,好久没有写博客了,这篇主要给大家分享一下...
在多核CPU在今天和不久的将来,计算机将拥有更多的内核,Mic...
c语言输入成绩怎么判断等级
字符型数据在内存中的存储形式是什么