问题描述
||
大家。
我在传统的生产者-消费者方案中使用BlockingCollection。要一个接一个地处理集合中的项目,我必须编写以下代码:
while (...)
{
var item = collection.Take(cancellationTokenSource.Token);
ProcessItem(item);
}
但是如何处理一批N项(等到收集少于N项)?
我的解决方案是使用一些临时缓冲区:
var buffer = new List<MyType>(N);
while (...)
{
var item = collection.Take(cancellationTokenSource.Token);
buffer.Add(item);
if (buffer.Count == N)
{
foreach (var item in items)
{
ProcessItem(item);
}
buffer.Clear();
}
}
但是在我看来,这非常丑陋……有更好的方法吗?
[更新]:
这是扩展方法的原型,它使解决方案更具可读性。也许有人会发现它有用:
public static class BlockingCollectionExtensions
{
public static IEnumerable<T> TakeBuffer<T>(this BlockingCollection<T> collection,CancellationToken cancellationToken,Int32 bufferSize)
{
var buffer = new List<T>(bufferSize);
while (buffer.Count < bufferSize)
{
try
{
buffer.Add(collection.Take(cancellationToken));
}
catch (OperationCanceledException)
{
// we need to handle the rest of buffer,// even if the task has been cancelled.
break;
}
}
return buffer;
}
}
和用法:
foreach (var item in collection.TakeBuffer(cancellationTokenSource.Token,5))
{
// Todo: process items here...
}
当然,这不是一个完整的解决方案:例如,我将添加任何超时支持-如果没有足够的项目,但是经过了一段时间,我们需要停止等待并处理已经添加到缓冲区的项目。
解决方法
我找不到那种丑陋的解决方案。批处理是对阻塞收集的工作的正交要求,应将其视为此类。我将批处理处理行为封装在一个带有干净接口的
BatchProcessor
类中,但除此之外,我并没有真正看到这种方法的问题。
,您可能会发现队列的无锁实现以及阻塞集合是过早的优化。如果您退后一步,将Queue与基于Monitor的锁一起使用,则可以编写更清晰的代码。
,首先,我不确定您的逻辑是否正确。您说要等到集合少于N个项目时,不是吗?您希望集合包含N个或更多项,以便处理N个项目。也许我误会了。
然后,我还建议您如果少于N个项目,则逐个处理项目,否则您可能会发现您的应用程序似乎挂在N-1个项目上。当然,如果这是稳定的数据流,则仅在buffer.Count> = N时才可以处理。
我建议去排队,然后像GregC所说的Monitor。
像这样:
public object Dequeue() {
while (_queue.Count < N) {
Monitor.Wait(_queue);
}
return _queue.Dequeue();
}
public void Enqueue( object q )
{
lock (_queue)
{
_queue.Enqueue(q);
if (_queue.Count == N)
{
// wake up any blocked dequeue call(s)
Monitor.PulseAll(_queue);
}
}
}